Repository: incubator-slider Updated Branches: refs/heads/develop ac3beff15 -> 4923bf6bb
SLIDER-1166 Every cluster destroy operation creates and holds on to a zk session Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4923bf6b Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4923bf6b Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4923bf6b Branch: refs/heads/develop Commit: 4923bf6bb13635b6f63216e6cebc31ff1565a30f Parents: ac3beff Author: Gour Saha <gourks...@apache.org> Authored: Wed Sep 7 11:31:44 2016 -0700 Committer: Gour Saha <gourks...@apache.org> Committed: Wed Sep 7 11:31:44 2016 -0700 ---------------------------------------------------------------------- .../org/apache/slider/client/SliderClient.java | 9 ++++-- .../apache/slider/core/zk/ZKIntegration.java | 34 +++++++++++++++++--- .../test/YarnZKMiniClusterTestBase.groovy | 4 +-- 3 files changed, 38 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4923bf6b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java index fd3647d..471110b 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -604,11 +604,14 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe BlockingZKWatcher watcher = new BlockingZKWatcher(); client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher, ZKIntegration.SESSION_TIMEOUT); - client.init(); - watcher.waitForZKConnection(2 * 1000); + boolean fromCache = client.init(); + if (!fromCache) { + watcher.waitForZKConnection(2 * 1000); + } } catch (InterruptedException e) { client = null; - log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); + log.warn("Interrupted - unable to connect to zookeeper quorum {}", + registryQuorum, e); } catch (IOException e) { log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4923bf6b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java index ca41e4b..d2deaf9 100644 --- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java +++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java @@ -33,6 +33,8 @@ import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @@ -65,6 +67,8 @@ public class ZKIntegration implements Watcher, Closeable { private final String clustername; private final String userPath; private int sessionTimeout = SESSION_TIMEOUT; + private static final Map<String, ZooKeeper> zkSessions = new HashMap<>(); + /** flag to set to indicate that the user path should be created if it is not already there @@ -93,10 +97,32 @@ public class ZKIntegration implements Watcher, Closeable { this.userPath = mkSliderUserPath(username); } - public void init() throws IOException { - assert zookeeper == null; - log.debug("Binding ZK client to {}", zkConnection); - zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly); + /** + * Returns true only if an active ZK session is available and retrieved from + * cache, false when it has to create a new one + * + * @return true if from cache, false when new session created + * @throws IOException + */ + public synchronized boolean init() throws IOException { + if (zookeeper != null && getAlive()) { + return true; + } + + synchronized (zkSessions) { + if (zkSessions.containsKey(zkConnection)) { + zookeeper = zkSessions.get(zkConnection); + } + if (zookeeper == null || !getAlive()) { + log.info("Binding ZK client to {}", zkConnection); + zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, + canBeReadOnly); + zkSessions.put(zkConnection, zookeeper); + return false; + } else { + return true; + } + } } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4923bf6b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy index 313ab46..a94248c 100644 --- a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy @@ -60,10 +60,10 @@ public abstract class YarnZKMiniClusterTestBase extends YarnMiniClusterTestBase canBeReadOnly, watcher, sessionTimeout) - zki.init() + boolean fromCache = zki.init() //here the callback may or may not have occurred. //optionally wait for it - if (timeout > 0) { + if (timeout > 0 && !fromCache) { watcher.waitForZKConnection(timeout) } //if we get here, the binding worked