Repository: samza Updated Branches: refs/heads/master c58d74b35 -> 4d7b3b353
SAMZA-1103: ZkBarrier SAMZA-1103: Barrier for JobModel upgrades. Processors may start using the new JobModel only after all of them have been notified about it. Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Author: navina <[email protected]> Reviewers: Fred Ji <[email protected]>, Navina Ramesh <[email protected]>, Xiliu Liu <[email protected]> Closes #61 from sborya/ZkBarrier Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4d7b3b35 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4d7b3b35 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4d7b3b35 Branch: refs/heads/master Commit: 4d7b3b3534ed804ad54227901bf3bbaff32814e1 Parents: c58d74b Author: Boris Shkolnik <[email protected]> Authored: Tue Feb 28 17:56:50 2017 -0800 Committer: navina <[email protected]> Committed: Tue Feb 28 17:56:50 2017 -0800 ---------------------------------------------------------------------- .../samza/zk/BarrierForVersionUpgrade.java | 46 +++++ .../samza/zk/ScheduleAfterDebounceTime.java | 8 +- .../samza/zk/ZkBarrierForVersionUpgrade.java | 166 +++++++++++++++++++ .../java/org/apache/samza/zk/ZkKeyBuilder.java | 4 +- .../main/java/org/apache/samza/zk/ZkUtils.java | 11 +- .../apache/samza/task/ReadableCoordinator.scala | 1 + .../zk/TestZkBarrierForVersionUpgrade.java | 148 +++++++++++++++++ .../apache/samza/zk/TestZkLeaderElector.java | 11 +- 8 files changed, 379 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java new file mode 100644 
index 0000000..b2d80d0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.zk; + +import java.util.List; + + +/** + * Interface for a barrier - to allow synchronization between different processors to switch to a newly published + * JobModel. + */ +public interface BarrierForVersionUpgrade { + /** + * Barrier is usually started by the leader. + * @param version - for which the barrier is started. + * @param processorsNames - list of processors available at the time of the JobModel generation. + */ + void startBarrier(String version, List<String> processorsNames); + + /** + * Called by the processor. + * Updates the processor readiness to use the new version and wait on the barrier, until all other processors + * joined. + * @param version of the jobModel this barrier is protecting. + * @param processorsName as it appears in the list of processors. + * @param callback will be invoked, when barrier is reached. 
+ */ + void waitForBarrier(String version, String processorsName, Runnable callback); +} http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java index 0a4db6d..289d900 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java @@ -42,15 +42,17 @@ public class ScheduleAfterDebounceTime { public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class); public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete + // Names of actions. + // When the same action is scheduled it needs to cancel the previous one. + // To accomplish that we keep the previous future in a map, keyed by the action name. + + // Here we predefine some actions which are used in the ZK based standalone app. 
// Action name when the JobModel version changes public static final String JOB_MODEL_VERSION_CHANGE = "JobModelVersionChange"; // Action name when the Processor membership changes public static final String ON_PROCESSOR_CHANGE = "OnProcessorChange"; - // Action name when the Processor Data changes - public static final String ON_DATA_CHANGE_ON = "OnDataChangeOn"; - public static final int DEBOUNCE_TIME_MS = 2000; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java new file mode 100644 index 0000000..3ec87b0 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.zk; + +import java.util.List; + +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.IZkDataListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { + private final ZkUtils zkUtils; + private final ZkKeyBuilder keyBuilder; + private final static String BARRIER_DONE = "done"; + private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class); + + private final ScheduleAfterDebounceTime debounceTimer; + + private final String barrierPrefix; + + public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) { + this.zkUtils = zkUtils; + keyBuilder = zkUtils.getKeyBuilder(); + + barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(); + this.debounceTimer = debounceTimer; + } + + @Override + public void startBarrier(String version, List<String> processorsNames) { + String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); + String barrierDonePath = String.format("%s/barrier_done", barrierPath); + String barrierProcessors = String.format("%s/barrier_processors", barrierPath); + + zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath}); + + // callback for when the barrier is reached + Runnable callback = new Runnable() { + @Override + public void run() { + LOG.info("Writing BARRIER DONE to " + barrierDonePath); + zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); + } + }; + // subscribe for processor's list changes + LOG.info("Subscribing for child changes at " + barrierProcessors); + zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, + new ZkBarrierChangeHandler(callback, processorsNames)); + } + + @Override + public void waitForBarrier(String version, String processorsName, Runnable callback) { + // if participant makes this call it means it has already 
stopped the old container and got the new job model. + String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); + String barrierDonePath = String.format("%s/barrier_done", barrierPath); + String barrierProcessors = String.format("%s/barrier_processors", barrierPath); + String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName); + + + // update the barrier for this processor + LOG.info("Creating a child for barrier at " + barrierProcessorThis); + zkUtils.getZkClient().createPersistent(barrierProcessorThis); + + // now subscribe for the barrier + zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, debounceTimer, callback)); + } + + /** + * listener for the subscription. + */ + class ZkBarrierChangeHandler implements IZkChildListener { + Runnable callback; + List<String> names; + + public ZkBarrierChangeHandler(Runnable callback, List<String> names) { + this.callback = callback; + this.names = names; + } + + @Override + public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception { + // Find out the event & Log + boolean allIn = true; + + if (currentChildren == null) { + LOG.info("Got handleChildChange with null currentChildren"); + return; + } + // debug + StringBuilder sb = new StringBuilder(); + for (String child : currentChildren) { + sb.append(child).append(","); + } + LOG.info("list of children in the barrier = " + parentPath + ":" + sb.toString()); + sb = new StringBuilder(); + for (String child : names) { + sb.append(child).append(","); + } + LOG.info("list of children to compare against = " + parentPath + ":" + sb.toString()); + + + // check if all the names are in + for (String n : names) { + if (!currentChildren.contains(n)) { + LOG.info("node " + n + " is still not in the list "); + allIn = false; + break; + } + } + if (allIn) { + LOG.info("ALl nodes reached the barrier"); + callback.run(); // all the names have 
registered + } + } + } + + class ZkBarrierReachedHandler implements IZkDataListener { + private final ScheduleAfterDebounceTime debounceTimer; + private final String barrierPathDone; + private final Runnable callback; + + public ZkBarrierReachedHandler(String barrierPathDone, ScheduleAfterDebounceTime debounceTimer, Runnable callback) { + this.barrierPathDone = barrierPathDone; + this.callback = callback; + this.debounceTimer = debounceTimer; + } + + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + String done = (String) data; + LOG.info("got notification about barrier path=" + barrierPathDone + "; done=" + done); + if (done.equals(BARRIER_DONE)) { + zkUtils.unsubscribeDataChanges(barrierPathDone, this); + debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback); + } + // we do not need to resubscribe because, ZkClient library does it for us. + + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + LOG.warn("barrier done got deleted at " + dataPath); + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java index d6cb9f3..0a8f37e 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java @@ -88,5 +88,7 @@ public class ZkKeyBuilder { return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion); } - + public String getJobModelVersionBarrierPrefix() { + return String.format("/%s/versionBarriers", pathPrefix); + } } http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index b11e02f..320cd49 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -19,6 +19,9 @@ package org.apache.samza.zk; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; @@ -27,10 +30,6 @@ import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Util class to help manage Zk connection and ZkClient. * It also provides additional utility methods for read/write/subscribe/unsubscribe access to the ZK tree. @@ -165,7 +164,7 @@ public class ZkUtils { /** * verify that given paths exist in ZK - * @param paths + * @param paths - paths to verify or create */ public void makeSurePersistentPathsExists(String[] paths) { for (String path : paths) { @@ -177,7 +176,7 @@ public class ZkUtils { /** * subscribe to the changes in the list of processors in ZK - * @param listener + * @param listener - will be called when a processor is added or removed. 
*/ public void subscribeToProcessorChange(IZkChildListener listener) { LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath()); http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala index 6e1134d..6c7641b 100644 --- a/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala +++ b/samza-core/src/main/scala/org/apache/samza/task/ReadableCoordinator.scala @@ -38,3 +38,4 @@ class ReadableCoordinator(val taskName: TaskName) extends TaskCoordinator { def requestedShutdownOnConsensus = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.CURRENT_TASK def requestedShutdownNow = shutdownRequest.isDefined && shutdownRequest.get == RequestScope.ALL_TASKS_IN_CONTAINER } + http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java new file mode 100644 index 0000000..92cb2c9 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.ArrayList; +import java.util.List; +import junit.framework.Assert; +import org.I0Itec.zkclient.ZkConnection; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + +public class TestZkBarrierForVersionUpgrade { + private static EmbeddedZookeeper zkServer = null; + private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); + private String testZkConnectionString = null; + private ZkUtils testZkUtils = null; + private static final int SESSION_TIMEOUT_MS = 20000; + private static final int CONNECTION_TIMEOUT_MS = 10000; + + @BeforeClass + public static void setup() throws InterruptedException { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + } + + @Before + public void testSetup() { + testZkConnectionString = "localhost:" + zkServer.getPort(); + try { + testZkUtils = getZkUtilsWithNewClient(); + } catch (Exception e) { + Assert.fail("Client connection setup failed. 
Aborting tests.."); + } + } + + @After + public void testTeardown() { + testZkUtils.deleteRoot(); + testZkUtils.close(); + testZkUtils = null; + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + @Test + public void testZkBarrierForVersionUpgrade() { + ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer); + String ver = "1"; + List<String> processors = new ArrayList<String>(); + processors.add("p1"); + processors.add("p2"); + + class Status { + boolean p1 = false; + boolean p2 = false; + } + final Status s = new Status(); + + barrier.startBarrier(ver, processors); + + barrier.waitForBarrier(ver, "p1", new Runnable() { + @Override + public void run() { + s.p1 = true; + } + }); + + barrier.waitForBarrier(ver, "p2", new Runnable() { + @Override + public void run() { + s.p2 = true; + } + }); + + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2, 2, 100)); + } + + @Test + public void testNegativeZkBarrierForVersionUpgrade() { + ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer); + String ver = "1"; + List<String> processors = new ArrayList<String>(); + processors.add("p1"); + processors.add("p2"); + processors.add("p3"); + + class Status { + boolean p1 = false; + boolean p2 = false; + boolean p3 = false; + } + final Status s = new Status(); + + barrier.startBarrier(ver, processors); + + barrier.waitForBarrier(ver, "p1", new Runnable() { + @Override + public void run() { + s.p1 = true; + } + }); + + barrier.waitForBarrier(ver, "p2", new Runnable() { + @Override + public void run() { + s.p2 = true; + } + }); + + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100)); + + } + + + private ZkUtils getZkUtilsWithNewClient() { + ZkConnection zkConnection 
= ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS); + return new ZkUtils( + KEY_BUILDER, + ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS), + CONNECTION_TIMEOUT_MS); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/4d7b3b35/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 6342fde..bfda464 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -18,6 +18,11 @@ */ package org.apache.samza.zk; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; @@ -30,12 +35,6 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when;
