Repository: samza
Updated Branches:
  refs/heads/master 02153fa50 -> ebdb5b2f7


SAMZA-1723: Schedule barrier changes on the debounce thread

In existing implementation, `ZkBarrierChangeHandler` is executed from the 
`ZkEventThread` and has following drawbacks:
* `ZkWatch` events are buffered into a in-memory queue(maintained by ZkClient) 
and delivered one at a time to ZkClient listener implementations. If the 
exeuction of a delivered `ZkWatch` event is in progress, then no other 
`ZkWatch` event will be delivered to the listeners. If `ZkBarrierChangeHandler` 
is executed from `ZkEventThread`,  any increase in processing latency will 
delay the delivery of other `ZkWatch` events(buffered in in-memory queue of 
ZkClient).
* During session expiration(zkConnection error scenario), buffering all events 
into `ScheduleAfterDebounceTime` helps us to garbage collect older generation 
events(to ensure correctness and not execute older generation `ZkWatch` events).

Author: Shanthoosh Venkataraman <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #526 from shanthoosh/schedule_barrier_change_in_debounce_thread


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebdb5b2f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebdb5b2f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebdb5b2f

Branch: refs/heads/master
Commit: ebdb5b2f712250f1f3aba92aa2fd033aded690e9
Parents: 02153fa
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Mon May 21 11:15:51 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Mon May 21 11:15:51 2018 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 32 ++++++++++++--------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  5 +--
 .../zk/TestZkBarrierForVersionUpgrade.java      | 20 ++++++------
 .../java/org/apache/samza/zk/TestZkUtils.java   |  2 +-
 4 files changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 63f9120..a2ed823 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -75,10 +75,11 @@ import java.util.Optional;
  *  |   |   |-  ...
  */
 public class ZkBarrierForVersionUpgrade {
-  private final static Logger LOG = 
LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
   private final ZkUtils zkUtils;
   private final BarrierKeyBuilder keyBuilder;
   private final Optional<ZkBarrierListener> barrierListenerOptional;
+  private final ScheduleAfterDebounceTime debounceTimer;
 
   public enum State {
     NEW("NEW"), TIMED_OUT("TIMED_OUT"), DONE("DONE");
@@ -95,13 +96,14 @@ public class ZkBarrierForVersionUpgrade {
     }
   }
 
-  public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, 
ZkBarrierListener barrierListener) {
+  public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, 
ZkBarrierListener barrierListener, ScheduleAfterDebounceTime debounceTimer) {
     if (zkUtils == null) {
       throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade 
without ZkUtils.");
     }
     this.zkUtils = zkUtils;
     this.keyBuilder = new BarrierKeyBuilder(barrierRoot);
     this.barrierListenerOptional = Optional.ofNullable(barrierListener);
+    this.debounceTimer = debounceTimer;
   }
 
   /**
@@ -167,11 +169,13 @@ public class ZkBarrierForVersionUpgrade {
    * node. It checks to see when the barrier is ready to be marked as 
completed.
    */
   class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener {
+    private static final String ACTION_NAME = "ZkBarrierChangeHandler";
+
     private final String barrierVersion;
     private final List<String> expectedParticipantIds;
 
     public ZkBarrierChangeHandler(String barrierVersion, List<String> 
expectedParticipantIds, ZkUtils zkUtils) {
-      super(zkUtils, "ZkBarrierChangeHandler");
+      super(zkUtils, ACTION_NAME);
       this.barrierVersion = barrierVersion;
       this.expectedParticipantIds = expectedParticipantIds;
     }
@@ -190,16 +194,18 @@ public class ZkBarrierForVersionUpgrade {
 
       // check if all the expected participants are in
       if (participantIds.size() == expectedParticipantIds.size() && 
CollectionUtils.containsAll(participantIds, expectedParticipantIds)) {
-        String barrierStatePath = 
keyBuilder.getBarrierStatePath(barrierVersion);
-        State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
-        if (Objects.equals(barrierState, State.NEW)) {
-          LOG.info(String.format("Expected participants has joined the barrier 
version: %s. Marking the barrier state: %s as %s.", barrierVersion, 
barrierStatePath, State.DONE));
-          zkUtils.writeData(barrierStatePath, State.DONE); // this will 
trigger notifications
-        } else {
-          LOG.debug(String.format("Barrier version: %s is at: %s state. Not 
marking barrier as %s.", barrierVersion, barrierState, State.DONE));
-        }
-        LOG.info("Unsubscribing child changes on the path: {} for barrier 
version: {}.", barrierParticipantPath, barrierVersion);
-        zkUtils.unsubscribeChildChanges(barrierParticipantPath, this);
+        debounceTimer.scheduleAfterDebounceTime(ACTION_NAME, 0, () -> {
+            String barrierStatePath = 
keyBuilder.getBarrierStatePath(barrierVersion);
+            State barrierState = 
zkUtils.getZkClient().readData(barrierStatePath);
+            if (Objects.equals(barrierState, State.NEW)) {
+              LOG.info(String.format("Expected participants has joined the 
barrier version: %s. Marking the barrier state: %s as %s.", barrierVersion, 
barrierStatePath, State.DONE));
+              zkUtils.writeData(barrierStatePath, State.DONE); // this will 
trigger notifications
+            } else {
+              LOG.debug(String.format("Barrier version: %s is at: %s state. 
Not marking barrier as %s.", barrierVersion, barrierState, State.DONE));
+            }
+            LOG.info("Unsubscribing child changes on the path: {} for barrier 
version: {}.", barrierParticipantPath, barrierVersion);
+            zkUtils.unsubscribeChildChanges(barrierParticipantPath, this);
+          });
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 74abf55..23fb3b0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -118,10 +118,6 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
     leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, 
leaderElector);
-    this.barrier =  new ZkBarrierForVersionUpgrade(
-        zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(),
-        zkUtils,
-        new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new 
MetricsConfig(config), processorId);
     debounceTimer = new ScheduleAfterDebounceTime(processorId);
@@ -129,6 +125,7 @@ public class ZkJobCoordinator implements JobCoordinator, 
ZkControllerListener {
         LOG.error("Received exception in debounce timer! Stopping the job 
coordinator", throwable);
         stop();
       });
+    this.barrier =  new 
ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(),
 zkUtils, new ZkBarrierListenerImpl(), debounceTimer);
     systemAdmins = new SystemAdmins(config);
     streamMetadataCache = new StreamMetadataCache(systemAdmins, 
METADATA_CACHE_TTL_MS, SystemClock.instance());
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 011794d..bd84b57 100644
--- 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -37,10 +37,10 @@ import org.junit.Test;
 
 import static junit.framework.Assert.*;
 
-
-// TODO: Rename this such that it is clear that it is an integration test and 
NOT unit test
 public class TestZkBarrierForVersionUpgrade {
   private static final String BARRIER_VERSION = "1";
+
+  private final ScheduleAfterDebounceTime debounceTimer = new 
ScheduleAfterDebounceTime("TEST_PROCESSOR_ID");
   private static EmbeddedZookeeper zkServer = null;
   private static String testZkConnectionString = null;
   private ZkUtils zkUtils;
@@ -105,8 +105,8 @@ public class TestZkBarrierForVersionUpgrade {
     CountDownLatch latch = new CountDownLatch(2);
     TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.DONE);
 
-    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
-    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 
     processor1Barrier.create(BARRIER_VERSION, processors);
 
@@ -140,8 +140,8 @@ public class TestZkBarrierForVersionUpgrade {
     CountDownLatch latch = new CountDownLatch(2);
     TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.TIMED_OUT);
 
-    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
-    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 
     processor1Barrier.create(BARRIER_VERSION, processors);
 
@@ -172,8 +172,8 @@ public class TestZkBarrierForVersionUpgrade {
 
     CountDownLatch latch = new CountDownLatch(2);
     TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.DONE);
-    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
-    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 
     processor1Barrier.create(BARRIER_VERSION, processors);
 
@@ -201,8 +201,8 @@ public class TestZkBarrierForVersionUpgrade {
 
     CountDownLatch latch = new CountDownLatch(2);
     TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.TIMED_OUT);
-    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
-    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer);
 
     processor1Barrier.create(BARRIER_VERSION, processors);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index ee523aa..1d6ff86 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -330,7 +330,7 @@ public class TestZkUtils {
   public void testCleanUpZkBarrierVersion() {
     String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix();
     zkUtils.getZkClient().createPersistent(root, true);
-    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, 
zkUtils, null);
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, 
zkUtils, null, null);
     for (int i = 200; i < 210; i++) {
       barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", 
i + "b", i + "c")));
     }

Reply via email to