Repository: samza
Updated Branches:
  refs/heads/master 1c27ba88a -> 1bb737a8d


SAMZA-1134: Simplify barrier for zk version upgrade.

See SAMZA-1134 for details.

Author: Boris Shkolnik <[email protected]>
Author: Boris Shkolnik <[email protected]>

Reviewers: Navina Ramesh <[email protected]>

Closes #81 from sborya/SimplifyBarrierTO


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1bb737a8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1bb737a8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1bb737a8

Branch: refs/heads/master
Commit: 1bb737a8d049a815927000766f85620f8aec657b
Parents: 1c27ba8
Author: Boris Shkolnik <[email protected]>
Authored: Thu Mar 23 14:25:53 2017 -0700
Committer: navina <[email protected]>
Committed: Thu Mar 23 14:25:53 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/BarrierForVersionUpgrade.java      | 12 +--
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 90 ++++++++++----------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  9 +-
 .../zk/TestZkBarrierForVersionUpgrade.java      | 40 +++++----
 .../apache/samza/zk/TestZkLeaderElector.java    |  4 +-
 5 files changed, 78 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
index 2b785f0..553e730 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -19,9 +19,6 @@
 
 package org.apache.samza.zk;
 
-import java.util.List;
-
-
 /**
  * Interface for a barrier - to allow synchronization between different 
processors to switch to a newly published
  * JobModel.
@@ -29,19 +26,16 @@ import java.util.List;
 public interface BarrierForVersionUpgrade {
   /**
    * Barrier is usually started by the leader.
-   * @param version - for which the barrier is started.
-   * @param processorsNames - list of processors available at the time of the 
JobModel generation.
    */
-  void start(String version, List<String> processorsNames);
+  void start();
 
   /**
    * Called by the processor.
    * Updates the processor readiness to use the new version and wait on the 
barrier, until all other processors
    * joined.
    * The call is async. The callback will be invoked when the barrier is 
reached.
-   * @param version of the jobModel this barrier is protecting.
-   * @param processorsName as it appears in the list of processors.
+   * @param thisProcessorsName as it appears in the list of processors.
    * @param callback  will be invoked, when barrier is reached.
    */
-  void waitForBarrier(String version, String processorsName, Runnable 
callback);
+  void waitForBarrier(String thisProcessorsName, Runnable callback);
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index f7efa48..d0332ab 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -23,12 +23,21 @@ import java.util.Arrays;
 import java.util.List;
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkBadVersionException;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.samza.SamzaException;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
+/**
+ * This class creates a barrier for version upgrade.
+ * Barrier is started by the participant responsible for the upgrade. (start())
+ * Each participant will mark its readiness and register for a notification 
when the barrier is reached. (waitForBarrier())
+ * If a timer (started in start()) goes off before the barrier is reached, all 
the participants will unsubscribe
+ * from the notification and the barrier becomes invalid.
+ */
 public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
   private final ZkUtils zkUtils;
   private final ZkKeyBuilder keyBuilder;
@@ -40,13 +49,28 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
   private final ScheduleAfterDebounceTime debounceTimer;
 
   private final String barrierPrefix;
-
-  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime 
debounceTimer) {
+  private final String barrierPath;
+  private final String barrierDonePath;
+  private final String barrierProcessors;
+  private final String version;
+  private final List<String> processorsNames;
+  private static final String VERSION_UPGRADE_TIMEOUT_TIMER = 
"VersionUpgradeTimeout";
+
+  public ZkBarrierForVersionUpgrade(ZkUtils zkUtils, ScheduleAfterDebounceTime 
debounceTimer, String version, List<String> processorsNames) {
     this.zkUtils = zkUtils;
     keyBuilder = zkUtils.getKeyBuilder();
 
     barrierPrefix = keyBuilder.getJobModelVersionBarrierPrefix();
     this.debounceTimer = debounceTimer;
+
+    barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+
+    this.version = version;
+    this.processorsNames = processorsNames;
+
+    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, 
barrierPath, barrierProcessors, barrierDonePath});
   }
 
   /**
@@ -54,60 +78,43 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
    * @param version for which the barrier is created
    * @param timeout - time in ms to wait
    */
-  public void setTimer(String version, long timeout) {
-    debounceTimer.scheduleAfterDebounceTime("VersionUpgradeTimeout", timeout, 
()->timerOff(version));
+  private void setTimer(final String version, final long timeout, final Stat 
currentStatOfBarrierDone) {
+    debounceTimer.scheduleAfterDebounceTime(VERSION_UPGRADE_TIMEOUT_TIMER, 
timeout, ()->timerOff(version, currentStatOfBarrierDone));
   }
 
   protected long getBarrierTimeOutMs() {
     return BARRIER_TIMED_OUT_MS;
   }
 
-  private void timerOff(String version) {
-    // check if barrier has finished
-    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, 
version);
-    final String barrierDonePath = String.format("%s/barrier_done", 
barrierPath);
-    Stat stat = new Stat();
-    String done = zkUtils.getZkClient().<String>readData(barrierDonePath, 
stat);
-    if (done != null && done.equals(BARRIER_DONE))
-      return; //nothing to do
-
-    while (true) {
-      try {
-        // write a new value if no one else did, if the value was changed 
since previous reading - retry
-        zkUtils.getZkClient().writeData(barrierDonePath, "TIMED_OUT", 
stat.getVersion());
-        return;
-      } catch (Exception e) {
-        // failed to write, try read/write again
-        LOG.info("Barrier timeout write failed");
-        done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
-        if (done.equals(BARRIER_DONE))
-          return; //nothing to do
+  private void timerOff(final String version, final Stat 
currentStatOfBarrierDone) {
+    try {
+      // write a new value "TIMED_OUT", if the value was changed since 
previous value, make sure it was changed to "DONE"
+      zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_TIMED_OUT, 
currentStatOfBarrierDone.getVersion());
+    } catch (ZkBadVersionException e) {
+      // failed to write, make sure the value is "DONE"
+      LOG.warn("Barrier timeout write failed");
+      String done = zkUtils.getZkClient().<String>readData(barrierDonePath);
+      if (!done.equals(BARRIER_DONE)) {
+        throw new SamzaException("Failed to write to the barrier_done, 
version=" + version, e);
       }
     }
   }
 
   @Override
-  public void start(String version, List<String> processorsNames) {
-    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, 
version);
-    final String barrierDonePath = String.format("%s/barrier_done", 
barrierPath);
-    final String barrierProcessors = String.format("%s/barrier_processors", 
barrierPath);
-
-    zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, 
barrierPath, barrierProcessors, barrierDonePath});
+  public void start() {
 
     // subscribe for processor's list changes
     LOG.info("Subscribing for child changes at " + barrierProcessors);
-    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors,
-        new ZkBarrierChangeHandler(version, processorsNames));
+    zkUtils.getZkClient().subscribeChildChanges(barrierProcessors, new 
ZkBarrierChangeHandler(version, processorsNames));
 
-    setTimer(version, getBarrierTimeOutMs());
+    // create a timer for time-out
+    Stat currentStatOfBarrierDone = new Stat();
+    zkUtils.getZkClient().readData(barrierDonePath, currentStatOfBarrierDone);
+    setTimer(version, getBarrierTimeOutMs(), currentStatOfBarrierDone);
   }
 
   @Override
-  public void waitForBarrier(String version, String processorsName, Runnable 
callback) {
-    // if participant makes this call it means it has already stopped the old 
container and got the new job model.
-    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, 
version);
-    final String barrierDonePath = String.format("%s/barrier_done", 
barrierPath);
-    final String barrierProcessors = String.format("%s/barrier_processors", 
barrierPath);
+  public void waitForBarrier(String processorsName, Runnable callback) {
     final String barrierProcessorThis = String.format("%s/%s", 
barrierProcessors, processorsName);
 
     // update the barrier for this processor
@@ -137,15 +144,12 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
         LOG.info("Got handleChildChange with null currentChildren");
         return;
       }
-
       LOG.info("list of children in the barrier = " + parentPath + ":" + 
Arrays.toString(currentChildren.toArray()));
       LOG.info("list of children to compare against = " + parentPath + ":" + 
Arrays.toString(names.toArray()));
 
       // check if all the names are in
       if (CollectionUtils.containsAll(names, currentChildren)) {
         LOG.info("ALl nodes reached the barrier");
-        final String barrierPath = String.format("%s/barrier_%s", 
barrierPrefix, version);
-        final String barrierDonePath = String.format("%s/barrier_done", 
barrierPath);
         LOG.info("Writing BARRIER DONE to " + barrierDonePath);
         zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
       }
@@ -173,8 +177,8 @@ public class ZkBarrierForVersionUpgrade implements 
BarrierForVersionUpgrade {
         
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE,
 0, callback);
       } else if (done.equals(BARRIER_TIMED_OUT)) {
         // timed out
-        LOG.error("Barrier for " + dataPath + " timed out");
-        System.out.println("Barrier for " + dataPath + " timed out");
+        LOG.warn("Barrier for " + dataPath + " timed out");
+        LOG.info("Barrier for " + dataPath + " timed out");
         zkUtils.unsubscribeDataChanges(barrierPathDone, this);
       }
       // we do not need to resubscribe because, ZkClient library does it for 
us.

http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 944c438..9e5dd84 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -49,7 +49,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
   private final int processorId;
   private final ZkController zkController;
   private final SamzaContainerController containerController;
-  private final BarrierForVersionUpgrade barrier;
+  private BarrierForVersionUpgrade barrier;
   private final ScheduleAfterDebounceTime debounceTimer;
   private final StreamMetadataCache  streamMetadataCache;
   private final ZkKeyBuilder keyBuilder;
@@ -68,8 +68,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     this.zkController = new ZkControllerImpl(String.valueOf(processorId), 
zkUtils, debounceTimer, this);
     this.config = config;
 
-
-    barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer);
     streamMetadataCache = getStreamMetadataCache();
   }
 
@@ -154,7 +152,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
 
     // update ZK and wait for all the processors to get this new version
-    barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new 
Runnable() {
+    barrier.waitForBarrier(String.valueOf(zkProcessorId), new Runnable() {
       @Override
       public void run() {
         onNewJobModelConfirmed(version);
@@ -210,7 +208,8 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     log.info("pid=" + processorId + "published new JobModel ver=" + 
nextJMVersion + ";jm=" + jobModel);
 
     // start the barrier for the job model update
-    barrier.start(nextJMVersion, currentProcessors);
+    barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer, 
nextJMVersion, currentProcessors);
+    barrier.start();
 
     // publish new JobModel version
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);

http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index f26d4d0..a1af782 100644
--- 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -69,28 +69,29 @@ public class TestZkBarrierForVersionUpgrade {
   @Test
   public void testZkBarrierForVersionUpgrade() {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-    ZkBarrierForVersionUpgrade barrier = new 
ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
     String ver = "1";
     List<String> processors = new ArrayList<String>();
     processors.add("p1");
     processors.add("p2");
 
+    ZkBarrierForVersionUpgrade barrier = new 
ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors);
+
     class Status {
       boolean p1 = false;
       boolean p2 = false;
     }
     final Status s = new Status();
 
-    barrier.start(ver, processors);
+    barrier.start();
 
-    barrier.waitForBarrier(ver, "p1", new Runnable() {
+    barrier.waitForBarrier("p1", new Runnable() {
       @Override
       public void run() {
         s.p1 = true;
       }
     });
 
-    barrier.waitForBarrier(ver, "p2", new Runnable() {
+    barrier.waitForBarrier("p2", new Runnable() {
       @Override
       public void run() {
         s.p2 = true;
@@ -103,13 +104,14 @@ public class TestZkBarrierForVersionUpgrade {
   @Test
   public void testNegativeZkBarrierForVersionUpgrade() {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-    ZkBarrierForVersionUpgrade barrier = new 
ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer);
     String ver = "1";
     List<String> processors = new ArrayList<String>();
     processors.add("p1");
     processors.add("p2");
     processors.add("p3");
 
+    ZkBarrierForVersionUpgrade barrier = new 
ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors);
+
     class Status {
       boolean p1 = false;
       boolean p2 = false;
@@ -117,16 +119,16 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.start(ver, processors);
+    barrier.start();
 
-    barrier.waitForBarrier(ver, "p1", new Runnable() {
+    barrier.waitForBarrier("p1", new Runnable() {
       @Override
       public void run() {
         s.p1 = true;
       }
     });
 
-    barrier.waitForBarrier(ver, "p2", new Runnable() {
+    barrier.waitForBarrier("p2", new Runnable() {
       @Override
       public void run() {
         s.p2 = true;
@@ -139,18 +141,20 @@ public class TestZkBarrierForVersionUpgrade {
   @Test
   public void testZkBarrierForVersionUpgradeWithTimeOut() {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-    ZkBarrierForVersionUpgrade barrier = new 
ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer) {
-      @Override
-      protected long getBarrierTimeOutMs() {
-        return 200;
-      }
-    };
+
     String ver = "1";
     List<String> processors = new ArrayList<String>();
     processors.add("p1");
     processors.add("p2");
     processors.add("p3");
 
+    ZkBarrierForVersionUpgrade barrier = new 
ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer, ver, processors) {
+      @Override
+      protected long getBarrierTimeOutMs() {
+        return 200;
+      }
+    };
+
     class Status {
       boolean p1 = false;
       boolean p2 = false;
@@ -158,16 +162,16 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.start(ver, processors);
+    barrier.start();
 
-    barrier.waitForBarrier(ver, "p1", new Runnable() {
+    barrier.waitForBarrier("p1", new Runnable() {
       @Override
       public void run() {
         s.p1 = true;
       }
     });
 
-    barrier.waitForBarrier(ver, "p2", new Runnable() {
+    barrier.waitForBarrier("p2", new Runnable() {
       @Override
       public void run() {
         s.p2 = true;
@@ -175,7 +179,7 @@ public class TestZkBarrierForVersionUpgrade {
     });
 
     // this node will join "too late"
-    barrier.waitForBarrier(ver, "p3", new Runnable() {
+    barrier.waitForBarrier("p3", new Runnable() {
       @Override
       public void run() {
         TestZkUtils.sleepMs(300);

http://git-wip-us.apache.org/repos/asf/samza/blob/1bb737a8/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 7813da2..b48bc70 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -68,8 +68,8 @@ public class TestZkLeaderElector {
       // Do nothing
     }
   }
-
-  public static class BooleanResult {
+  // used in the callbacks
+  private static class BooleanResult {
     public boolean res = false;
   }
 

Reply via email to