Repository: samza
Updated Branches:
  refs/heads/master 1c27ba88a -> 1bb737a8d
SAMZA-1134:Simplify barrier for zk version upgrade. see SAMZA-1134 for details. Author: Boris Shkolnik <[email protected]> Author: Boris Shkolnik <[email protected]> Reviewers: Navina Ramesh <[email protected]> Closes #81 from sborya/SimplifyBarrierTO Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1bb737a8 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1bb737a8 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1bb737a8 Branch: refs/heads/master Commit: 1bb737a8d049a815927000766f85620f8aec657b Parents: 1c27ba8 Author: Boris Shkolnik <[email protected]> Authored: Thu Mar 23 14:25:53 2017 -0700 Committer: navina <[email protected]> Committed: Thu Mar 23 14:25:53 2017 -0700 ---------------------------------------------------------------------- .../samza/zk/BarrierForVersionUpgrade.java | 12 +-- .../samza/zk/ZkBarrierForVersionUpgrade.java | 90 ++++++++++---------- .../org/apache/samza/zk/ZkJobCoordinator.java | 9 +- .../zk/TestZkBarrierForVersionUpgrade.java | 40 +++++---- .../apache/samza/zk/TestZkLeaderElector.java | 4 +- 5 files changed, 78 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java index 2b785f0..553e730 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java @@ -19,9 +19,6 @@ package org.apache.samza.zk; -import java.util.List; - - /** * Interface for a barrier - to allow synchronization between different processors to switch to a newly published 
* JobModel. @@ -29,19 +26,16 @@ import java.util.List; public interface BarrierForVersionUpgrade { /** * Barrier is usually started by the leader. - * @param version - for which the barrier is started. - * @param processorsNames - list of processors available at the time of the JobModel generation. */ - void start(String version, List<String> processorsNames); + void start(); /** * Called by the processor. * Updates the processor readiness to use the new version and wait on the barrier, until all other processors * joined. * The call is async. The callback will be invoked when the barrier is reached. - * @param version of the jobModel this barrier is protecting. - * @param processorsName as it appears in the list of processors. + * @param thisProcessorsName as it appears in the list of processors. * @param callback will be invoked, when barrier is reached. */ - void waitForBarrier(String version, String processorsName, Runnable callback); + void waitForBarrier(String thisProcessorsName, Runnable callback); } http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index f7efa48..d0332ab 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -23,12 +23,21 @@ import java.util.Arrays; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.exception.ZkBadVersionException; import org.apache.commons.collections4.CollectionUtils; +import org.apache.samza.SamzaException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +/** + * This class creates a barrier for version upgrade. + * Barrier is started by the participant responsible for the upgrade. (start()) + * Each participant will mark its readiness and register for a notification when the barrier is reached. (waitFor()) + * If a timer (started in start()) goes off before the barrier is reached, all the participants will unsubscribe + * from the notification and the barrier becomes invalid. + */ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { private final ZkUtils zkUtils; private final ZkKeyBuilder keyBuilder; @@ -40,13 +49,28 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { private final ScheduleAfterDebounceTime debounceTimer; private final String barrierPrefix; - - public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer) { + private final String barrierPath; + private final String barrierDonePath; + private final String barrierProcessors; + private final String version; + private final List<String> processorsNames; + private static final String VERSION_UPGRADE_TIMEOUT_TIMER = "VersionUpgradeTimeout"; + + public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer, String version, List<String> processorsNames) { this.zkUtils = zkUtils; keyBuilder = zkUtils.getKeyBuilder(); barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix(); this.debounceTimer = debounceTimer; + + barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); + barrierDonePath = String.format("%s/barrier_done", barrierPath); + barrierProcessors = String.format("%s/barrier_processors", barrierPath); + + this.version = version; + this.processorsNames = processorsNames; + + zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath}); } /** @@ -54,60 +78,43 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { * 
@param version for which the barrier is created * @param timeout - time in ms to wait */ - public void setTimer(String version, long timeout) { - debounceTimer.scheduleAfterDebounceTime("VersionUpgradeTimeout", timeout, ()->timerOff(version)); + private void setTimer(final String version, final long timeout, final Stat currentStatOfBarrierDone) { + debounceTimer.scheduleAfterDebounceTime(VERSION_UPGRADE_TIMEOUT_TIMER, timeout, ()->timerOff(version, currentStatOfBarrierDone)); } protected long getBarrierTimeOutMs() { return BARRIER_TIMED_OUT_MS; } - private void timerOff(String version) { - // check if barrier has finished - final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); - final String barrierDonePath = String.format("%s/barrier_done", barrierPath); - Stat stat = new Stat(); - String done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat); - if (done != null && done.equals(BARRIER_DONE)) - return; //nothing to do - - while (true) { - try { - // write a new value if no one else did, if the value was changed since previous reading - retry - zkUtils.getZkClient().writeData(barrierDonePath, "TIMED_OUT", stat.getVersion()); - return; - } catch (Exception e) { - // failed to write, try read/write again - LOG.info("Barrier timeout write failed"); - done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat); - if (done.equals(BARRIER_DONE)) - return; //nothing to do + private void timerOff(final String version, final Stat currentStatOfBarrierDone) { + try { + // write a new value "TIMED_OUT", if the value was changed since previous value, make sure it was changed to "DONE" + zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, currentStatOfBarrierDone.getVersion()); + } catch (ZkBadVersionException e) { + // failed to write, make sure the value is "DONE" + LOG.warn("Barrier timeout write failed"); + String done = zkUtils.getZkClient().<String>readData(barrierDonePath); + if 
(!done.equals(BARRIER_DONE)) { + throw new SamzaException("Failed to write to the barrier_done, version=" + version, e); } } } @Override - public void start(String version, List<String> processorsNames) { - final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); - final String barrierDonePath = String.format("%s/barrier_done", barrierPath); - final String barrierProcessors = String.format("%s/barrier_processors", barrierPath); - - zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath}); + public void start() { // subscribe for processor's list changes LOG.info("Subscribing for child changes at " + barrierProcessors); - zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, - new ZkBarrierChangeHandler(version, processorsNames)); + zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new ZkBarrierChangeHandler(version, processorsNames)); - setTimer(version, getBarrierTimeOutMs()); + // create a timer for time-out + Stat currentStatOfBarrierDone = new Stat(); + zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone); + setTimer(version, getBarrierTimeOutMs(), currentStatOfBarrierDone); } @Override - public void waitForBarrier(String version, String processorsName, Runnable callback) { - // if participant makes this call it means it has already stopped the old container and got the new job model. 
- final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); - final String barrierDonePath = String.format("%s/barrier_done", barrierPath); - final String barrierProcessors = String.format("%s/barrier_processors", barrierPath); + public void waitForBarrier(String processorsName, Runnable callback) { final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName); // update the barrier for this processor @@ -137,15 +144,12 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { LOG.info("Got handleChildChange with null currentChildren"); return; } - LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray())); LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray())); // check if all the names are in if (CollectionUtils.containsAll(names, currentChildren)) { LOG.info("ALl nodes reached the barrier"); - final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version); - final String barrierDonePath = String.format("%s/barrier_done", barrierPath); LOG.info("Writing BARRIER DONE to " + barrierDonePath); zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE); } @@ -173,8 +177,8 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade { debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback); } else if (done.equals(BARRIER_TIMED_OUT)) { // timed out - LOG.error("Barrier for " + dataPath + " timed out"); - System.out.println("Barrier for " + dataPath + " timed out"); + LOG.warn("Barrier for " + dataPath + " timed out"); + LOG.info("Barrier for " + dataPath + " timed out"); zkUtils.unsubscribeDataChanges(barrierPathDone, this); } // we do not need to resubscribe because, ZkClient library does it for us. 
http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 944c438..9e5dd84 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -49,7 +49,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { private final int processorId; private final ZkController zkController; private final SamzaContainerController containerController; - private final BarrierForVersionUpgrade barrier; + private BarrierForVersionUpgrade barrier; private final ScheduleAfterDebounceTime debounceTimer; private final StreamMetadataCache streamMetadataCache; private final ZkKeyBuilder keyBuilder; @@ -68,8 +68,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this); this.config = config; - - barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer); streamMetadataCache = getStreamMetadataCache(); } @@ -154,7 +152,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { String zkProcessorId = keyBuilder.parseIdFromPath(currentPath); // update ZK and wait for all the processors to get this new version - barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() { + barrier.waitForBarrier(String.valueOf(zkProcessorId), new Runnable() { @Override public void run() { onNewJobModelConfirmed(version); @@ -210,7 +208,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel); // start the 
barrier for the job model update - barrier.start(nextJMVersion, currentProcessors); + barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer, nextJMVersion, currentProcessors); + barrier.start(); // publish new JobModel version zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion); http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index f26d4d0..a1af782 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -69,28 +69,29 @@ public class TestZkBarrierForVersionUpgrade { @Test public void testZkBarrierForVersionUpgrade() { ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); - ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer); String ver = "1"; List<String> processors = new ArrayList<String>(); processors.add("p1"); processors.add("p2"); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors); + class Status { boolean p1 = false; boolean p2 = false; } final Status s = new Status(); - barrier.start(ver, processors); + barrier.start(); - barrier.waitForBarrier(ver, "p1", new Runnable() { + barrier.waitForBarrier("p1", new Runnable() { @Override public void run() { s.p1 = true; } }); - barrier.waitForBarrier(ver, "p2", new Runnable() { + barrier.waitForBarrier("p2", new Runnable() { @Override public void run() { s.p2 = true; @@ -103,13 +104,14 @@ public class TestZkBarrierForVersionUpgrade { @Test public void testNegativeZkBarrierForVersionUpgrade() { 
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); - ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer); String ver = "1"; List<String> processors = new ArrayList<String>(); processors.add("p1"); processors.add("p2"); processors.add("p3"); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors); + class Status { boolean p1 = false; boolean p2 = false; @@ -117,16 +119,16 @@ public class TestZkBarrierForVersionUpgrade { } final Status s = new Status(); - barrier.start(ver, processors); + barrier.start(); - barrier.waitForBarrier(ver, "p1", new Runnable() { + barrier.waitForBarrier("p1", new Runnable() { @Override public void run() { s.p1 = true; } }); - barrier.waitForBarrier(ver, "p2", new Runnable() { + barrier.waitForBarrier("p2", new Runnable() { @Override public void run() { s.p2 = true; @@ -139,18 +141,20 @@ public class TestZkBarrierForVersionUpgrade { @Test public void testZkBarrierForVersionUpgradeWithTimeOut() { ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime(); - ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer) { - @Override - protected long getBarrierTimeOutMs() { - return 200; - } - }; + String ver = "1"; List<String> processors = new ArrayList<String>(); processors.add("p1"); processors.add("p2"); processors.add("p3"); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors) { + @Override + protected long getBarrierTimeOutMs() { + return 200; + } + }; + class Status { boolean p1 = false; boolean p2 = false; @@ -158,16 +162,16 @@ public class TestZkBarrierForVersionUpgrade { } final Status s = new Status(); - barrier.start(ver, processors); + barrier.start(); - barrier.waitForBarrier(ver, "p1", new Runnable() { + barrier.waitForBarrier("p1", new Runnable() { @Override public void run() { s.p1 = true; } 
}); - barrier.waitForBarrier(ver, "p2", new Runnable() { + barrier.waitForBarrier("p2", new Runnable() { @Override public void run() { s.p2 = true; @@ -175,7 +179,7 @@ public class TestZkBarrierForVersionUpgrade { }); // this node will join "too late" - barrier.waitForBarrier(ver, "p3", new Runnable() { + barrier.waitForBarrier("p3", new Runnable() { @Override public void run() { TestZkUtils.sleepMs(300); http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 7813da2..b48bc70 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -68,8 +68,8 @@ public class TestZkLeaderElector { // Do nothing } } - - public static class BooleanResult { + // used in the callbacks + private static class BooleanResult { public boolean res = false; }
