Repository: samza Updated Branches: refs/heads/master 38b1dc38d -> 2226e3e71
SAMZA-1082 : Implement Leader Election using ZK Simple implementation of leader election recipe along with unit tests Author: navina <[email protected]> Reviewers: Xinyu Liu <[email protected]>, Fred Ji <[email protected]> Closes #48 from navina/LeaderElector Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2226e3e7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2226e3e7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2226e3e7 Branch: refs/heads/master Commit: 2226e3e710171336e0904c5a097d8ac8fff9f517 Parents: 38b1dc3 Author: navina <[email protected]> Authored: Mon Feb 13 13:32:06 2017 -0800 Committer: navina <[email protected]> Committed: Mon Feb 13 13:32:06 2017 -0800 ---------------------------------------------------------------------- build.gradle | 1 + gradle/dependency-versions.gradle | 2 +- .../java/org/apache/samza/config/ZkConfig.java | 49 +++ .../leaderelection/LeaderElector.java | 46 +++ .../java/org/apache/samza/zk/ZkKeyBuilder.java | 74 ++++ .../org/apache/samza/zk/ZkLeaderElector.java | 162 ++++++++ .../main/java/org/apache/samza/zk/ZkUtils.java | 146 +++++++ .../samza/coordinator/JobModelManager.scala | 2 - .../samza/testUtils/EmbeddedZookeeper.java | 103 +++++ .../org/apache/samza/testUtils/FileUtil.java | 81 ++++ .../org/apache/samza/zk/TestZkKeyBuilder.java | 53 +++ .../apache/samza/zk/TestZkLeaderElector.java | 405 +++++++++++++++++++ .../java/org/apache/samza/zk/TestZkUtils.java | 105 +++++ 13 files changed, 1226 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 5b41c52..0d60970 100644 --- a/build.gradle +++ b/build.gradle @@ -159,6 +159,7 @@ project(":samza-core_$scalaVersion") { compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion" compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" + compile "com.101tec:zkclient:$zkClientVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion" http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/gradle/dependency-versions.gradle ---------------------------------------------------------------------- diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index db59672..0193b64 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -26,7 +26,7 @@ mockitoVersion = "1.8.4" scalaTestVersion = "2.2.4" zkClientVersion = "0.8" - zookeeperVersion = "3.3.4" + zookeeperVersion = "3.4.6" metricsVersion = "2.2.0" kafkaVersion = "0.10.0.1" commonsHttpClientVersion = "3.1" http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java new file mode 100644 index 0000000..f26b2d9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/config/ZkConfig.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.config; + +public class ZkConfig extends MapConfig { + // Connection string for ZK, format: :<hostname>:<port>,..." + public static final String ZK_CONNECT = "coordinator.zk.connect"; + public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms"; + public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms"; + + public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000; + public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000; + + public ZkConfig(Config config) { + super(config); + } + + public String getZkConnect() { + if (!containsKey(ZK_CONNECT)) { + throw new ConfigException("Missing " + ZK_CONNECT + " config!"); + } + return get(ZK_CONNECT); + } + + public int getZkSessionTimeoutMs() { + return getInt(ZK_SESSION_TIMEOUT_MS, DEFAULT_SESSION_TIMEOUT_MS); + } + + public int getZkConnectionTimeoutMs() { + return getInt(ZK_CONNECTION_TIMEOUT_MS, DEFAULT_CONNECTION_TIMEOUT_MS); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java new file mode 100644 index 0000000..94e3311 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/coordinator/leaderelection/LeaderElector.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.coordinator.leaderelection; + +import org.apache.samza.annotation.InterfaceStability; + [email protected] +public interface LeaderElector { + /** + * Method that helps the caller participate in leader election and returns when the participation is complete + * + * @return True, if caller is chosen as a leader through the leader election process. False, otherwise. + */ + boolean tryBecomeLeader(); + + /** + * Method that allows a caller to resign from leadership role. Caller can resign from leadership due to various + * reasons such as shutdown, connection failures etc. + * This method should clear any state created by the leader and clean-up the resources used by the leader. + */ + void resignLeadership(); + + /** + * Method that can be used to know if the caller is the current leader or not + * + * @return True, if the caller is the current leader. False, otherwise + */ + boolean amILeader(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java new file mode 100644 index 0000000..28344e9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import com.google.common.base.Strings; +import org.apache.samza.SamzaException; + +/** + * The following ZK hierarchy is maintained for Standalone jobs: + * <pre> + * - / + * |- jobName-jobId/ + * |- processors/ + * |- 00000001 + * |- 00000002 + * |- ... + * </pre> + * Note: ZK Node levels without an ending forward slash ('/') represent a leaf node and non-leaf node, otherwise. + * + * This class provides helper methods to easily generate/parse the path in the ZK hierarchy. + */ +public class ZkKeyBuilder { + /** + * Prefix generated to uniquely identify a particular deployment of a job. + * TODO: For now, it looks like $jobName-$jobId. We need to add a unique deployment/attempt identifier as well. + */ + private final String pathPrefix; + + static final String PROCESSORS_PATH = "processors"; + static final String PROCESSOR_ID_PREFIX = "processor-"; + + public ZkKeyBuilder(String pathPrefix) { + if (Strings.isNullOrEmpty(pathPrefix)) { + throw new SamzaException("Zk PathPrefix cannot be null or empty!"); + } + this.pathPrefix = pathPrefix.trim(); + } + + public String getProcessorsPath() { + return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH); + } + + /** + * Static method that helps parse the processorId substring from the ZK path + * + * Processor ID is prefixed by "processor-" and is an leaf node in ZK tree. Hence, this pattern is used to extract + * the processorId. + * + * @param path Full ZK path of a registered processor + * @return String representing the processor ID + */ + public static String parseIdFromPath(String path) { + if (!Strings.isNullOrEmpty(path)) + return path.substring(path.lastIndexOf("/") + 1); + return null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java new file mode 100644 index 0000000..8cdf8fc --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import com.google.common.annotations.VisibleForTesting; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.apache.samza.coordinator.leaderelection.LeaderElector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * <p> + * An implementation of Leader Elector using Zookeeper. + * + * Each participant in the leader election process creates an instance of this class and tries to become the leader. + * The participant with the lowest sequence number in the ZK subtree for election becomes the leader. Every non-leader + * sets a watcher on its predecessor, where the predecessor is the participant with the largest sequence number + * that is less than the current participant's sequence number. + * </p> + * */ +public class ZkLeaderElector implements LeaderElector { + public static final Logger LOGGER = LoggerFactory.getLogger(ZkLeaderElector.class); + private final ZkUtils zkUtils; + private final String processorIdStr; + private final ZkKeyBuilder keyBuilder; + private final String hostName; + + private AtomicBoolean isLeader = new AtomicBoolean(false); + private final IZkDataListener zkLeaderElectionListener; + private String currentSubscription = null; + private final Random random = new Random(); + + @VisibleForTesting + ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) { + this.processorIdStr = processorIdStr; + this.zkUtils = zkUtils; + this.zkLeaderElectionListener = leaderElectionListener; + this.keyBuilder = this.zkUtils.getKeyBuilder(); + this.hostName = getHostName(); + } + + public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) { + this.zkLeaderElectionListener = new ZkLeaderElectionListener(); + this.processorIdStr = processorIdStr; + this.zkUtils = zkUtils; + this.keyBuilder = this.zkUtils.getKeyBuilder(); + this.hostName = getHostName(); + } + + // TODO: This should go away once we integrate with Zk based Job Coordinator + private String getHostName() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOGGER.error("Failed to fetch hostname of the processor", e); + throw new SamzaException(e); + } + } + + @Override + public boolean tryBecomeLeader() { + String currentPath = zkUtils.registerProcessorAndGetId(hostName); + + List<String> children = zkUtils.getSortedActiveProcessors(); + LOGGER.debug(zLog("Current active processors - " + children)); + int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath)); + + if (children.size() == 0 || index == -1) { + throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!"); + } + + if (index == 0) { + isLeader.getAndSet(true); + LOGGER.info(zLog("Eligible to become the leader!")); + return true; + } + + isLeader.getAndSet(false); + LOGGER.info("Index = " + index + " Not eligible to be a leader yet!"); + String predecessor = children.get(index - 1); + if (!predecessor.equals(currentSubscription)) { + if (currentSubscription != null) { + LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription)); + zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener); + } + currentSubscription = predecessor; + LOGGER.info(zLog("Subscribing data change for " + predecessor)); + zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener); + } + /** + * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes + * on the path, even if the path doesn't exist. Since we are using Ephemeral Sequential nodes, if the path doesn't + * exist during subscription, it is not going to get created in the future. + */ + boolean predecessorExists = zkUtils.exists(keyBuilder.getProcessorsPath() + "/" + currentSubscription); + if (predecessorExists) { + LOGGER.info(zLog("Predecessor still exists. Current subscription is valid. Continuing as non-leader.")); + } else { + try { + Thread.sleep(random.nextInt(1000)); + } catch (InterruptedException e) { + Thread.interrupted(); + } + LOGGER.info(zLog("Predecessor doesn't exist anymore. Trying to become leader again...")); + return tryBecomeLeader(); + } + return false; + } + + @Override + public void resignLeadership() { + isLeader.compareAndSet(true, false); + } + + @Override + public boolean amILeader() { + return isLeader.get(); + } + + private String zLog(String logMessage) { + return String.format("[Processor-%s] %s", processorIdStr, logMessage); + } + + // Only by non-leaders + class ZkLeaderElectionListener implements IZkDataListener { + + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + LOGGER.debug("Data change on path: " + dataPath + " Data: " + data); + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + LOGGER.info(zLog("Data deleted on path " + dataPath + ". Predecessor went away. So, trying to become leader again...")); + tryBecomeLeader(); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java new file mode 100644 index 0000000..d0a269d --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Util class to help manage Zk connection and ZkClient. + * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree. + * + * <p> + * <b>Note on ZkClient:</b> + * {@link ZkClient} consists of two threads - I/O thread and Event thread. + * I/O thread manages heartbeats to the Zookeeper server in the ensemble and handles responses to synchronous methods + * in Zookeeper API. + * Event thread typically receives all the Watcher events and delivers to registered listeners. It, also, handles + * responses to asynchronous methods in Zookeeper API. + * </p> + * + * <p> + * <b>Note on Session Management:</b> + * Session management, if needed, should be handled by the caller. This can be done by implementing + * {@link org.I0Itec.zkclient.IZkStateListener} and subscribing this listener to the current ZkClient. Note: The connection state change + * callbacks are invoked in the context of the Event thread of the ZkClient. So, it is advised to do non-blocking + * processing in the callbacks. + * </p> + */ +public class ZkUtils { + private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class); + + private final ZkClient zkClient; + private volatile String ephemeralPath = null; + private final ZkKeyBuilder keyBuilder; + private final int connectionTimeoutMs; + + public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) { + this.keyBuilder = zkKeyBuilder; + this.connectionTimeoutMs = connectionTimeoutMs; + this.zkClient = zkClient; + } + + public void connect() throws ZkInterruptedException { + boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS); + if (!isConnected) { + throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!"); + } + } + + public static ZkConnection createZkConnection(String zkConnectString, int sessionTimeoutMs) { + return new ZkConnection(zkConnectString, sessionTimeoutMs); + } + + public static ZkClient createZkClient(ZkConnection zkConnection, int connectionTimeoutMs) { + return new ZkClient(zkConnection, connectionTimeoutMs); + } + + ZkClient getZkClient() { + return zkClient; + } + + public ZkKeyBuilder getKeyBuilder() { + return keyBuilder; + } + + /** + * Returns a ZK generated identifier for this client. + * If the current client is registering for the first time, it creates an ephemeral sequential node in the ZK tree + * If the current client has already registered and is still within the same session, it returns the already existing + * value for the ephemeralPath + * + * @param data Object that should be written as data in the registered ephemeral ZK node + * @return String representing the absolute ephemeralPath of this client in the current session + */ + public synchronized String registerProcessorAndGetId(final Object data) { + if (ephemeralPath == null) { + // TODO: Data should be more than just the hostname. Use Json serialized data + ephemeralPath = + zkClient.createEphemeralSequential( + keyBuilder.getProcessorsPath() + "/", data); + return ephemeralPath; + } else { + return ephemeralPath; + } + } + + public synchronized String getEphemeralPath() { + return ephemeralPath; + } + + /** + * Method is used to get the <i>sorted</i> list of currently active/registered processors + * + * @return List of absolute ZK node paths + */ + public List<String> getSortedActiveProcessors() { + List<String> children = zkClient.getChildren(keyBuilder.getProcessorsPath()); + if (children.size() > 0) { + Collections.sort(children); + LOG.info("Found these children - " + children); + } + return children; + } + + /* Wrapper for standard I0Itec methods */ + public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { + zkClient.unsubscribeDataChanges(path, dataListener); + } + + public void subscribeDataChanges(String path, IZkDataListener dataListener) { + zkClient.subscribeDataChanges(path, dataListener); + } + + public boolean exists(String path) { + return zkClient.exists(path); + } + + public void close() throws ZkInterruptedException { + zkClient.close(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala index 85f4df0..7f5d05d 100644 --- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala +++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala @@ -214,7 +214,6 @@ object JobModelManager extends Logging { // Generate the jobModel def jobModelGenerator(): JobModel = refreshJobModel(config, - allSystemStreamPartitions, groups, previousChangelogMapping, localityManager) @@ -247,7 +246,6 @@ object JobModelManager extends Logging { * refresh. Hence, there is no need for synchronization as before. */ private def refreshJobModel(config: Config, - allSystemStreamPartitions: util.Set[SystemStreamPartition], groups: util.Map[TaskName, util.Set[SystemStreamPartition]], previousChangelogMapping: util.Map[TaskName, Integer], localityManager: LocalityManager): JobModel = { http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java b/samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java new file mode 100644 index 0000000..bd0a2d1 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/EmbeddedZookeeper.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import org.apache.zookeeper.server.NIOServerCnxnFactory; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.junit.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.InetSocketAddress; + +public class EmbeddedZookeeper { + private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedZookeeper.class); + + private static final String SNAPSHOT_DIR_RELATIVE_PATH = "zk/snapshot"; + private static final String LOG_DIR_RELATIVE_PATH = "zk/log"; + private static final int TICK_TIME = 500; + private static final int MAX_CLIENT_CONNECTIONS = 1024; + private static final int RANDOM_PORT = 0; + + private ZooKeeperServer zooKeeperServer = null; + private ServerCnxnFactory serverCnxnFactory = null; + private File snapshotDir = null; + private File logDir = null; + + public void setup() { + try { + snapshotDir = FileUtil.createFileInTempDir(SNAPSHOT_DIR_RELATIVE_PATH); + logDir = FileUtil.createFileInTempDir(LOG_DIR_RELATIVE_PATH); + } catch (IOException e) { + LOGGER.error("Failed to setup Zookeeper Server Environment", e); + Assert.fail("Failed to setup Zookeeper Server Environment"); + } + + try { + zooKeeperServer = new ZooKeeperServer(snapshotDir, logDir, TICK_TIME); + serverCnxnFactory = NIOServerCnxnFactory.createFactory(); + InetSocketAddress addr = new InetSocketAddress("127.0.0.1", RANDOM_PORT); + serverCnxnFactory.configure(addr, MAX_CLIENT_CONNECTIONS); + + serverCnxnFactory.startup(zooKeeperServer); + } catch (Exception e) { + LOGGER.error("Zookeeper Server failed to start", e); + Assert.fail("Zookeeper Server failed to start"); + } + } + + public void teardown() { + serverCnxnFactory.shutdown(); + + try { + serverCnxnFactory.join(); + } catch (InterruptedException e) { + LOGGER.warn("Zookeeper server may not have terminated cleanly!", e); + } + + try { + FileUtil.deleteDir(snapshotDir); + FileUtil.deleteDir(logDir); + } catch (FileNotFoundException | NullPointerException e) { + LOGGER.warn("Zookeeper Server Environment Cleanup failed!", e); + } + } + + public int getPort() { + return zooKeeperServer.getClientPort(); + } + + public static void main(String[] args) { + EmbeddedZookeeper zk = new EmbeddedZookeeper(); + zk.setup(); + System.out.println("Zk Server Started!!"); + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + zk.teardown(); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java b/samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java new file mode 100644 index 0000000..b33a9fa --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/testUtils/FileUtil.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.testUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; + +public class FileUtil { + public static final String TMP_DIR = System.getProperty("java.io.tmpdir"); + private static final Logger LOGGER = LoggerFactory.getLogger(FileUtil.class); + + private FileUtil() {} + + /** + * Creates a file, along with any parents (if any), in the system-specific Java temporary directory + * If the file already exists, this method simply returns + * + * @param path Path relative to the temporary directory + * @return True, if the file was created successfully, along with parent files (if any) + * @throws IOException + */ + static File createFileInTempDir(String path) throws IOException { + if (path == null || path.isEmpty()) { + throw new RuntimeException("Unable to create file - Null or empty path!"); + } + File file = new File(TMP_DIR, path); + if (!file.exists()) { + if (!file.mkdirs()) { + throw new IOException("Failed to create file"); + } + } + return file; + } + + /** + * Deletes a given {@link File}, if it exists. If it doesn't exist, it throws a {@link FileNotFoundException} + * If the given {@link File} is a directory, it recursively deletes the files in the directory, before deleting the + * directory itself. + * + * @param path Reference to the {@link File} to be deleted + * @return True, if it successfully deleted the given {@link File}. False, otherwise. + * @throws FileNotFoundException When the given {@link File} does not exist + * @throws NullPointerException When the given {@link File} reference is null + */ + static boolean deleteDir(File path) throws FileNotFoundException, NullPointerException { + if (path == null) { + throw new NullPointerException("Path cannot be null!"); + } + if (!path.exists()) { + throw new FileNotFoundException("File not found: " + path); + } + boolean result = true; + + if (path.isDirectory()) { + for (File f: path.listFiles()) { + result = result & deleteDir(f); + } + } + return result && path.delete(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java new file mode 100644 index 0000000..e04f7c9 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import org.apache.samza.SamzaException; +import org.junit.Assert; +import org.junit.Test; + +public class TestZkKeyBuilder { + + @Test + public void pathPrefixCannotBeNullOrEmpty() { + try { + new ZkKeyBuilder(""); + Assert.fail("Key Builder was created with empty path prefix!"); + new ZkKeyBuilder(null); + Assert.fail("Key Builder was created with null path prefix!"); + } catch (SamzaException e) { + // Expected + } + } + + @Test + public void testProcessorsPath() { + ZkKeyBuilder builder = new ZkKeyBuilder("test"); + Assert.assertEquals("/test/" + ZkKeyBuilder.PROCESSORS_PATH, builder.getProcessorsPath()); + } + + @Test + public void testParseIdFromPath() { + Assert.assertEquals( + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1", + ZkKeyBuilder.parseIdFromPath("/test/processors/" + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1")); + Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null)); + Assert.assertNull(ZkKeyBuilder.parseIdFromPath("")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java new file mode 100644 index 0000000..b999ec5 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.apache.samza.SamzaException; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestZkLeaderElector { + + private static EmbeddedZookeeper zkServer = null; + private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); + private String testZkConnectionString = null; + private ZkUtils testZkUtils = null; + private static final int SESSION_TIMEOUT_MS = 20000; + private static final int CONNECTION_TIMEOUT_MS = 10000; + + @BeforeClass + public static void setup() throws InterruptedException { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + } + + @Before + public void testSetup() { + testZkConnectionString = "localhost:" + zkServer.getPort(); + try { + testZkUtils = getZkUtilsWithNewClient(); + } catch (Exception e) { + Assert.fail("Client connection setup failed. Aborting tests.."); + } + try { + testZkUtils.getZkClient().createPersistent(KEY_BUILDER.getProcessorsPath(), true); + } catch (ZkNodeExistsException e) { + // Do nothing + } + } + + + @After + public void testTeardown() { + testZkUtils.close(); + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + @Test + public void testLeaderElectionRegistersProcessor() { + List<String> activeProcessors = new ArrayList<String>() { + { + add("0000000000"); + } + }; + + ZkUtils mockZkUtils = mock(ZkUtils.class); + when(mockZkUtils.registerProcessorAndGetId(any())). + thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000"); + when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors); + + ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils); + Assert.assertTrue(leaderElector.tryBecomeLeader()); + } + + @Test + public void testUnregisteredProcessorInLeaderElection() { + String processorId = "1"; + ZkUtils mockZkUtils = mock(ZkUtils.class); + when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>()); + + ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils); + try { + leaderElector.tryBecomeLeader(); + Assert.fail("Was expecting leader election to fail!"); + } catch (SamzaException e) { + // No-op Expected + } + } + + /** + * Test starts 3 processors and verifies the state of the Zk tree after all processors participate in LeaderElection + */ + @Test + public void testLeaderElection() { + // Processor-1 + ZkUtils zkUtils1 = getZkUtilsWithNewClient(); + ZkLeaderElector leaderElector1 = new ZkLeaderElector( + "1", + zkUtils1); + + // Processor-2 + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); + ZkLeaderElector leaderElector2 = new ZkLeaderElector( + "2", + zkUtils2); + + // Processor-3 + ZkUtils zkUtils3 = getZkUtilsWithNewClient(); + ZkLeaderElector leaderElector3 = new ZkLeaderElector( + "3", + zkUtils3); + + Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size()); + + Assert.assertTrue(leaderElector1.tryBecomeLeader()); + Assert.assertFalse(leaderElector2.tryBecomeLeader()); + Assert.assertFalse(leaderElector3.tryBecomeLeader()); + + Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size()); + + // Clean up + zkUtils1.close(); + zkUtils2.close(); + zkUtils3.close(); + + Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessors()); + + } + + /** + * Tests that Leader Failure automatically promotes the next successor to become the leader + */ + @Test + public void testLeaderFailure() { + /** + * electionLatch and count together verify that: + * 1. the registered listeners are actually invoked by the ZkClient on the correct path + * 2. for a single participant failure, at-most 1 other participant is notified + */ + final CountDownLatch electionLatch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(0); + + // Processor-1 + ZkUtils zkUtils1 = getZkUtilsWithNewClient(); + zkUtils1.registerProcessorAndGetId("processor1"); + ZkLeaderElector leaderElector1 = new ZkLeaderElector( + "1", + zkUtils1, + new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + count.incrementAndGet(); + } + }); + + + // Processor-2 + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); + final String path2 = zkUtils2.registerProcessorAndGetId("processor2"); + ZkLeaderElector leaderElector2 = new ZkLeaderElector( + "2", + zkUtils2, + new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2); + Assert.assertNotNull(registeredIdStr); + + String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath); + Assert.assertNotNull(predecessorIdStr); + + try { + int selfId = Integer.parseInt(registeredIdStr); + int predecessorId = Integer.parseInt(predecessorIdStr); + Assert.assertEquals(1, selfId - predecessorId); + } catch (Exception e) { + System.out.println(e.getMessage()); + } + count.incrementAndGet(); + electionLatch.countDown(); + } + }); + + // Processor-3 + ZkUtils zkUtils3 = getZkUtilsWithNewClient(); + zkUtils3.registerProcessorAndGetId("processor3"); + ZkLeaderElector leaderElector3 = new ZkLeaderElector( + "3", + zkUtils3, + new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + count.incrementAndGet(); + } + }); + + // Join Leader Election + Assert.assertTrue(leaderElector1.tryBecomeLeader()); + Assert.assertFalse(leaderElector2.tryBecomeLeader()); + Assert.assertFalse(leaderElector3.tryBecomeLeader()); + + Assert.assertTrue(leaderElector1.amILeader()); + Assert.assertFalse(leaderElector2.amILeader()); + Assert.assertFalse(leaderElector3.amILeader()); + + List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors(); + Assert.assertEquals(3, currentActiveProcessors.size()); + + // Leader Failure + zkUtils1.close(); + currentActiveProcessors.remove(0); + + try { + Assert.assertTrue(electionLatch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Interrupted while waiting for leaderElection listener callback to complete!"); + } + + Assert.assertEquals(1, count.get()); + Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors()); + + // Clean up + zkUtils2.close(); + zkUtils3.close(); + } + + /** + * Tests that a non-leader failure updates the Zk tree and participants' state correctly + */ + @Test + public void testNonLeaderFailure() { + /** + * electionLatch and count together verify that: + * 1. the registered listeners are actually invoked by the ZkClient on the correct path + * 2. for a single participant failure, at-most 1 other participant is notified + */ + final CountDownLatch electionLatch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(0); + + // Processor-1 + ZkUtils zkUtils1 = getZkUtilsWithNewClient(); + zkUtils1.registerProcessorAndGetId("processor1"); + ZkLeaderElector leaderElector1 = new ZkLeaderElector( + "1", + zkUtils1, + new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + count.incrementAndGet(); + } + }); + + // Processor-2 + ZkUtils zkUtils2 = getZkUtilsWithNewClient(); + zkUtils2.registerProcessorAndGetId("processor2"); + ZkLeaderElector leaderElector2 = new ZkLeaderElector( + "2", + zkUtils2, + new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + count.incrementAndGet(); + } + }); + + // Processor-3 + ZkUtils zkUtils3 = getZkUtilsWithNewClient(); + final String path3 = zkUtils3.registerProcessorAndGetId("processor3"); + ZkLeaderElector leaderElector3 = new ZkLeaderElector( + "3", + zkUtils3, + new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) throws Exception { + String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3); + Assert.assertNotNull(registeredIdStr); + + String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath); + Assert.assertNotNull(predecessorIdStr); + + try { + int selfId = Integer.parseInt(registeredIdStr); + int predecessorId = Integer.parseInt(predecessorIdStr); + Assert.assertEquals(1, selfId - predecessorId); + } catch (Exception e) { + Assert.fail("Exception in LeaderElectionListener!"); + } + count.incrementAndGet(); + electionLatch.countDown(); + } + }); + + // Join Leader Election + Assert.assertTrue(leaderElector1.tryBecomeLeader()); + Assert.assertFalse(leaderElector2.tryBecomeLeader()); + Assert.assertFalse(leaderElector3.tryBecomeLeader()); + + List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors(); + Assert.assertEquals(3, currentActiveProcessors.size()); + + zkUtils2.close(); + currentActiveProcessors.remove(1); + + try { + Assert.assertTrue(electionLatch.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Assert.fail("Interrupted while waiting for leaderElection listener callback to complete!"); + } + + Assert.assertEquals(1, count.get()); + Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors()); + + // Clean up + zkUtils1.close(); + zkUtils3.close(); + } + + @Test + public void testAmILeader() { + // Processor-1 + ZkLeaderElector leaderElector1 = new ZkLeaderElector( + "1", + getZkUtilsWithNewClient()); + + // Processor-2 + ZkLeaderElector leaderElector2 = new ZkLeaderElector( + "2", + getZkUtilsWithNewClient()); + + // Before Leader Election + Assert.assertFalse(leaderElector1.amILeader()); + Assert.assertFalse(leaderElector2.amILeader()); + + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + + // After Leader Election + Assert.assertTrue(leaderElector1.amILeader()); + Assert.assertFalse(leaderElector2.amILeader()); + } + + private ZkUtils getZkUtilsWithNewClient() { + ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS); + return new ZkUtils( + KEY_BUILDER, + ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS), + CONNECTION_TIMEOUT_MS); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/2226e3e7/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java new file mode 100644 index 0000000..855d29d --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.ZkConnection; +import org.I0Itec.zkclient.exception.ZkNodeExistsException; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestZkUtils { + private static EmbeddedZookeeper zkServer = null; + private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); + private ZkConnection zkConnection = null; + private ZkClient zkClient = null; + private static final int SESSION_TIMEOUT_MS = 20000; + private static final int CONNECTION_TIMEOUT_MS = 10000; + + @BeforeClass + public static void setup() throws InterruptedException { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + } + + @Before + public void testSetup() { + try { + zkClient = new ZkClient( + new ZkConnection("localhost:" + zkServer.getPort(), SESSION_TIMEOUT_MS), + CONNECTION_TIMEOUT_MS); + } catch (Exception e) { + Assert.fail("Client connection setup failed. Aborting tests.."); + } + try { + zkClient.createPersistent(KEY_BUILDER.getProcessorsPath(), true); + } catch (ZkNodeExistsException e) { + // Do nothing + } + } + + + @After + public void testTeardown() { + zkClient.close(); + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + @Test + public void testRegisterProcessorId() { + ZkUtils utils = new ZkUtils( + KEY_BUILDER, + zkClient, + SESSION_TIMEOUT_MS); + utils.connect(); + String assignedPath = utils.registerProcessorAndGetId("0.0.0.0"); + Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath())); + + // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid + Assert.assertTrue(utils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath)); + + utils.close(); + } + + @Test + public void testGetActiveProcessors() { + ZkUtils utils = new ZkUtils( + KEY_BUILDER, + zkClient, + SESSION_TIMEOUT_MS); + utils.connect(); + + Assert.assertEquals(0, utils.getSortedActiveProcessors().size()); + utils.registerProcessorAndGetId("processorData"); + + Assert.assertEquals(1, utils.getSortedActiveProcessors().size()); + + utils.close(); + } + +}
