Repository: samza Updated Branches: refs/heads/master e6147fdac -> f1bc1d0b3
SAMZA-1102: Zk controller SAMZA-1102: Added ZKController and ZkControllerImpl Author: Boris Shkolnik <[email protected]> Author: navina <[email protected]> Reviewers: Navina Ramesh <[email protected]>, Fred Ji <[email protected]>, Xinyu Liu <[email protected]> Closes #50 from sborya/ZkController Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f1bc1d0b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f1bc1d0b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f1bc1d0b Branch: refs/heads/master Commit: f1bc1d0b36242170930c0001c9efa7e5c24f8dd0 Parents: e6147fd Author: Boris Shkolnik <[email protected]> Authored: Thu Feb 23 14:02:05 2017 -0800 Committer: navina <[email protected]> Committed: Thu Feb 23 14:02:05 2017 -0800 ---------------------------------------------------------------------- .../processor/SamzaContainerController.java | 1 + .../apache/samza/processor/StreamProcessor.java | 10 +- .../java/org/apache/samza/zk/ZkController.java | 32 ++++ .../org/apache/samza/zk/ZkControllerImpl.java | 163 +++++++++++++++++++ .../apache/samza/zk/ZkControllerListener.java | 34 ++++ .../java/org/apache/samza/zk/ZkKeyBuilder.java | 22 ++- .../org/apache/samza/zk/ZkLeaderElector.java | 36 ++-- .../main/java/org/apache/samza/zk/ZkUtils.java | 49 ++++++ .../org/apache/samza/zk/TestZkKeyBuilder.java | 4 +- .../apache/samza/zk/TestZkLeaderElector.java | 152 ++++++++++++++--- .../java/org/apache/samza/zk/TestZkUtils.java | 105 ++++++++++-- 11 files changed, 549 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java index d448d30..76e2053 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java +++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java @@ -60,6 +60,7 @@ public class SamzaContainerController { * @param taskFactory Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or * {@link org.apache.samza.task.AsyncStreamTask} * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances + * @param processorId Id of the processor * @param metricsReporterMap Map of metric reporter name and {@link MetricsReporter} instance */ public SamzaContainerController( http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java index 5e90c56..4d3e8ab 100644 --- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java +++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java @@ -94,9 +94,14 @@ public class StreamProcessor { this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory); } + /** - * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created + *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created * using the provided {@link StreamTaskFactory}. + * @param processorId - this processor Id + * @param config - config + * @param customMetricsReporters metric Reporter + * @param streamTaskFactory task factory to instantiate the Task */ public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters, StreamTaskFactory streamTaskFactory) { @@ -106,6 +111,9 @@ public class StreamProcessor { /** * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created * using the "task.class" configuration instead of a task factory. + * @param processorId - this processor Id + * @param config - config + * @param customMetricsReporters metrics */ public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) { this(processorId, config, customMetricsReporters, (Object) null); http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java new file mode 100644 index 0000000..20c62cf --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.samza.zk; + +/** + * Api to the functionality provided by ZK + */ +public interface ZkController { + void register(); + boolean isLeader(); + void notifyJobModelChange(String version); + void stop(); + void listenToProcessorLiveness(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java new file mode 100644 index 0000000..70c8a37 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; +import org.apache.samza.SamzaException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + + +public class ZkControllerImpl implements ZkController { + private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class); + + private final String processorIdStr; + private final ZkUtils zkUtils; + private final ZkControllerListener zkControllerListener; + private final ZkLeaderElector leaderElector; + private final ScheduleAfterDebounceTime debounceTimer; + + public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, + ZkControllerListener zkControllerListener) { + this.processorIdStr = processorIdStr; + this.zkUtils = zkUtils; + this.zkControllerListener = zkControllerListener; + this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + onBecomeLeader(); + } + } + ); + this.debounceTimer = debounceTimer; + + init(); + } + + private void init() { + ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder(); + zkUtils.makeSurePersistentPathsExists( + new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder + .getJobModelPathPrefix()}); + } + + private void onBecomeLeader() { + + listenToProcessorLiveness(); // subscribe for adding new processors + + // inform the caller + zkControllerListener.onBecomeLeader(); + + } + + @Override + public void register() { + + // TODO - make a loop here with some number of attempts. + // possibly split into two method - becomeLeader() and becomeParticipant() + leaderElector.tryBecomeLeader(); + + // subscribe to JobModel version updates + zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer)); + } + + @Override + public boolean isLeader() { + return leaderElector.amILeader(); + } + + @Override + public void notifyJobModelChange(String version) { + zkControllerListener.onNewJobModelAvailable(version); + } + + @Override + public void stop() { + if (isLeader()) { + leaderElector.resignLeadership(); + } + zkUtils.close(); + } + + @Override + public void listenToProcessorLiveness() { + zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer)); + } + + // Only by Leader + class ZkProcessorChangeHandler implements IZkChildListener { + private final ScheduleAfterDebounceTime debounceTimer; + public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) { + this.debounceTimer = debounceTimer; + } + /** + * Called when the children of the given path changed. + * + * @param parentPath The parent path + * @param currentChilds The children or null if the root node (parent path) was deleted. + * @throws Exception + */ + @Override + public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { + LOG.info( + "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + " Current Children: " + + currentChilds); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, + ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds)); + } + } + + class ZkJobModelVersionChangeHandler implements IZkDataListener { + private final ScheduleAfterDebounceTime debounceTimer; + public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) { + this.debounceTimer = debounceTimer; + } + /** + * called when job model version gets updated + * @param dataPath + * @param data + * @throws Exception + */ + @Override + public void handleDataChange(String dataPath, Object data) throws Exception { + LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data=" + + (String) data); + + debounceTimer + .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data)); + } + @Override + public void handleDataDeleted(String dataPath) throws Exception { + throw new SamzaException("version update path has been deleted!"); + } + } + + public void shutdown() { + if (debounceTimer != null) + debounceTimer.stopScheduler(); + + if (zkUtils != null) + zkUtils.close(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java new file mode 100644 index 0000000..f7fedd7 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import java.util.List; + + +/** + * callbacks to the caller of the ZkController + */ +public interface ZkControllerListener { + void onBecomeLeader(); + void onProcessorChange(List<String> processorIds); + + void onNewJobModelAvailable(String version); // start job model update (stop current work) + void onNewJobModelConfirmed(String version); // start new work according to the new model +} http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java index 28344e9..d6cb9f3 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -19,8 +19,8 @@ package org.apache.samza.zk; -import com.google.common.base.Strings; import org.apache.samza.SamzaException; +import com.google.common.base.Strings; /** * The following ZK hierarchy is maintained for Standalone jobs: @@ -44,7 +44,7 @@ public class ZkKeyBuilder { private final String pathPrefix; static final String PROCESSORS_PATH = "processors"; - static final String PROCESSOR_ID_PREFIX = "processor-"; + public static final String JOBMODEL_VERSION_PATH = "jobModelVersion"; public ZkKeyBuilder(String pathPrefix) { if (Strings.isNullOrEmpty(pathPrefix)) { @@ -53,6 +53,10 @@ public class ZkKeyBuilder { this.pathPrefix = pathPrefix.trim(); } + public String getRootPath() { + return "/" + pathPrefix; + } + public String getProcessorsPath() { return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH); } @@ -71,4 +75,18 @@ public class ZkKeyBuilder { return path.substring(path.lastIndexOf("/") + 1); return null; } + + public String getJobModelVersionPath() { + return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH); + } + + public String getJobModelPathPrefix() { + return String.format("/%s/jobModels", pathPrefix); + } + + public String getJobModelPath(String jobModelVersion) { + return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion); + } + + } http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java index 8cdf8fc..b9bdf11 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java @@ -50,25 +50,30 @@ public class ZkLeaderElector implements LeaderElector { private final String hostName; private AtomicBoolean isLeader = new AtomicBoolean(false); - private final IZkDataListener zkLeaderElectionListener; + private final IZkDataListener previousProcessorChangeListener; + ZkLeaderElectorListener zkLeaderElectorListener; private String currentSubscription = null; private final Random random = new Random(); @VisibleForTesting - ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) { + ZkLeaderElector(String processorIdStr, + ZkUtils zkUtils, + ZkLeaderElectorListener zkLeaderElectorListener, + IZkDataListener previousProcessorChangeListener) { this.processorIdStr = processorIdStr; this.zkUtils = zkUtils; - this.zkLeaderElectionListener = leaderElectionListener; this.keyBuilder = this.zkUtils.getKeyBuilder(); this.hostName = getHostName(); + this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader + if (previousProcessorChangeListener == null) + this.previousProcessorChangeListener = new PreviousProcessorChangeListener(); + else + this.previousProcessorChangeListener = previousProcessorChangeListener; } - public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) { - this.zkLeaderElectionListener = new ZkLeaderElectionListener(); - this.processorIdStr = processorIdStr; - this.zkUtils = zkUtils; - this.keyBuilder = this.zkUtils.getKeyBuilder(); - this.hostName = getHostName(); + public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) { + this(processorIdStr, zkUtils, zkLeaderElectorListener, null); + } // TODO: This should go away once we integrate with Zk based Job Coordinator @@ -81,6 +86,10 @@ public class ZkLeaderElector implements LeaderElector { } } + public interface ZkLeaderElectorListener { + void onBecomingLeader(); + } + @Override public boolean tryBecomeLeader() { String currentPath = zkUtils.registerProcessorAndGetId(hostName); @@ -96,6 +105,7 @@ public class ZkLeaderElector implements LeaderElector { if (index == 0) { isLeader.getAndSet(true); LOGGER.info(zLog("Eligible to become the leader!")); + zkLeaderElectorListener.onBecomingLeader(); // inform the caller return true; } @@ -105,11 +115,13 @@ public class ZkLeaderElector implements LeaderElector { if (!predecessor.equals(currentSubscription)) { if (currentSubscription != null) { LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription)); - zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener); + zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, + previousProcessorChangeListener); } currentSubscription = predecessor; LOGGER.info(zLog("Subscribing data change for " + predecessor)); - zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener); + zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, + previousProcessorChangeListener); } /** * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes @@ -146,7 +158,7 @@ public class ZkLeaderElector implements LeaderElector { } // Only by non-leaders - class ZkLeaderElectionListener implements IZkDataListener { + class PreviousProcessorChangeListener implements IZkDataListener { @Override public void handleDataChange(String dataPath, Object data) throws Exception { http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index d0a269d..b11e02f 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -19,6 +19,7 @@ package org.apache.samza.zk; +import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; @@ -58,6 +59,7 @@ public class ZkUtils { private volatile String ephemeralPath = null; private final ZkKeyBuilder keyBuilder; private final int connectionTimeoutMs; + private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) { this.keyBuilder = zkKeyBuilder; @@ -143,4 +145,51 @@ public class ZkUtils { public void close() throws ZkInterruptedException { zkClient.close(); } + + /** + * subscribe for changes of JobModel version + * @param dataListener describe this + */ + public void subscribeToJobModelVersionChange(IZkDataListener dataListener) { + LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath()); + zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener); + } + + /** + * read the jobmodel version from ZK + * @return jobmodel version as a string + */ + public String getJobModelVersion() { + return zkClient.<String>readData(keyBuilder.getJobModelVersionPath()); + } + + /** + * verify that given paths exist in ZK + * @param paths + */ + public void makeSurePersistentPathsExists(String[] paths) { + for (String path : paths) { + if (!zkClient.exists(path)) { + zkClient.createPersistent(path, true); + } + } + } + + /** + * subscribe to the changes in the list of processors in ZK + * @param listener + */ + public void subscribeToProcessorChange(IZkChildListener listener) { + LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath()); + zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener); + } + + public void deleteRoot() { + String rootPath = keyBuilder.getRootPath(); + if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) { + LOG.info("pid=" + processorId + " Deleteing root: " + rootPath); + zkClient.deleteRecursive(rootPath); + } + } + } http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java index e04f7c9..8e048b2 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java @@ -45,8 +45,8 @@ public class TestZkKeyBuilder { @Test public void testParseIdFromPath() { Assert.assertEquals( - ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1", - ZkKeyBuilder.parseIdFromPath("/test/processors/" + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1")); + "1", + ZkKeyBuilder.parseIdFromPath("/test/processors/" + "1")); Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null)); Assert.assertNull(ZkKeyBuilder.parseIdFromPath("")); } http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index b999ec5..6342fde 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -70,9 +70,13 @@ public class TestZkLeaderElector { } } + public static class BooleanResult { + public boolean res = false; + } @After public void testTeardown() { + testZkUtils.deleteRoot(); testZkUtils.close(); } @@ -94,8 +98,17 @@ public class TestZkLeaderElector { thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000"); when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors); - ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils); - Assert.assertTrue(leaderElector.tryBecomeLeader()); + BooleanResult isLeader = new BooleanResult(); + ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader.res = true; + } + } + ); + leaderElector.tryBecomeLeader(); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100)); } @Test @@ -104,7 +117,13 @@ public class TestZkLeaderElector { ZkUtils mockZkUtils = mock(ZkUtils.class); when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>()); - ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils); + ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + } + } + ); try { leaderElector.tryBecomeLeader(); Assert.fail("Was expecting leader election to fail!"); @@ -118,29 +137,50 @@ public class TestZkLeaderElector { */ @Test public void testLeaderElection() { + BooleanResult isLeader1 = new BooleanResult(); + BooleanResult isLeader2 = new BooleanResult(); + BooleanResult isLeader3 = new BooleanResult(); // Processor-1 ZkUtils zkUtils1 = getZkUtilsWithNewClient(); - ZkLeaderElector leaderElector1 = new ZkLeaderElector( - "1", - zkUtils1); + ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + } + ); // Processor-2 ZkUtils zkUtils2 = getZkUtilsWithNewClient(); - ZkLeaderElector leaderElector2 = new ZkLeaderElector( - "2", - zkUtils2); + ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + } + ); // Processor-3 ZkUtils zkUtils3 = getZkUtilsWithNewClient(); - ZkLeaderElector leaderElector3 = new ZkLeaderElector( - "3", - zkUtils3); + ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader3.res = true; + } + }); Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size()); - Assert.assertTrue(leaderElector1.tryBecomeLeader()); - Assert.assertFalse(leaderElector2.tryBecomeLeader()); - Assert.assertFalse(leaderElector3.tryBecomeLeader()); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + leaderElector3.tryBecomeLeader(); + + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100)); Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size()); @@ -166,16 +206,26 @@ public class TestZkLeaderElector { final CountDownLatch electionLatch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(0); + BooleanResult isLeader1 = new BooleanResult(); + BooleanResult isLeader2 = new BooleanResult(); + BooleanResult isLeader3 = new BooleanResult(); + + // Processor-1 ZkUtils zkUtils1 = getZkUtilsWithNewClient(); zkUtils1.registerProcessorAndGetId("processor1"); ZkLeaderElector leaderElector1 = new ZkLeaderElector( "1", zkUtils1, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { - } @Override @@ -191,6 +241,12 @@ public class TestZkLeaderElector { ZkLeaderElector leaderElector2 = new ZkLeaderElector( "2", zkUtils2, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -223,6 +279,12 @@ public class TestZkLeaderElector { ZkLeaderElector leaderElector3 = new ZkLeaderElector( "3", zkUtils3, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader3.res = true; + } + }, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -236,9 +298,12 @@ public class TestZkLeaderElector { }); // Join Leader Election - Assert.assertTrue(leaderElector1.tryBecomeLeader()); - Assert.assertFalse(leaderElector2.tryBecomeLeader()); - Assert.assertFalse(leaderElector3.tryBecomeLeader()); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + leaderElector3.tryBecomeLeader(); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100)); Assert.assertTrue(leaderElector1.amILeader()); Assert.assertFalse(leaderElector2.amILeader()); @@ -278,12 +343,22 @@ public class TestZkLeaderElector { final CountDownLatch electionLatch = new CountDownLatch(1); final AtomicInteger count = new AtomicInteger(0); + BooleanResult isLeader1 = new BooleanResult(); + BooleanResult isLeader2 = new BooleanResult(); + BooleanResult isLeader3 = new BooleanResult(); + // Processor-1 ZkUtils zkUtils1 = getZkUtilsWithNewClient(); zkUtils1.registerProcessorAndGetId("processor1"); ZkLeaderElector leaderElector1 = new ZkLeaderElector( "1", zkUtils1, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -302,6 +377,12 @@ public class TestZkLeaderElector { ZkLeaderElector leaderElector2 = new ZkLeaderElector( "2", zkUtils2, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -320,6 +401,12 @@ public class TestZkLeaderElector { ZkLeaderElector leaderElector3 = new ZkLeaderElector( "3", zkUtils3, + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader3.res = true; + } + }, new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { @@ -347,9 +434,12 @@ public class TestZkLeaderElector { }); // Join Leader Election - Assert.assertTrue(leaderElector1.tryBecomeLeader()); - Assert.assertFalse(leaderElector2.tryBecomeLeader()); - Assert.assertFalse(leaderElector3.tryBecomeLeader()); + leaderElector1.tryBecomeLeader(); + leaderElector2.tryBecomeLeader(); + leaderElector3.tryBecomeLeader(); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100)); List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors(); Assert.assertEquals(3, currentActiveProcessors.size()); @@ -373,15 +463,29 @@ public class TestZkLeaderElector { @Test public void testAmILeader() { + BooleanResult isLeader1 = new BooleanResult(); + BooleanResult isLeader2 = new BooleanResult(); // Processor-1 ZkLeaderElector leaderElector1 = new ZkLeaderElector( "1", - getZkUtilsWithNewClient()); + getZkUtilsWithNewClient(), + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }); // Processor-2 ZkLeaderElector leaderElector2 = new ZkLeaderElector( "2", - getZkUtilsWithNewClient()); + getZkUtilsWithNewClient(), + new ZkLeaderElector.ZkLeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }); // Before Leader Election Assert.assertFalse(leaderElector1.amILeader()); http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 855d29d..b719e28 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -18,6 +18,8 @@ */ package org.apache.samza.zk; +import java.util.function.BooleanSupplier; +import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; @@ -32,10 +34,10 @@ import org.junit.Test; public class TestZkUtils { private static EmbeddedZookeeper zkServer = null; private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); - private ZkConnection zkConnection = null; private ZkClient zkClient = null; private static final int SESSION_TIMEOUT_MS = 20000; private static final int CONNECTION_TIMEOUT_MS = 10000; + private ZkUtils zkUtils; @BeforeClass public static void setup() throws InterruptedException { @@ -57,11 +59,21 @@ public class TestZkUtils { } catch (ZkNodeExistsException e) { // Do nothing } + + + zkUtils = new ZkUtils( + KEY_BUILDER, + zkClient, + SESSION_TIMEOUT_MS); + + zkUtils.connect(); + } @After public void testTeardown() { + zkUtils.close(); zkClient.close(); } @@ -72,34 +84,91 @@ public class TestZkUtils { @Test public void testRegisterProcessorId() { - ZkUtils utils = new ZkUtils( - KEY_BUILDER, - zkClient, - SESSION_TIMEOUT_MS); - utils.connect(); - String assignedPath = utils.registerProcessorAndGetId("0.0.0.0"); + String assignedPath = zkUtils.registerProcessorAndGetId("0.0.0.0"); Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath())); // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid - Assert.assertTrue(utils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath)); + Assert.assertTrue(zkUtils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath)); - utils.close(); } @Test public void testGetActiveProcessors() { - ZkUtils utils = new ZkUtils( - KEY_BUILDER, - zkClient, - SESSION_TIMEOUT_MS); - utils.connect(); + Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size()); + zkUtils.registerProcessorAndGetId("processorData"); - Assert.assertEquals(0, utils.getSortedActiveProcessors().size()); - utils.registerProcessorAndGetId("processorData"); + Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size()); - Assert.assertEquals(1, utils.getSortedActiveProcessors().size()); + } - utils.close(); + @Test + public void testSubscribeToJobModelVersionChange() { + + ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test"); + String root = keyBuilder.getRootPath(); + zkClient.deleteRecursive(root); + + class Result { + String res = ""; + public String getRes() { + return res; + } + public void updateRes(String newRes) { + res = newRes; + } + } + + Assert.assertFalse(zkUtils.exists(root)); + + // create the paths + zkUtils.makeSurePersistentPathsExists( + new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()}); + Assert.assertTrue(zkUtils.exists(root)); + Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath())); + Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath())); + + final Result res = new Result(); + // define the callback + IZkDataListener dataListener = new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + res.updateRes((String) data); + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + Assert.fail("Data wasn't deleted;"); + } + }; + // subscribe + zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener); + zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener); + // update + zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion"); + + // verify + Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000)); + + // update again + zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor"); + + Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000)); } + public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) { + long delay = startDelayMs; + while (delay < maxDelayMs) { + if (cond.getAsBoolean()) + return true; + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + return false; + } + delay *= 2; + } + return false; + } }
