Repository: samza Updated Branches: refs/heads/master 1e9ae7395 -> f758fb591
SAMZA-1298; Create zk path. if ZK path contains extra path at the end, it needs to be created in ZK at first connect. Author: Boris Shkolnik <bor...@apache.org> Reviewers: Jagadish <jagad...@apache.org> Closes #197 from sborya/createZkPath Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f758fb59 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f758fb59 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f758fb59 Branch: refs/heads/master Commit: f758fb59129264c222c395c262efbc8f7520a7f3 Parents: 1e9ae73 Author: Boris Shkolnik <bor...@apache.org> Authored: Mon May 22 11:58:53 2017 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Mon May 22 11:58:53 2017 -0700 ---------------------------------------------------------------------- .../samza/zk/ZkCoordinationServiceFactory.java | 52 ++++++++++++++++++-- .../main/java/org/apache/samza/zk/ZkUtils.java | 15 ++---- .../apache/samza/zk/TestZkLeaderElector.java | 17 +++---- .../apache/samza/zk/TestZkProcessorLatch.java | 20 ++++---- .../java/org/apache/samza/zk/TestZkUtils.java | 27 ++++++++-- 5 files changed, 93 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index 9971732..661650d 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -18,27 +18,69 @@ */ package org.apache.samza.zk; +import com.google.common.base.Strings; import org.I0Itec.zkclient.ZkClient; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ZkConfig; import org.apache.samza.coordinator.CoordinationServiceFactory; import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.zookeeper.client.ConnectStringParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { + private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); + // TODO - Why should this method be synchronized? synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); - ZkClient zkClient; + + ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), + zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + + // make sure the 'path' exists + createZkPath(zkConfig.getZkConnect(), zkClient); + + ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); + + return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime()); + } + + /** + * helper method to create zkClient + * @param connectString - zkConnect string + * @param sessionTimeoutMS - session timeout + * @param connectionTimeoutMs - connection timeout + * @return zkClient object + */ + public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { try { - zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); + return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); } catch (Exception e) { - throw new SamzaException("zkClient failed to connect to ZK at :" + zkConfig.getZkConnect(), e); + // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. + throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); } + } - ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); - return new ZkCoordinationUtils(participantId, zkConfig, zkUtils, new ScheduleAfterDebounceTime()); + /** + * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time. + * @param zkConnect - connect string + * @param zkClient - zkClient object to talk to the ZK + */ + public static void createZkPath(String zkConnect, ZkClient zkClient) { + ConnectStringParser parser = new ConnectStringParser(zkConnect); + + String path = parser.getChrootPath(); + LOG.info("path =" + path); + if (!Strings.isNullOrEmpty(path)) { + // create this path in zk + LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses()); + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists + } + } } } http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 5c8fcf3..c547901 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -19,6 +19,11 @@ package org.apache.samza.zk; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; @@ -32,12 +37,6 @@ import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Util class to help manage Zk connection and ZkClient. * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree. @@ -84,10 +83,6 @@ public class ZkUtils { return new ZkConnection(zkConnectString, sessionTimeoutMs); } - public static ZkClient createZkClient(ZkConnection zkConnection, int connectionTimeoutMs) { - return new ZkClient(zkConnection, connectionTimeoutMs); - } - ZkClient getZkClient() { return zkClient; } http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 7cfad61..393d733 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -18,8 +18,13 @@ */ package org.apache.samza.zk; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.I0Itec.zkclient.IZkDataListener; -import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; import org.apache.samza.testUtils.EmbeddedZookeeper; @@ -32,12 +37,6 @@ import org.junit.Test; import org.mockito.Mockito; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -433,10 +432,10 @@ public class TestZkLeaderElector { } private ZkUtils getZkUtilsWithNewClient() { - ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS); + ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, - ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS), + zkClient, CONNECTION_TIMEOUT_MS); } } http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index 2385b32..9f089a0 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -18,7 +18,13 @@ */ package org.apache.samza.zk; -import org.I0Itec.zkclient.ZkConnection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.I0Itec.zkclient.ZkClient; import org.apache.samza.coordinator.Latch; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.After; @@ -28,13 +34,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - /** * The ZkProcessorLatch uses a shared Znode as a latch. Each participant await existence of a target znode under the * shared latch, which is a persistent, sequential target znode with value (latchSize - 1). latchSize is the minimum @@ -215,10 +214,11 @@ public class TestZkProcessorLatch { } private ZkUtils getZkUtilsWithNewClient(String processorId) { - ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS); + ZkClient zkClient = ZkCoordinationServiceFactory + .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); return new ZkUtils( KEY_BUILDER, - ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS), + zkClient, CONNECTION_TIMEOUT_MS); } } http://git-wip-us.apache.org/repos/asf/samza/blob/f758fb59/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 63e2361..173b8a6 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -18,7 +18,10 @@ */ package org.apache.samza.zk; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.function.BooleanSupplier; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; @@ -35,10 +38,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.HashMap; -import java.util.Map; -import java.util.function.BooleanSupplier; - public class TestZkUtils { private static EmbeddedZookeeper zkServer = null; private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); @@ -87,6 +86,26 @@ public class TestZkUtils { zkServer.teardown(); } + + @Test + public void testInitZkPath() { + String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1"; + ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); + + Assert.assertTrue(zkClient.exists("/samza1")); + + zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2"; + ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); + + Assert.assertTrue(zkClient.exists("/samza1/samza2")); + + + zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path. + ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); + + Assert.assertTrue(zkClient.exists("/")); + } + @Test public void testRegisterProcessorId() { String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));