Repository: samza
Updated Branches:
  refs/heads/master cf4872e2a -> 603af35a5


SAMZA-1689: Add validations before state transitions in 
ZkBarrierForVersionUpgrade.

Prevent invalid state updates on the barrier.
* Introduce an additional barrier state, NEW.
* Add state validations before updating the barrier.
* Fix existing TestZkBarrier tests that are disabled and add new tests
  to verify the intended behavior.

Author: Shanthoosh Venkataraman <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #490 from shanthoosh/fix_barrier_state_transitions


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/603af35a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/603af35a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/603af35a

Branch: refs/heads/master
Commit: 603af35a526a1b241067a5a3d10d8d49d7ac3d58
Parents: cf4872e
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Fri Apr 27 18:55:15 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Fri Apr 27 18:55:15 2018 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 113 ++++++--
 .../zk/TestZkBarrierForVersionUpgrade.java      | 272 ++++++++++---------
 2 files changed, 222 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/603af35a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index abea299..63f9120 100644
--- 
a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -19,11 +19,12 @@
 
 package org.apache.samza.zk;
 
+import com.google.common.collect.ImmutableList;
+import java.util.Objects;
 import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
 
@@ -41,6 +42,25 @@ import java.util.Optional;
  * with value {@link 
org.apache.samza.zk.ZkBarrierForVersionUpgrade.State#TIMED_OUT} and indicates 
to everyone that it
  * is no longer valid.
  *
+ *
+ * Describes the lifecycle of a barrier.
+ * <pre>
+ *
+ *                               When expected participants join
+ *      Leader    ---&lt; NEW ---------------------------------------- &lt; 
DONE
+ *                         |      barrier within barrierTimeOut.
+ *                         |
+ *                         |
+ *                         |
+ *                         |
+ *                         |
+ *                         |    When expected participants doesn't
+ *                         | ----------------------------------------- &lt; 
TIMED_OUT
+ *                              join barrier within barrierTimeOut.
+ *
+ * </pre>
+ *
+ *
  * The caller can listen to events associated with the barrier by registering 
a {@link ZkBarrierListener}.
  *
  * Zk Tree Reference:
@@ -48,7 +68,7 @@ import java.util.Optional;
  *  |
  *  |- barrier_{version1}/
  *  |   |- barrier_state/
- *  |   |  ([DONE|TIMED_OUT])
+ *  |   |  ([NEW|DONE|TIMED_OUT])
  *  |   |- barrier_participants/
  *  |   |   |- {id1}
  *  |   |   |- {id2}
@@ -61,9 +81,19 @@ public class ZkBarrierForVersionUpgrade {
   private final Optional<ZkBarrierListener> barrierListenerOptional;
 
   public enum State {
-    TIMED_OUT, DONE
-  }
+    NEW("NEW"), TIMED_OUT("TIMED_OUT"), DONE("DONE");
+
+    private String str;
 
+    State(String str) {
+      this.str = str;
+    }
+
+    @Override
+    public String toString() {
+      return str;
+    }
+  }
 
   public ZkBarrierForVersionUpgrade(String barrierRoot, ZkUtils zkUtils, 
ZkBarrierListener barrierListener) {
     if (zkUtils == null) {
@@ -81,16 +111,19 @@ public class ZkBarrierForVersionUpgrade {
    * @param participants List of expected participated for this barrier to 
complete
    */
   public void create(final String version, List<String> participants) {
+    LOG.info(String.format("Creating barrier with version: %s, participants: 
%s.", version, participants));
     String barrierRoot = keyBuilder.getBarrierRoot();
     String barrierParticipantsPath = 
keyBuilder.getBarrierParticipantsPath(version);
+    String barrierStatePath = keyBuilder.getBarrierStatePath(version);
     zkUtils.validatePaths(new String[]{
         barrierRoot,
         keyBuilder.getBarrierPath(version),
         barrierParticipantsPath,
-        keyBuilder.getBarrierStatePath(version)});
+        barrierStatePath});
+    LOG.info("Marking the barrier state: {} as {}.", barrierStatePath, 
State.NEW);
+    zkUtils.writeData(barrierStatePath, State.NEW);
 
-    // subscribe for participant's list changes
-    LOG.info("Subscribing for child changes at " + barrierParticipantsPath);
+    LOG.info("Subscribing child changes on the path: {} for barrier version: 
{}.", barrierParticipantsPath, version);
     zkUtils.subscribeChildChanges(barrierParticipantsPath, new 
ZkBarrierChangeHandler(version, participants, zkUtils));
 
     barrierListenerOptional.ifPresent(zkBarrierListener -> 
zkBarrierListener.onBarrierCreated(version));
@@ -103,8 +136,10 @@ public class ZkBarrierForVersionUpgrade {
    * @param participantId Identifier of the participant
    */
   public void join(String version, String participantId) {
-    String barrierDonePath = keyBuilder.getBarrierStatePath(version);
-    zkUtils.subscribeDataChanges(barrierDonePath, new 
ZkBarrierReachedHandler(barrierDonePath, version, zkUtils));
+    LOG.info("Joining the barrier version: {} as participant: {}.", version, 
participantId);
+    String barrierStatePath = keyBuilder.getBarrierStatePath(version);
+    LOG.info("Subscribing data changes on the path: {} for barrier version: 
{}.", barrierStatePath, version);
+    zkUtils.subscribeDataChanges(barrierStatePath, new 
ZkBarrierReachedHandler(barrierStatePath, version, zkUtils));
 
     // TODO: Handle ZkNodeExistsException - SAMZA-1304
     zkUtils.getZkClient().createPersistent(
@@ -117,41 +152,54 @@ public class ZkBarrierForVersionUpgrade {
    * @param version Version associated with the Barrier
    */
   public void expire(String version) {
-    zkUtils.writeData(keyBuilder.getBarrierStatePath(version), 
State.TIMED_OUT);
-
+    String barrierStatePath = keyBuilder.getBarrierStatePath(version);
+    State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
+    if (Objects.equals(barrierState, State.NEW)) {
+      LOG.info(String.format("Expiring the barrier version: %s. Marking the 
barrier state: %s as %s.", version, barrierStatePath, State.TIMED_OUT));
+      zkUtils.writeData(keyBuilder.getBarrierStatePath(version), 
State.TIMED_OUT);
+    } else {
+      LOG.debug(String.format("Barrier version: %s is at: %s state. Not 
marking barrier as %s.", version, barrierState, State.TIMED_OUT));
+    }
   }
+
   /**
    * Listener for changes to the list of participants. It is meant to be 
subscribed only by the creator of the barrier
    * node. It checks to see when the barrier is ready to be marked as 
completed.
    */
   class ZkBarrierChangeHandler extends ZkUtils.GenIZkChildListener {
     private final String barrierVersion;
-    private final List<String> names;
+    private final List<String> expectedParticipantIds;
 
-    public ZkBarrierChangeHandler(String barrierVersion, List<String> names, 
ZkUtils zkUtils) {
+    public ZkBarrierChangeHandler(String barrierVersion, List<String> 
expectedParticipantIds, ZkUtils zkUtils) {
       super(zkUtils, "ZkBarrierChangeHandler");
       this.barrierVersion = barrierVersion;
-      this.names = names;
+      this.expectedParticipantIds = expectedParticipantIds;
     }
 
     @Override
-    public void handleChildChange(String parentPath, List<String> 
currentChildren) {
+    public void handleChildChange(String barrierParticipantPath, List<String> 
participantIds) {
       if (notAValidEvent()) {
         return;
       }
-      if (currentChildren == null) {
-        LOG.info("Got ZkBarrierChangeHandler handleChildChange with null 
currentChildren");
+      if (participantIds == null) {
+        LOG.info("Received notification with null participants for barrier: 
{}. Ignoring it.", barrierParticipantPath);
         return;
       }
-      LOG.info("list of children in the barrier = " + parentPath + ":" + 
Arrays.toString(currentChildren.toArray()));
-      LOG.info("list of children to compare against = " + parentPath + ":" + 
Arrays.toString(names.toArray()));
+      LOG.info(String.format("Current participants in barrier version: %s = 
%s.", barrierVersion, participantIds));
+      LOG.info(String.format("Expected participants in barrier version: %s = 
%s.", barrierVersion, expectedParticipantIds));
 
       // check if all the expected participants are in
-      if (currentChildren.size() == names.size() && 
CollectionUtils.containsAll(currentChildren, names)) {
-        String barrierDonePath = 
keyBuilder.getBarrierStatePath(barrierVersion);
-        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
-        zkUtils.writeData(barrierDonePath, State.DONE); // this will trigger 
notifications
-        zkUtils.unsubscribeChildChanges(barrierDonePath, this);
+      if (participantIds.size() == expectedParticipantIds.size() && 
CollectionUtils.containsAll(participantIds, expectedParticipantIds)) {
+        String barrierStatePath = 
keyBuilder.getBarrierStatePath(barrierVersion);
+        State barrierState = zkUtils.getZkClient().readData(barrierStatePath);
+        if (Objects.equals(barrierState, State.NEW)) {
+          LOG.info(String.format("Expected participants has joined the barrier 
version: %s. Marking the barrier state: %s as %s.", barrierVersion, 
barrierStatePath, State.DONE));
+          zkUtils.writeData(barrierStatePath, State.DONE); // this will 
trigger notifications
+        } else {
+          LOG.debug(String.format("Barrier version: %s is at: %s state. Not 
marking barrier as %s.", barrierVersion, barrierState, State.DONE));
+        }
+        LOG.info("Unsubscribing child changes on the path: {} for barrier 
version: {}.", barrierParticipantPath, barrierVersion);
+        zkUtils.unsubscribeChildChanges(barrierParticipantPath, this);
       }
     }
   }
@@ -174,18 +222,23 @@ public class ZkBarrierForVersionUpgrade {
 
     @Override
     public void handleDataChange(String dataPath, Object data) {
-      LOG.info("got notification about barrier " + barrierStatePath + "; 
done=" + data);
+      LOG.info(String.format("Received barrierState change notification for 
barrier version: %s from zkNode: %s with data: %s.", barrierVersion, dataPath, 
data));
       if (notAValidEvent())
         return;
 
-      zkUtils.unsubscribeDataChanges(barrierStatePath, this);
-      barrierListenerOptional.ifPresent(
-          zkBarrierListener -> 
zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data));
+      State barrierState = (State) data;
+      List<State> expectedBarrierStates = ImmutableList.of(State.DONE, 
State.TIMED_OUT);
+
+      if (barrierState != null && 
expectedBarrierStates.contains(barrierState)) {
+        zkUtils.unsubscribeDataChanges(barrierStatePath, this);
+        barrierListenerOptional.ifPresent(zkBarrierListener -> 
zkBarrierListener.onBarrierStateChanged(barrierVersion, (State) data));
+      } else {
+        LOG.debug("Barrier version: {} is at state: {}. Ignoring the 
barrierState change notification.", barrierVersion, barrierState);
+      }
     }
 
     @Override
-    public void handleDataDeleted(String dataPath)
-        throws Exception {
+    public void handleDataDeleted(String dataPath) {
       LOG.warn("barrier done node got deleted at " + dataPath);
       if (notAValidEvent())
         return;

http://git-wip-us.apache.org/repos/asf/samza/blob/603af35a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 7689901..011794d 100644
--- 
a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -18,38 +18,42 @@
  */
 package org.apache.samza.zk;
 
-import junit.framework.Assert;
+import com.google.common.collect.ImmutableList;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.apache.samza.util.NoOpMetricsRegistry;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-//import org.junit.After;
-//import org.junit.AfterClass;
-//import org.junit.Before;
-//import org.junit.BeforeClass;
-//import org.junit.Test;
+import org.apache.samza.zk.ZkBarrierForVersionUpgrade.State;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.*;
+
 
 // TODO: Rename this such that it is clear that it is an integration test and 
NOT unit test
 public class TestZkBarrierForVersionUpgrade {
+  private static final String BARRIER_VERSION = "1";
   private static EmbeddedZookeeper zkServer = null;
   private static String testZkConnectionString = null;
   private ZkUtils zkUtils;
   private ZkUtils zkUtils1;
 
-  //@BeforeClass
+  @BeforeClass
   public static void test() {
     zkServer = new EmbeddedZookeeper();
     zkServer.setup();
-    testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
+    testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort());
   }
 
-  //@Before
+  @Before
   public void testSetup() {
     ZkClient zkClient = new ZkClient(testZkConnectionString, 
ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
     this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
@@ -57,68 +61,60 @@ public class TestZkBarrierForVersionUpgrade {
     this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 
-  //@After
+  @After
   public void testTearDown() {
     zkUtils.close();
     zkUtils1.close();
   }
 
-  //@AfterClass
+  @AfterClass
   public static void teardown() {
     zkServer.teardown();
   }
 
-  //@Test
-  public void testZkBarrierForVersionUpgrade() {
-    String barrierId = zkUtils.getKeyBuilder().getRootPath() + "/b1";
-    String ver = "1";
-    List<String> processors = new ArrayList<>();
-    processors.add("p1");
-    processors.add("p2");
-    final CountDownLatch latch = new CountDownLatch(2);
-    final AtomicInteger stateChangedCalled = new AtomicInteger(0);
-
-    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, new ZkBarrierListener() {
-      @Override
-      public void onBarrierCreated(String version) {
-      }
+  static class TestZkBarrierListener implements ZkBarrierListener {
 
-      @Override
-      public void onBarrierStateChanged(String version, 
ZkBarrierForVersionUpgrade.State state) {
-        if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) {
-          latch.countDown();
-          stateChangedCalled.incrementAndGet();
-        }
-      }
+    private final CountDownLatch stateChangedLatch;
+    private final State expectedState;
+
+    TestZkBarrierListener(CountDownLatch stateChangedLatch, State 
expectedState) {
+      this.stateChangedLatch = stateChangedLatch;
+      this.expectedState = expectedState;
+    }
 
-      @Override
-      public void onBarrierError(String version, Throwable t) {
+    @Override
+    public void onBarrierCreated(String version) {}
 
+    @Override
+    public void onBarrierStateChanged(String version, State state) {
+      if (state.equals(expectedState)) {
+        stateChangedLatch.countDown();
       }
-    });
+    }
 
-    processor1Barrier.create(ver, processors);
-    processor1Barrier.join(ver, "p1");
+    @Override
+    public void onBarrierError(String version, Throwable t) {}
+  }
 
-    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, new ZkBarrierListener() {
-      @Override
-      public void onBarrierCreated(String version) {
-      }
+  @Test
+  public void testZkBarrierForVersionUpgrade() {
+    String barrierId = String.format("%s/%s", 
zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
 
-      @Override
-      public void onBarrierStateChanged(String version, 
ZkBarrierForVersionUpgrade.State state) {
-        if (state.equals(ZkBarrierForVersionUpgrade.State.DONE)) {
-          latch.countDown();
-          stateChangedCalled.incrementAndGet();
-        }
-      }
+    List<String> processors = ImmutableList.of("p1", "p2");
 
-      @Override
-      public void onBarrierError(String version, Throwable t) {
+    CountDownLatch latch = new CountDownLatch(2);
+    TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.DONE);
 
-      }
-    });
-    processor2Barrier.join(ver, "p2");
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+
+    processor1Barrier.create(BARRIER_VERSION, processors);
+
+    State barrierState = zkUtils.getZkClient().readData(barrierId + 
"/barrier_1/barrier_state");
+    assertEquals(State.NEW, barrierState);
+
+    processor1Barrier.join(BARRIER_VERSION, "p1");
+    processor2Barrier.join(BARRIER_VERSION, "p2");
 
     boolean result = false;
     try {
@@ -126,97 +122,107 @@ public class TestZkBarrierForVersionUpgrade {
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
-    Assert.assertTrue("Barrier failed to complete within test timeout.", 
result);
+    assertTrue("Barrier failed to complete within test timeout.", result);
+
+    List<String> children = zkUtils.getZkClient().getChildren(barrierId + 
"/barrier_1/barrier_participants");
+    barrierState = zkUtils.getZkClient().readData(barrierId + 
"/barrier_1/barrier_state");
+    assertEquals(State.DONE, barrierState);
+    assertNotNull(children);
+    assertEquals("Unexpected barrier state. Didn't find two processors.", 2, 
children.size());
+    assertEquals("Unexpected barrier state. Didn't find the expected 
members.", processors, children);
+  }
+
+  @Test
+  public void testZkBarrierForVersionUpgradeWithTimeOut() {
+    String barrierId = String.format("%s/%s", 
zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
+    List<String> processors = ImmutableList.of("p1", "p2", "p3");
+
+    CountDownLatch latch = new CountDownLatch(2);
+    TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.TIMED_OUT);
+
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
 
+    processor1Barrier.create(BARRIER_VERSION, processors);
+
+    processor1Barrier.join(BARRIER_VERSION, "p1");
+    processor2Barrier.join(BARRIER_VERSION, "p2");
+
+    processor1Barrier.expire(BARRIER_VERSION);
+    boolean result = false;
     try {
-      List<String> children = zkUtils.getZkClient().getChildren(barrierId + 
"/barrier_v1/barrier_participants");
-      Assert.assertNotNull(children);
-      Assert.assertEquals("Unexpected barrier state. Didn't find two 
processors.", 2, children.size());
-      Assert.assertEquals("Unexpected barrier state. Didn't find the expected 
members.", processors, children);
-    } catch (Exception e) {
-      // no-op
+      result = latch.await(10000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
-    Assert.assertEquals(2, stateChangedCalled.get());
+    assertTrue("Barrier Timeout test failed to complete within test timeout.", 
result);
+
+    List<String> children = zkUtils.getZkClient().getChildren(barrierId + 
"/barrier_1/barrier_participants");
+    State barrierState = zkUtils.getZkClient().readData(barrierId + 
"/barrier_1/barrier_state");
+    assertEquals(State.TIMED_OUT, barrierState);
+    assertNotNull(children);
+    assertEquals("Unexpected barrier state. Didn't find two processors.", 2, 
children.size());
+    assertEquals("Unexpected barrier state. Didn't find the expected 
members.", ImmutableList.of("p1", "p2"), children);
   }
 
-  //@Test
-  public void testZkBarrierForVersionUpgradeWithTimeOut() {
-    String barrierId = zkUtils1.getKeyBuilder().getRootPath() + 
"/barrierTimeout";
-    String ver = "1";
-    List<String> processors = new ArrayList<>();
-    processors.add("p1");
-    processors.add("p2");
-    processors.add("p3"); // Simply to prevent barrier from completion for 
testing purposes
-
-    final AtomicInteger timeoutStateChangeCalled = new AtomicInteger(0);
-    final CountDownLatch latch = new CountDownLatch(2);
-    final ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(
-        barrierId,
-        zkUtils,
-        new ZkBarrierListener() {
-          @Override
-          public void onBarrierCreated(String version) {
-          }
-
-          @Override
-          public void onBarrierStateChanged(String version, 
ZkBarrierForVersionUpgrade.State state) {
-            if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
-              timeoutStateChangeCalled.incrementAndGet();
-              latch.countDown();
-            }
-          }
-
-          @Override
-          public void onBarrierError(String version, Throwable t) {
-
-          }
-
-        });
-    processor1Barrier.create(ver, processors);
-    processor1Barrier.join(ver, "p1");
-
-    final ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(
-        barrierId,
-        zkUtils1,
-        new ZkBarrierListener() {
-          @Override
-          public void onBarrierCreated(String version) {
-          }
-
-          @Override
-          public void onBarrierStateChanged(String version, 
ZkBarrierForVersionUpgrade.State state) {
-            if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
-              timeoutStateChangeCalled.incrementAndGet();
-              latch.countDown();
-            }
-          }
-
-          @Override
-          public void onBarrierError(String version, Throwable t) {
-
-          }
-
-        });
-
-    processor2Barrier.join(ver, "p2");
-
-    processor1Barrier.expire(ver);
+  @Test
+  public void 
testShouldDiscardBarrierUpdateEventsAfterABarrierIsMarkedAsDone() {
+    String barrierId = String.format("%s/%s", 
zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
+    List<String> processors = ImmutableList.of("p1", "p2");
+
+    CountDownLatch latch = new CountDownLatch(2);
+    TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.DONE);
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+
+    processor1Barrier.create(BARRIER_VERSION, processors);
+
+    processor1Barrier.join(BARRIER_VERSION, "p1");
+    processor2Barrier.join(BARRIER_VERSION, "p2");
+
     boolean result = false;
     try {
       result = latch.await(10000, TimeUnit.MILLISECONDS);
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
-    Assert.assertTrue("Barrier Timeout test failed to complete within test 
timeout.", result);
+    assertTrue("Barrier Timeout test failed to complete within test timeout.", 
result);
+
+    processor1Barrier.expire(BARRIER_VERSION);
+
+    State barrierState = zkUtils.getZkClient().readData(barrierId + 
"/barrier_1/barrier_state");
+    assertEquals(State.DONE, barrierState);
+  }
+
+  @Test
+  public void 
testShouldDiscardBarrierUpdateEventsAfterABarrierIsMarkedAsTimedOut() {
+    String barrierId = String.format("%s/%s", 
zkUtils1.getKeyBuilder().getRootPath(), RandomStringUtils.randomAlphabetic(4));
+    List<String> processors = ImmutableList.of("p1", "p2", "p3");
 
+    CountDownLatch latch = new CountDownLatch(2);
+    TestZkBarrierListener listener = new TestZkBarrierListener(latch, 
State.TIMED_OUT);
+    ZkBarrierForVersionUpgrade processor1Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils, listener);
+    ZkBarrierForVersionUpgrade processor2Barrier = new 
ZkBarrierForVersionUpgrade(barrierId, zkUtils1, listener);
+
+    processor1Barrier.create(BARRIER_VERSION, processors);
+
+    processor1Barrier.join(BARRIER_VERSION, "p1");
+    processor2Barrier.join(BARRIER_VERSION, "p2");
+
+    processor1Barrier.expire(BARRIER_VERSION);
+
+    boolean result = false;
     try {
-      List<String> children = zkUtils.getZkClient().getChildren(barrierId + 
"/barrier_v1/barrier_participants");
-      Assert.assertNotNull(children);
-      Assert.assertEquals("Unexpected barrier state. Didn't find two 
processors.", 2, children.size());
-      Assert.assertEquals("Unexpected barrier state. Didn't find the expected 
members.", processors, children);
-    } catch (Exception e) {
-      // no-op
+      result = latch.await(10000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
-    Assert.assertEquals(2, timeoutStateChangeCalled.get());
+    assertTrue("Barrier Timeout test failed to complete within test timeout.", 
result);
+
+
+    processor1Barrier.join(BARRIER_VERSION, "p3");
+
+    State barrierState = zkUtils.getZkClient().readData(barrierId + 
"/barrier_1/barrier_state");
+    assertEquals(State.TIMED_OUT, barrierState);
   }
 }

Reply via email to