Repository: samza
Updated Branches:
  refs/heads/master 1e9ae7395 -> f758fb591


SAMZA-1298; Create zk path.

if ZK path contains extra path at the end, it needs to be created in ZK at 
first connect.

Author: Boris Shkolnik <bor...@apache.org>

Reviewers: Jagadish <jagad...@apache.org>

Closes #197 from sborya/createZkPath


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f758fb59
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f758fb59
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f758fb59

Branch: refs/heads/master
Commit: f758fb59129264c222c395c262efbc8f7520a7f3
Parents: 1e9ae73
Author: Boris Shkolnik <bor...@apache.org>
Authored: Mon May 22 11:58:53 2017 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Mon May 22 11:58:53 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkCoordinationServiceFactory.java  | 52 ++++++++++++++++++--
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 15 ++----
 .../apache/samza/zk/TestZkLeaderElector.java    | 17 +++----
 .../apache/samza/zk/TestZkProcessorLatch.java   | 20 ++++----
 .../java/org/apache/samza/zk/TestZkUtils.java   | 27 ++++++++--
 5 files changed, 93 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
index 9971732..661650d 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java
@@ -18,27 +18,69 @@
  */
 package org.apache.samza.zk;
 
+import com.google.common.base.Strings;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.zookeeper.client.ConnectStringParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public class ZkCoordinationServiceFactory implements 
CoordinationServiceFactory {
+  private final static Logger LOG = 
LoggerFactory.getLogger(ZkCoordinationServiceFactory.class);
+
   // TODO - Why should this method be synchronized?
   synchronized public CoordinationUtils getCoordinationService(String groupId, 
String participantId, Config config) {
     ZkConfig zkConfig = new ZkConfig(config);
-    ZkClient zkClient;
+
+    ZkClient zkClient = createZkClient(zkConfig.getZkConnect(),
+        zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+
+    // make sure the 'path' exists
+    createZkPath(zkConfig.getZkConnect(), zkClient);
+
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, 
zkConfig.getZkConnectionTimeoutMs());
+
+    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new 
ScheduleAfterDebounceTime());
+  }
+
+  /**
+   * helper method to create zkClient
+   * @param connectString - zkConnect string
+   * @param sessionTimeoutMS - session timeout
+   * @param connectionTimeoutMs - connection timeout
+   * @return zkClient object
+   */
+  public static ZkClient createZkClient(String connectString, int 
sessionTimeoutMS, int connectionTimeoutMs) {
     try {
-      zkClient = new ZkClient(zkConfig.getZkConnect(), 
zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+      return new ZkClient(connectString, sessionTimeoutMS, 
connectionTimeoutMs);
     } catch (Exception e) {
-      throw new SamzaException("zkClient failed to connect to ZK at :" + 
zkConfig.getZkConnect(), e);
+      // ZkClient constructor may throw a variety of different exceptions, not 
all of them Zk based.
+      throw new SamzaException("zkClient failed to connect to ZK at :" + 
connectString, e);
     }
+  }
 
-    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, 
zkConfig.getZkConnectionTimeoutMs());
-    return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new 
ScheduleAfterDebounceTime());
+  /**
+   * if ZkConnectString contains some path at the end, it needs to be created 
when connecting for the first time.
+   * @param zkConnect - connect string
+   * @param zkClient - zkClient object to talk to the ZK
+   */
+  public static void createZkPath(String zkConnect, ZkClient zkClient) {
+    ConnectStringParser parser = new ConnectStringParser(zkConnect);
+
+    String path = parser.getChrootPath();
+    LOG.info("path =" + path);
+    if (!Strings.isNullOrEmpty(path)) {
+      // create this path in zk
+      LOG.info("first connect. creating path =" + path + " in ZK " + 
parser.getServerAddresses());
+      if (!zkClient.exists(path)) {
+        zkClient.createPersistent(path, true); // will create parents if 
needed and will not throw exception if exists
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 5c8fcf3..c547901 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,11 @@
 
 package org.apache.samza.zk;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
@@ -32,12 +37,6 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Util class to help manage Zk connection and ZkClient.
  * It also provides additional utility methods for 
read/write/subscribe/unsubscribe access to the ZK tree.
@@ -84,10 +83,6 @@ public class ZkUtils {
     return new ZkConnection(zkConnectString, sessionTimeoutMs);
   }
 
-  public static ZkClient createZkClient(ZkConnection zkConnection, int 
connectionTimeoutMs) {
-    return new ZkClient(zkConnection, connectionTimeoutMs);
-  }
-
   ZkClient getZkClient() {
     return zkClient;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 7cfad61..393d733 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -18,8 +18,13 @@
  */
 package org.apache.samza.zk;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
@@ -32,12 +37,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -433,10 +432,10 @@ public class TestZkLeaderElector {
   }
 
   private ZkUtils getZkUtilsWithNewClient() {
-    ZkConnection zkConnection = 
ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    ZkClient zkClient = 
ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, 
SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        zkClient,
         CONNECTION_TIMEOUT_MS);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 2385b32..9f089a0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -18,7 +18,13 @@
  */
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.ZkConnection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
@@ -28,13 +34,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
 /**
  * The ZkProcessorLatch uses a shared Znode as a latch. Each participant await 
existence of a target znode under the
  * shared latch, which is a persistent, sequential target znode with value 
(latchSize - 1). latchSize is the minimum
@@ -215,10 +214,11 @@ public class TestZkProcessorLatch {
 
   }
   private ZkUtils getZkUtilsWithNewClient(String processorId) {
-    ZkConnection zkConnection = 
ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
+    ZkClient zkClient = ZkCoordinationServiceFactory
+        .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, 
CONNECTION_TIMEOUT_MS);
     return new ZkUtils(
         KEY_BUILDER,
-        ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
+        zkClient,
         CONNECTION_TIMEOUT_MS);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 63e2361..173b8a6 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,7 +18,10 @@
  */
 package org.apache.samza.zk;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -35,10 +38,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
-
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -87,6 +86,26 @@ public class TestZkUtils {
     zkServer.teardown();
   }
 
+
+  @Test
+  public void testInitZkPath() {
+    String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1";
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/samza1"));
+
+    zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2";
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/samza1/samza2"));
+
+
+    zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path.
+    ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient);
+
+    Assert.assertTrue(zkClient.exists("/"));
+  }
+
   @Test
   public void testRegisterProcessorId() {
     String assignedPath = zkUtils.registerProcessorAndGetId(new 
ProcessorData("host", "1"));

Reply via email to