Repository: incubator-slider
Updated Branches:
  refs/heads/develop ac3beff15 -> 4923bf6bb


SLIDER-1166 Every cluster destroy operation creates and holds on to a zk session


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4923bf6b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4923bf6b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4923bf6b

Branch: refs/heads/develop
Commit: 4923bf6bb13635b6f63216e6cebc31ff1565a30f
Parents: ac3beff
Author: Gour Saha <gourks...@apache.org>
Authored: Wed Sep 7 11:31:44 2016 -0700
Committer: Gour Saha <gourks...@apache.org>
Committed: Wed Sep 7 11:31:44 2016 -0700

----------------------------------------------------------------------
 .../org/apache/slider/client/SliderClient.java  |  9 ++++--
 .../apache/slider/core/zk/ZKIntegration.java    | 34 +++++++++++++++++---
 .../test/YarnZKMiniClusterTestBase.groovy       |  4 +--
 3 files changed, 38 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4923bf6b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java 
b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index fd3647d..471110b 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -604,11 +604,14 @@ public class SliderClient extends 
AbstractSliderLaunchedService implements RunSe
       BlockingZKWatcher watcher = new BlockingZKWatcher();
       client = ZKIntegration.newInstance(registryQuorum, user, clusterName, 
true, false, watcher,
           ZKIntegration.SESSION_TIMEOUT);
-      client.init();
-      watcher.waitForZKConnection(2 * 1000);
+      boolean fromCache = client.init();
+      if (!fromCache) {
+        watcher.waitForZKConnection(2 * 1000);
+      }
     } catch (InterruptedException e) {
       client = null;
-      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+      log.warn("Interrupted - unable to connect to zookeeper quorum {}",
+          registryQuorum, e);
     } catch (IOException e) {
       log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4923bf6b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git 
a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java 
b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
index ca41e4b..d2deaf9 100644
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
+++ b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -33,6 +33,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
@@ -65,6 +67,8 @@ public class ZKIntegration implements Watcher, Closeable {
   private final String clustername;
   private final String userPath;
   private int sessionTimeout = SESSION_TIMEOUT;
+  private static final Map<String, ZooKeeper> zkSessions = new HashMap<>();
+
 /**
  flag to set to indicate that the user path should be created if
  it is not already there
@@ -93,10 +97,32 @@ public class ZKIntegration implements Watcher, Closeable {
     this.userPath = mkSliderUserPath(username);
   }
 
-  public void init() throws IOException {
-    assert zookeeper == null;
-    log.debug("Binding ZK client to {}", zkConnection);
-    zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, 
canBeReadOnly);
+  /**
+   * Returns true only if an active ZK session is available and retrieved from
+   * cache, false when it has to create a new one
+   * 
+   * @return true if from cache, false when new session created
+   * @throws IOException
+   */
+  public synchronized boolean init() throws IOException {
+    if (zookeeper != null && getAlive()) {
+      return true;
+    }
+
+    synchronized (zkSessions) {
+      if (zkSessions.containsKey(zkConnection)) {
+        zookeeper = zkSessions.get(zkConnection);
+      }
+      if (zookeeper == null || !getAlive()) {
+        log.info("Binding ZK client to {}", zkConnection);
+        zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this,
+            canBeReadOnly);
+        zkSessions.put(zkConnection, zookeeper);
+        return false;
+      } else {
+        return true;
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4923bf6b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
----------------------------------------------------------------------
diff --git 
a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
 
b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
index 313ab46..a94248c 100644
--- 
a/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
+++ 
b/slider-core/src/test/groovy/org/apache/slider/test/YarnZKMiniClusterTestBase.groovy
@@ -60,10 +60,10 @@ public abstract class YarnZKMiniClusterTestBase extends 
YarnMiniClusterTestBase
         canBeReadOnly,
         watcher,
         sessionTimeout)
-    zki.init()
+    boolean fromCache = zki.init()
     //here the callback may or may not have occurred.
     //optionally wait for it
-    if (timeout > 0) {
+    if (timeout > 0 && !fromCache) {
       watcher.waitForZKConnection(timeout)
     }
     //if we get here, the binding worked

Reply via email to