Repository: samza Updated Branches: refs/heads/master 371473dd1 -> 1eb8fd472
SAMZA-1327: fail if namespace specified in the connection string does not exist Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #218 from sborya/zkNameSpaceFail Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1eb8fd47 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1eb8fd47 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1eb8fd47 Branch: refs/heads/master Commit: 1eb8fd472458ad1ba03ac4e95627203bc7b7a12f Parents: 371473d Author: Boris Shkolnik <[email protected]> Authored: Mon Jun 12 10:51:27 2017 -0700 Committer: Xinyu Liu <[email protected]> Committed: Mon Jun 12 10:51:27 2017 -0700 ---------------------------------------------------------------------- .../samza/zk/ZkCoordinationServiceFactory.java | 39 ++--- .../org/apache/samza/zk/TestZkNamespace.java | 153 +++++++++++++++++++ .../java/org/apache/samza/zk/TestZkUtils.java | 20 --- 3 files changed, 174 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java index b3a2a6f..20fcfa4 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationServiceFactory.java @@ -33,15 +33,11 @@ import org.slf4j.LoggerFactory; public class ZkCoordinationServiceFactory implements CoordinationServiceFactory { private final static Logger LOG = LoggerFactory.getLogger(ZkCoordinationServiceFactory.class); - // TODO - Why should this method be synchronized? - synchronized public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { + public CoordinationUtils getCoordinationService(String groupId, String participantId, Config config) { ZkConfig zkConfig = new ZkConfig(config); - ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), - zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - - // make sure the 'path' exists - createZkPath(zkConfig.getZkConnect(), zkClient); + ZkClient zkClient = + createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs()); @@ -56,31 +52,38 @@ public class ZkCoordinationServiceFactory implements CoordinationServiceFactory * @return zkClient object */ public static ZkClient createZkClient(String connectString, int sessionTimeoutMS, int connectionTimeoutMs) { + ZkClient zkClient; try { - return new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); + zkClient = new ZkClient(connectString, sessionTimeoutMS, connectionTimeoutMs); } catch (Exception e) { // ZkClient constructor may throw a variety of different exceptions, not all of them Zk based. throw new SamzaException("zkClient failed to connect to ZK at :" + connectString, e); } + + // make sure the namespace in zk exists (if specified) + validateZkNameSpace(connectString, zkClient); + + return zkClient; } /** - * if ZkConnectString contains some path at the end, it needs to be created when connecting for the first time. + * if ZkConnectString contains namespace path at the end, but it does not exist we should fail * @param zkConnect - connect string * @param zkClient - zkClient object to talk to the ZK */ - public static void createZkPath(String zkConnect, ZkClient zkClient) { + public static void validateZkNameSpace(String zkConnect, ZkClient zkClient) { ConnectStringParser parser = new ConnectStringParser(zkConnect); String path = parser.getChrootPath(); - LOG.info("path =" + path); - if (!Strings.isNullOrEmpty(path)) { - // create this path in zk - LOG.info("first connect. creating path =" + path + " in ZK " + parser.getServerAddresses()); - if (!zkClient.exists(path)) { - zkClient.createPersistent(path, true); // will create parents if needed and will not throw exception if exists - } + if (Strings.isNullOrEmpty(path)) { + return; // no namespace path } - } + LOG.info("connectString = " + zkConnect + "; path =" + path); + + // if namespace specified (path above) but "/" does not exists, we will fail + if (!zkClient.exists("/")) { + throw new SamzaException("Zookeeper namespace: " + path + " does not exist for zk at " + zkConnect); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java new file mode 100644 index 0000000..3ce203e --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkNamespace.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import com.google.common.base.Strings; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.samza.SamzaException; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +// zk namespace is similar to chroot in unix. It is defined in ZK, but user doesn't see it. +// For user "/" is the root, but in ZK tree it is actually host:port/namespace. If namespace is not created, then accessing +// "/" by user will fail. +public class TestZkNamespace { + private static EmbeddedZookeeper zkServer = null; + private ZkClient zkClient = null; + private ZkClient zkClient1 = null; + private static final int SESSION_TIMEOUT_MS = 20000; + private static final int CONNECTION_TIMEOUT_MS = 10000; + + @BeforeClass + public static void setup() + throws InterruptedException { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + // for these tests we need to connect to zk multiple times + private void initZk(String zkConnect) { + try { + zkClient = new ZkClient(new ZkConnection(zkConnect, SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS); + } catch (Exception e) { + Assert.fail("Client connection setup failed for connect + " + zkConnect + ": " + e); + } + } + + private void tearDownZk() { + if (zkClient != null) { + zkClient.close(); + } + + if (zkClient1 != null) { + zkClient1.close(); + } + } + + // create namespace for zk before accessing it, thus using a separate client + private void createNamespace(String pathToCreate) { + if (Strings.isNullOrEmpty(pathToCreate)) { + return; + } + + String zkConnect = "127.0.0.1:" + zkServer.getPort(); + try { + zkClient1 = new ZkClient(new ZkConnection(zkConnect, SESSION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS); + } catch (Exception e) { + Assert.fail("Client connection setup failed. Aborting tests.."); + } + zkClient1.createPersistent(pathToCreate, true); + } + + // create namespace, create connection, validate the connection + private void testDoNotFailIfNameSpacePresent(String zkNameSpace) { + String zkConnect = "127.0.0.1:" + zkServer.getPort() + zkNameSpace; + createNamespace(zkNameSpace); + initZk(zkConnect); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + + zkClient.createPersistent("/test"); + zkClient.createPersistent("/test/test1"); + + // test if the new root exists + Assert.assertTrue(zkClient.exists("/")); + Assert.assertTrue(zkClient.exists("/test")); + Assert.assertTrue(zkClient.exists("/test/test1")); + } + + @Test + public void testValidateFailZkNameSpace1LevelPath() { + try { + String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace"; + initZk(zkConnect); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + Assert.fail("1.Should fail with exception, because namespace doesn't exist"); + } catch (SamzaException e) { + // expected + } finally { + tearDownZk(); + } + } + + @Test + public void testValidateFailZkNameSpace2LevelPath() { + try { + String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/zkNameSpace/xyz"; + initZk(zkConnect); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + Assert.fail("2.Should fail with exception, because namespace doesn't exist"); + } catch (SamzaException e) { + // expected + } finally { + tearDownZk(); + } + } + + @Test + public void testValidateFailZkNameSpaceEmptyPath() { + // should succeed, because no namespace provided + String zkConnect = "127.0.0.1:" + zkServer.getPort() + ""; + initZk(zkConnect); + ZkCoordinationServiceFactory.validateZkNameSpace(zkConnect, zkClient); + tearDownZk(); + } + + @Test + public void testValidateNotFailZkNameSpace() { + // now positive tests - with existing namespace + testDoNotFailIfNameSpacePresent("/zkNameSpace1"); + + testDoNotFailIfNameSpacePresent("/zkNameSpace1/xyz1"); + + testDoNotFailIfNameSpacePresent(""); + + tearDownZk(); + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/1eb8fd47/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 173b8a6..b7a0eb8 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -86,26 +86,6 @@ public class TestZkUtils { zkServer.teardown(); } - - @Test - public void testInitZkPath() { - String zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1"; - ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); - - Assert.assertTrue(zkClient.exists("/samza1")); - - zkConnect = "127.0.0.1:" + zkServer.getPort() + "/samza1/samza2"; - ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); - - Assert.assertTrue(zkClient.exists("/samza1/samza2")); - - - zkConnect = "127.0.0.1:" + zkServer.getPort(); // empty path. - ZkCoordinationServiceFactory.createZkPath(zkConnect, zkClient); - - Assert.assertTrue(zkClient.exists("/")); - } - @Test public void testRegisterProcessorId() { String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
