Repository: samza Updated Branches: refs/heads/master cf4872e2a -> 603af35a5
SAMZA-1689: Add validations before state transitions in ZkBarrierForVersionUpgrade. Prevent invalid state updates on barrier. * Introduced an additional barrier state NEW. * Add state validations before updating the barrier. * Fix existing TestZkBarrier tests that are disabled and add new tests to verify the intended behavior. Author: Shanthoosh Venkataraman <[email protected]> Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish <[email protected]> Closes #490 from shanthoosh/fix_barrier_state_transitions Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/603af35a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/603af35a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/603af35a Branch: refs/heads/master Commit: 603af35a526a1b241067a5a3d10d8d49d7ac3d58 Parents: cf4872e Author: Shanthoosh Venkataraman <[email protected]> Authored: Fri Apr 27 18:55:15 2018 -0700 Committer: Jagadish <[email protected]> Committed: Fri Apr 27 18:55:15 2018 -0700 ---------------------------------------------------------------------- .../samza/zk/ZkBarrierForVersionUpgrade.java | 113 ++++++-- .../zk/TestZkBarrierForVersionUpgrade.java | 272 ++++++++++--------- 2 files changed, 222 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/603af35a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java index abea299..63f9120 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java @@ -19,11 +19,12 @@ package 
org.apache.samza.zk; +import com.google.common.collect.ImmutableList; +import java.util.Objects; import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -41,6 +42,25 @@ import java.util.Optional; * with value {@link org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#TIMED_OUT} and indicates to everyone that it * is no longer valid. * + * + * Describes the lifecycle of a barrier. + * <pre> + * + * When expected participants join + * Leader ---< NEW ---------------------------------------- < DONE + * | barrier within barrierTimeOut. + * | + * | + * | + * | + * | + * | When expected participants doesn't + * | ----------------------------------------- < TIMED_OUT + * join barrier within barrierTimeOut. + * + * </pre> + * + * * The caller can listen to events associated with the barrier by registering a {@link ZkBarrierListener}. * * Zk Tree Reference: @@ -48,7 +68,7 @@ import java.util.Optional; * | * |- barrier_{version1}/ * | |- barrier_state/ - * | | ([DONE|TIMED_OUT]) + * | | ([NEW|DONE|TIMED_OUT]) * | |- barrier_participants/ * | | |- {id1} * | | |- {id2} @@ -61,9 +81,19 @@ public class ZkBarrierForVersionUpgrade { private final Optional<ZkBarrierListener> barrierListenerOptional; public enum State { - TIMED_OUT, DONE - } + NEW("NEW"), TIMED_OUT("TIMED_OUT"), DONE("DONE"); + + private String str; + State(String str) { + this.str = str; + } + + @Override + public String toString() { + return str; + } + } public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, ZkBarrierListener barrierListener) { if (zkUtils == null) { @@ -81,16 +111,19 @@ public class ZkBarrierForVersionUpgrade { * @param participants List of expected participated for this barrier to complete */ public void create(final String version, List<String> participants) { + LOG.info(String.format("Creating barrier with version: %s, participants: %s.", 
version, participants)); String barrierRoot = keyBuilder.getBarrierRoot(); String barrierParticipantsPath = keyBuilder.getBarrierParticipantsPath(version); + String barrierStatePath = keyBuilder.getBarrierStatePath(version); zkUtils.validatePaths(new String[]{ barrierRoot, keyBuilder.getBarrierPath(version), barrierParticipantsPath, - keyBuilder.getBarrierStatePath(version)}); + barrierStatePath}); + LOG.info("Marking the barrier state: {} as {}.", barrierStatePath, State.NEW); + zkUtils.writeData(barrierStatePath, State.NEW); - // subscribe for participant's list changes - LOG.info("Subscribing for child changes at " + barrierParticipantsPath); + LOG.info("Subscribing child changes on the path: {} for barrier version: {}.", barrierParticipantsPath, version); zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants, zkUtils)); barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version)); @@ -103,8 +136,10 @@ public class ZkBarrierForVersionUpgrade { * @param participantId Identifier of the participant */ public void join(String version, String participantId) { - String barrierDonePath = keyBuilder.getBarrierStatePath(version); - zkUtils.subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version, zkUtils)); + LOG.info("Joining the barrier version: {} as participant: {}.", version, participantId); + String barrierStatePath = keyBuilder.getBarrierStatePath(version); + LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", barrierStatePath, version); + zkUtils.subscribeDataChanges(barrierStatePath, new ZkBarrierReachedHandler(barrierStatePath, version, zkUtils)); // TODO: Handle ZkNodeExistsException - SAMZA-1304 zkUtils.getZkClient().createPersistent( @@ -117,41 +152,54 @@ public class ZkBarrierForVersionUpgrade { * @param version Version associated with the Barrier */ public void expire(String version) { - 
zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT); - + String barrierStatePath = keyBuilder.getBarrierStatePath(version); + State barrierState = zkUtils.getZkClient().readData(barrierStatePath); + if (Objects.equals(barrierState, State.NEW)) { + LOG.info(String.format("Expiring the barrier version: %s. Marking the barrier state: %s as %s.", version, barrierStatePath, State.TIMED_OUT)); + zkUtils.writeData(keyBuilder.getBarrierStatePath(version), State.TIMED_OUT); + } else { + LOG.debug(String.format("Barrier version: %s is at: %s state. Not marking barrier as %s.", version, barrierState, State.TIMED_OUT)); + } } + /** * Listener for changes to the list of participants. It is meant to be subscribed only by the creator of the barrier * node. It checks to see when the barrier is ready to be marked as completed. */ class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener { private final String barrierVersion; - private final List<String> names; + private final List<String> expectedParticipantIds; - public ZkBarrierChangeHandler(String barrierVersion, List<String> names, ZkUtils zkUtils) { + public ZkBarrierChangeHandler(String barrierVersion, List<String> expectedParticipantIds, ZkUtils zkUtils) { super(zkUtils, "ZkBarrierChangeHandler"); this.barrierVersion = barrierVersion; - this.names = names; + this.expectedParticipantIds = expectedParticipantIds; } @Override - public void handleChildChange(String parentPath, List<String> currentChildren) { + public void handleChildChange(String barrierParticipantPath, List<String> participantIds) { if (notAValidEvent()) { return; } - if (currentChildren == null) { - LOG.info("Got ZkBarrierChangeHandler handleChildChange with null currentChildren"); + if (participantIds == null) { + LOG.info("Received notification with null participants for barrier: {}. 
Ignoring it.", barrierParticipantPath); return; } - LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray())); - LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray())); + LOG.info(String.format("Current participants in barrier version: %s = %s.", barrierVersion, participantIds)); + LOG.info(String.format("Expected participants in barrier version: %s = %s.", barrierVersion, expectedParticipantIds)); // check if all the expected participants are in - if (currentChildren.size() == names.size() && CollectionUtils.containsAll(currentChildren, names)) { - String barrierDonePath = keyBuilder.getBarrierStatePath(barrierVersion); - LOG.info("Writing BARRIER DONE to " + barrierDonePath); - zkUtils.writeData(barrierDonePath, State.DONE); // this will trigger notifications - zkUtils.unsubscribeChildChanges(barrierDonePath, this); + if (participantIds.size() == expectedParticipantIds.size() && CollectionUtils.containsAll(participantIds, expectedParticipantIds)) { + String barrierStatePath = keyBuilder.getBarrierStatePath(barrierVersion); + State barrierState = zkUtils.getZkClient().readData(barrierStatePath); + if (Objects.equals(barrierState, State.NEW)) { + LOG.info(String.format("Expected participants has joined the barrier version: %s. Marking the barrier state: %s as %s.", barrierVersion, barrierStatePath, State.DONE)); + zkUtils.writeData(barrierStatePath, State.DONE); // this will trigger notifications + } else { + LOG.debug(String.format("Barrier version: %s is at: %s state. 
Not marking barrier as %s.", barrierVersion, barrierState, State.DONE)); + } + LOG.info("Unsubscribing child changes on the path: {} for barrier version: {}.", barrierParticipantPath, barrierVersion); + zkUtils.unsubscribeChildChanges(barrierParticipantPath, this); } } } @@ -174,18 +222,23 @@ public class ZkBarrierForVersionUpgrade { @Override public void handleDataChange(String dataPath, Object data) { - LOG.info("got notification about barrier " + barrierStatePath + "; done=" + data); + LOG.info(String.format("Received barrierState change notification for barrier version: %s from zkNode: %s with data: %s.", barrierVersion, dataPath, data)); if (notAValidEvent()) return; - zkUtils.unsubscribeDataChanges(barrierStatePath, this); - barrierListenerOptional.ifPresent( - zkBarrierListener -> zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data)); + State barrierState = (State) data; + List<State> expectedBarrierStates = ImmutableList.of(State.DONE, State.TIMED_OUT); + + if (barrierState != null && expectedBarrierStates.contains(barrierState)) { + zkUtils.unsubscribeDataChanges(barrierStatePath, this); + barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data)); + } else { + LOG.debug("Barrier version: {} is at state: {}. 
Ignoring the barrierState change notification.", barrierVersion, barrierState); + } } @Override - public void handleDataDeleted(String dataPath) - throws Exception { + public void handleDataDeleted(String dataPath) { LOG.warn("barrier done node got deleted at " + dataPath); if (notAValidEvent()) return; http://git-wip-us.apache.org/repos/asf/samza/blob/603af35a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index 7689901..011794d 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -18,38 +18,42 @@ */ package org.apache.samza.zk; -import junit.framework.Assert; +import com.google.common.collect.ImmutableList; import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.samza.config.ZkConfig; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.apache.samza.util.NoOpMetricsRegistry; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -//import org.junit.After; -//import org.junit.AfterClass; -//import org.junit.Before; -//import org.junit.BeforeClass; -//import org.junit.Test; +import org.apache.samza.zk.ZkBarrierForVersionUpgrade.State; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import static junit.framework.Assert.*; + // TODO: Rename this such that it is clear that it is an integration test and NOT unit test public class TestZkBarrierForVersionUpgrade { + private static final 
String BARRIER_VERSION = "1"; private static EmbeddedZookeeper zkServer = null; private static String testZkConnectionString = null; private ZkUtils zkUtils; private ZkUtils zkUtils1; - //@BeforeClass + @BeforeClass public static void test() { zkServer = new EmbeddedZookeeper(); zkServer.setup(); - testZkConnectionString = "127.0.0.1:" + zkServer.getPort(); + testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort()); } - //@Before + @Before public void testSetup() { ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); @@ -57,68 +61,60 @@ public class TestZkBarrierForVersionUpgrade { this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); } - //@After + @After public void testTearDown() { zkUtils.close(); zkUtils1.close(); } - //@AfterClass + @AfterClass public static void teardown() { zkServer.teardown(); } - //@Test - public void testZkBarrierForVersionUpgrade() { - String barrierId = zkUtils.getKeyBuilder().getRootPath() + "/b1"; - String ver = "1"; - List<String> processors = new ArrayList<>(); - processors.add("p1"); - processors.add("p2"); - final CountDownLatch latch = new CountDownLatch(2); - final AtomicInteger stateChangedCalled = new AtomicInteger(0); - - ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, new ZkBarrierListener() { - @Override - public void onBarrierCreated(String version) { - } + static class TestZkBarrierListener implements ZkBarrierListener { - @Override - public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) { - if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) { - latch.countDown(); - stateChangedCalled.incrementAndGet(); - } - } + private 
final CountDownLatch stateChangedLatch; + private final State expectedState; + + TestZkBarrierListener(CountDownLatch stateChangedLatch, State expectedState) { + this.stateChangedLatch = stateChangedLatch; + this.expectedState = expectedState; + } - @Override - public void onBarrierError(String version, Throwable t) { + @Override + public void onBarrierCreated(String version) {} + @Override + public void onBarrierStateChanged(String version, State state) { + if (state.equals(expectedState)) { + stateChangedLatch.countDown(); } - }); + } - processor1Barrier.create(ver, processors); - processor1Barrier.join(ver, "p1"); + @Override + public void onBarrierError(String version, Throwable t) {} + } - ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, new ZkBarrierListener() { - @Override - public void onBarrierCreated(String version) { - } + @Test + public void testZkBarrierForVersionUpgrade() { + String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4)); - @Override - public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) { - if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) { - latch.countDown(); - stateChangedCalled.incrementAndGet(); - } - } + List<String> processors = ImmutableList.of("p1", "p2"); - @Override - public void onBarrierError(String version, Throwable t) { + CountDownLatch latch = new CountDownLatch(2); + TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.DONE); - } - }); - processor2Barrier.join(ver, "p2"); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + + processor1Barrier.create(BARRIER_VERSION, processors); + + State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state"); + 
assertEquals(State.NEW, barrierState); + + processor1Barrier.join(BARRIER_VERSION, "p1"); + processor2Barrier.join(BARRIER_VERSION, "p2"); boolean result = false; try { @@ -126,97 +122,107 @@ public class TestZkBarrierForVersionUpgrade { } catch (InterruptedException e) { e.printStackTrace(); } - Assert.assertTrue("Barrier failed to complete within test timeout.", result); + assertTrue("Barrier failed to complete within test timeout.", result); + + List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_1/barrier_participants"); + barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state"); + assertEquals(State.DONE, barrierState); + assertNotNull(children); + assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size()); + assertEquals("Unexpected barrier state. Didn't find the expected members.", processors, children); + } + + @Test + public void testZkBarrierForVersionUpgradeWithTimeOut() { + String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4)); + List<String> processors = ImmutableList.of("p1", "p2", "p3"); + + CountDownLatch latch = new CountDownLatch(2); + TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.TIMED_OUT); + + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + processor1Barrier.create(BARRIER_VERSION, processors); + + processor1Barrier.join(BARRIER_VERSION, "p1"); + processor2Barrier.join(BARRIER_VERSION, "p2"); + + processor1Barrier.expire(BARRIER_VERSION); + boolean result = false; try { - List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_v1/barrier_participants"); - Assert.assertNotNull(children); - Assert.assertEquals("Unexpected barrier state. 
Didn't find two processors.", 2, children.size()); - Assert.assertEquals("Unexpected barrier state. Didn't find the expected members.", processors, children); - } catch (Exception e) { - // no-op + result = latch.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); } - Assert.assertEquals(2, stateChangedCalled.get()); + assertTrue("Barrier Timeout test failed to complete within test timeout.", result); + + List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_1/barrier_participants"); + State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state"); + assertEquals(State.TIMED_OUT, barrierState); + assertNotNull(children); + assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size()); + assertEquals("Unexpected barrier state. Didn't find the expected members.", ImmutableList.of("p1", "p2"), children); } - //@Test - public void testZkBarrierForVersionUpgradeWithTimeOut() { - String barrierId = zkUtils1.getKeyBuilder().getRootPath() + "/barrierTimeout"; - String ver = "1"; - List<String> processors = new ArrayList<>(); - processors.add("p1"); - processors.add("p2"); - processors.add("p3"); // Simply to prevent barrier from completion for testing purposes - - final AtomicInteger timeoutStateChangeCalled = new AtomicInteger(0); - final CountDownLatch latch = new CountDownLatch(2); - final ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade( - barrierId, - zkUtils, - new ZkBarrierListener() { - @Override - public void onBarrierCreated(String version) { - } - - @Override - public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) { - if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) { - timeoutStateChangeCalled.incrementAndGet(); - latch.countDown(); - } - } - - @Override - public void onBarrierError(String version, Throwable t) { - - } - - }); - processor1Barrier.create(ver, 
processors); - processor1Barrier.join(ver, "p1"); - - final ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade( - barrierId, - zkUtils1, - new ZkBarrierListener() { - @Override - public void onBarrierCreated(String version) { - } - - @Override - public void onBarrierStateChanged(String version, ZkBarrierForVersionUpgrade.State state) { - if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) { - timeoutStateChangeCalled.incrementAndGet(); - latch.countDown(); - } - } - - @Override - public void onBarrierError(String version, Throwable t) { - - } - - }); - - processor2Barrier.join(ver, "p2"); - - processor1Barrier.expire(ver); + @Test + public void testShouldDiscardBarrierUpdateEventsAfterABarrierIsMarkedAsDone() { + String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4)); + List<String> processors = ImmutableList.of("p1", "p2"); + + CountDownLatch latch = new CountDownLatch(2); + TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.DONE); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + + processor1Barrier.create(BARRIER_VERSION, processors); + + processor1Barrier.join(BARRIER_VERSION, "p1"); + processor2Barrier.join(BARRIER_VERSION, "p2"); + boolean result = false; try { result = latch.await(10000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } - Assert.assertTrue("Barrier Timeout test failed to complete within test timeout.", result); + assertTrue("Barrier Timeout test failed to complete within test timeout.", result); + + processor1Barrier.expire(BARRIER_VERSION); + + State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state"); + assertEquals(State.DONE, barrierState); + } + + @Test + public void 
testShouldDiscardBarrierUpdateEventsAfterABarrierIsMarkedAsTimedOut() { + String barrierId = String.format("%s/%s", zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4)); + List<String> processors = ImmutableList.of("p1", "p2", "p3"); + CountDownLatch latch = new CountDownLatch(2); + TestZkBarrierListener listener = new TestZkBarrierListener(latch, State.TIMED_OUT); + ZkBarrierForVersionUpgrade processor1Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener); + ZkBarrierForVersionUpgrade processor2Barrier = new ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener); + + processor1Barrier.create(BARRIER_VERSION, processors); + + processor1Barrier.join(BARRIER_VERSION, "p1"); + processor2Barrier.join(BARRIER_VERSION, "p2"); + + processor1Barrier.expire(BARRIER_VERSION); + + boolean result = false; try { - List<String> children = zkUtils.getZkClient().getChildren(barrierId + "/barrier_v1/barrier_participants"); - Assert.assertNotNull(children); - Assert.assertEquals("Unexpected barrier state. Didn't find two processors.", 2, children.size()); - Assert.assertEquals("Unexpected barrier state. Didn't find the expected members.", processors, children); - } catch (Exception e) { - // no-op + result = latch.await(10000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); } - Assert.assertEquals(2, timeoutStateChangeCalled.get()); + assertTrue("Barrier Timeout test failed to complete within test timeout.", result); + + + processor1Barrier.join(BARRIER_VERSION, "p3"); + + State barrierState = zkUtils.getZkClient().readData(barrierId + "/barrier_1/barrier_state"); + assertEquals(State.TIMED_OUT, barrierState); } }
