Repository: samza Updated Branches: refs/heads/master 02153fa50 -> ebdb5b2f7
SAMZA-1723: Schedule barrier changes on the debounce thread In the existing implementation, `ZkBarrierChangeHandler` is executed from the `ZkEventThread` and has the following drawbacks: * `ZkWatch` events are buffered into an in-memory queue (maintained by ZkClient) and delivered one at a time to ZkClient listener implementations. If the execution of a delivered `ZkWatch` event is in progress, then no other `ZkWatch` event will be delivered to the listeners. If `ZkBarrierChangeHandler` is executed from `ZkEventThread`, any increase in processing latency will delay the delivery of other `ZkWatch` events (buffered in the in-memory queue of ZkClient). * During session expiration (zkConnection error scenario), buffering all events into `ScheduleAfterDebounceTime` helps us to garbage collect older generation events (to ensure correctness and not execute older generation `ZkWatch` events). Author: Shanthoosh Venkataraman <[email protected]> Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish <[email protected]> Closes #526 from shanthoosh/schedule_barrier_change_in_debounce_thread Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/ebdb5b2f Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/ebdb5b2f Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/ebdb5b2f Branch: refs/heads/master Commit: ebdb5b2f712250f1f3aba92aa2fd033aded690e9 Parents: 02153fa Author: Shanthoosh Venkataraman <[email protected]> Authored: Mon May 21 11:15:51 2018 -0700 Committer: Jagadish <[email protected]> Committed: Mon May 21 11:15:51 2018 -0700 ---------------------------------------------------------------------- .../samza/zk/ZkBarrierForVersionUpgrade.java | 32 ++++++++++++-------- .../org/apache/samza/zk/ZkJobCoordinator.java | 5 +-- .../zk/TestZkBarrierForVersionUpgrade.java | 20 ++++++------ .../java/org/apache/samza/zk/TestZkUtils.java | 2 +- 4 files changed, 31 insertions(+), 28 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index 63f9120..a2ed823 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -75,10 +75,11 @@ import java.util.Optional; * | | |- ... */ public class ZkBarrierForVersionUpgrade { - private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class); + private static final Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class); private final ZkUtils zkUtils; private final BarrierKeyBuilder keyBuilder; private final Optional<ZkBarrierListener> barrierListenerOptional; + private final ScheduleAfterDebounceTime debounceTimer; public enum State { NEW("NEW"), TIMED_OUT("TIMED_OUT"), DONE("DONE"); @@ -95,13 +96,14 @@ public class ZkBarrierForVersionUpgrade { } } - public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, ZkBarrierListener barrierListener) { + public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, ZkBarrierListener barrierListener, ScheduleAfterDebounceTime debounceTimer) { if (zkUtils == null) { throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade without ZkUtils."); } this.zkUtils = zkUtils; this.keyBuilder = new BarrierKeyBuilder(barrierRoot); this.barrierListenerOptional = Optional.ofNullable(barrierListener); + this.debounceTimer = debounceTimer; } /** @@ -167,11 +169,13 @@ public class ZkBarrierForVersionUpgrade { * node. It checks to see when the barrier is ready to be marked as completed. 
*/ class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener { + private static final String ACTION_NAME = "ZkBarrierChangeHandler"; + private final String barrierVersion; private final List<String> expectedParticipantIds; public ZkBarrierChangeHandler(String barrierVersion, List<String> expectedParticipantIds, ZkUtils zkUtils) { - super(zkUtils, "ZkBarrierChangeHandler"); + super(zkUtils, ACTION_NAME); this.barrierVersion = barrierVersion; this.expectedParticipantIds = expectedParticipantIds; } @@ -190,16 +194,18 @@ public class ZkBarrierForVersionUpgrade { // check if all the expected participants are in if (participantIds.size() == expectedParticipantIds.size() && CollectionUtils.containsAll(participantIds, expectedParticipantIds)) { - String barrierStatePath = keyBuilder.getBarrierStatePath(barrierVersion); - State barrierState = zkUtils.getZkClient().readData(barrierStatePath); - if (Objects.equals(barrierState, State.NEW)) { - LOG.info(String.format("Expected participants has joined the barrier version: %s. Marking the barrier state: %s as %s.", barrierVersion, barrierStatePath, State.DONE)); - zkUtils.writeData(barrierStatePath, State.DONE); // this will trigger notifications - } else { - LOG.debug(String.format("Barrier version: %s is at: %s state. Not marking barrier as %s.", barrierVersion, barrierState, State.DONE)); - } - LOG.info("Unsubscribing child changes on the path: {} for barrier version: {}.", barrierParticipantPath, barrierVersion); - zkUtils.unsubscribeChildChanges(barrierParticipantPath, this); + debounceTimer.scheduleAfterDebounceTime(ACTION_NAME, 0, () -> { + String barrierStatePath = keyBuilder.getBarrierStatePath(barrierVersion); + State barrierState = zkUtils.getZkClient().readData(barrierStatePath); + if (Objects.equals(barrierState, State.NEW)) { + LOG.info(String.format("Expected participants has joined the barrier version: %s. 
Marking the barrier state: %s as %s.", barrierVersion, barrierStatePath, State.DONE)); + zkUtils.writeData(barrierStatePath, State.DONE); // this will trigger notifications + } else { + LOG.debug(String.format("Barrier version: %s is at: %s state. Not marking barrier as %s.", barrierVersion, barrierState, State.DONE)); + } + LOG.info("Unsubscribing child changes on the path: {} for barrier version: {}.", barrierParticipantPath, barrierVersion); + zkUtils.unsubscribeChildChanges(barrierParticipantPath, this); + }); } } } http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java index 74abf55..23fb3b0 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java @@ -118,10 +118,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { leaderElector = new ZkLeaderElector(processorId, zkUtils); leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl()); this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector); - this.barrier = new ZkBarrierForVersionUpgrade( - zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), - zkUtils, - new ZkBarrierListenerImpl()); this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs(); this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId); debounceTimer = new ScheduleAfterDebounceTime(processorId); @@ -129,6 +125,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener { LOG.error("Received exception in debounce timer! 
Stopping the job coordinator", throwable); stop(); }); + this.barrier = new ZkBarrierForVersionUpgrade(zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(), zkUtils, new ZkBarrierListenerImpl(), debounceTimer); systemAdmins = new SystemAdmins(config); streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance()); } http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index 011794d..bd84b57 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -37,10 +37,10 @@ import org.junit.Test; import static junit.framework.Assert.*; - -// TODO: Rename this such that it is clear that it is an integration test and NOT unit test public class TestZkBarrierForVersionUpgrade { private static final String BARRIER_VERSION = "1"; + + private final ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime("TEST_PROCESSOR_ID"); private static EmbeddedZookeeper zkServer = null; private static String testZkConnectionString = null; private ZkUtils zkUtils; @@ -105,8 +105,8 @@ public class TestZkBarrierForVersionUpgrade { CountDownLatch latch = new CountDownLatch(2); TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.DONE); - ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); - ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, 
debounceTimer); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer); processor1Barrier.create(BARRIER_VERSION, processors); @@ -140,8 +140,8 @@ public class TestZkBarrierForVersionUpgrade { CountDownLatch latch = new CountDownLatch(2); TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.TIMED_OUT); - ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); - ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer); processor1Barrier.create(BARRIER_VERSION, processors); @@ -172,8 +172,8 @@ public class TestZkBarrierForVersionUpgrade { CountDownLatch latch = new CountDownLatch(2); TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.DONE); - ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); - ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer); processor1Barrier.create(BARRIER_VERSION, processors); @@ -201,8 +201,8 @@ public class TestZkBarrierForVersionUpgrade { CountDownLatch latch = new CountDownLatch(2); TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.TIMED_OUT); - ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); - ZkBarrierForVersionUpgrade 
processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener, debounceTimer); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener, debounceTimer); processor1Barrier.create(BARRIER_VERSION, processors); http://git-wip-us.apache.org/repos/asf/samza/blob/ebdb5b2f/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index ee523aa..1d6ff86 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -330,7 +330,7 @@ public class TestZkUtils { public void testCleanUpZkBarrierVersion() { String root = zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(); zkUtils.getZkClient().createPersistent(root, true); - ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null); + ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(root, zkUtils, null, null); for (int i = 200; i < 210; i++) { barrier.create(String.valueOf(i), new ArrayList<>(Arrays.asList(i + "a", i + "b", i + "c"))); }
