Repository: samza
Updated Branches:
  refs/heads/master 61cf4e4df -> 553ce33b1


http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b48bc70..fb31054 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -19,7 +19,9 @@
 package org.apache.samza.zk;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -27,6 +29,12 @@ import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -34,12 +42,15 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestZkLeaderElector {
+  private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(TestZkLeaderElector.class);
 
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
@@ -47,6 +58,7 @@ public class TestZkLeaderElector {
   private ZkUtils testZkUtils = null;
   private static final int SESSION_TIMEOUT_MS = 20000;
   private static final int CONNECTION_TIMEOUT_MS = 10000;
+  private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory();
 
   @BeforeClass
   public static void setup() throws InterruptedException {
@@ -58,7 +70,7 @@ public class TestZkLeaderElector {
   public void testSetup() {
     testZkConnectionString = "127.0.0.1:" + zkServer.getPort();
     try {
-      testZkUtils = getZkUtilsWithNewClient();
+      testZkUtils = getZkUtilsWithNewClient("testProcessorId");
     } catch (Exception e) {
       Assert.fail("Client connection setup failed. Aborting tests..");
     }
@@ -96,18 +108,22 @@ public class TestZkLeaderElector {
     when(mockZkUtils.registerProcessorAndGetId(any())).
         thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
+    Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
 
+    ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
+    when(kb.getProcessorsPath()).thenReturn("");
+    when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
+
+    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null);
     BooleanResult isLeader = new BooleanResult();
-    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils,
-      new ZkLeaderElector.ZkLeaderElectorListener() {
-        @Override
-        public void onBecomingLeader() {
-          isLeader.res = true;
-        }
+
+    leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader.res = true;
       }
-    );
-    leaderElector.tryBecomeLeader();
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100));
+    });
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100));
   }
 
   @Test
@@ -115,22 +131,37 @@ public class TestZkLeaderElector {
     String processorId = "1";
     ZkUtils mockZkUtils = mock(ZkUtils.class);
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
+    Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
+
+    ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
+    when(kb.getProcessorsPath()).thenReturn("");
+    when(mockZkUtils.getKeyBuilder()).thenReturn(kb);
+
+    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null);
 
-    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-          }
-        }
-    );
     try {
-      leaderElector.tryBecomeLeader();
+      leaderElector.tryBecomeLeader(new LeaderElectorListener() {
+        @Override
+        public void onBecomingLeader() {
+        }
+      });
       Assert.fail("Was expecting leader election to fail!");
     } catch (SamzaException e) {
       // No-op Expected
     }
   }
 
+  private CoordinationUtils getZkCoordinationService(String groupId, String processorId) {
+
+    Map<String, String> map = new HashMap<>();
+    map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
+    Config config = new MapConfig(map);
+
+    CoordinationUtils coordinationUtils = factory.getCoordinationService(groupId, processorId, config);
+    
+    return coordinationUtils;
+  }
+
   /**
    * Test starts 3 processors and verifies the state of the Zk tree after all processors participate in LeaderElection
    */
@@ -139,50 +170,49 @@ public class TestZkLeaderElector {
     BooleanResult isLeader1 = new BooleanResult();
     BooleanResult isLeader2 = new BooleanResult();
     BooleanResult isLeader3 = new BooleanResult();
+
+
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1,
-      new ZkLeaderElector.ZkLeaderElectorListener() {
-        @Override
-        public void onBecomingLeader() {
-          isLeader1.res = true;
-        }
-      }
-    );
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        }
-    );
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
+
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader3.res = true;
-          }
-        });
+    ZkUtils zkUtils3 = getZkUtilsWithNewClient("3");
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
 
     Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
 
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
-    leaderElector3.tryBecomeLeader();
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
+    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader3.res = true;
+      }
+    });
 
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
 
     Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
 
+
     // Clean up
     zkUtils1.close();
     zkUtils2.close();
@@ -211,104 +241,102 @@ public class TestZkLeaderElector {
 
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
     zkUtils1.registerProcessorAndGetId("processor1");
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        zkUtils1,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader1.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
 
+    leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
     final String path2 = zkUtils2.registerProcessorAndGetId("processor2");
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        zkUtils2,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
-            Assert.assertNotNull(registeredIdStr);
-
-            String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
-            Assert.assertNotNull(predecessorIdStr);
-
-            try {
-              int selfId = Integer.parseInt(registeredIdStr);
-              int predecessorId = Integer.parseInt(predecessorIdStr);
-              Assert.assertEquals(1, selfId - predecessorId);
-            } catch (Exception e) {
-              System.out.println(e.getMessage());
-            }
-            count.incrementAndGet();
-            electionLatch.countDown();
-          }
-        });
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
+
+    leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2);
+        Assert.assertNotNull(registeredIdStr);
+
+        String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+        Assert.assertNotNull(predecessorIdStr);
+
+        try {
+          int selfId = Integer.parseInt(registeredIdStr);
+          int predecessorId = Integer.parseInt(predecessorIdStr);
+          Assert.assertEquals(1, selfId - predecessorId);
+        } catch (Exception e) {
+          LOG.error(e.getLocalizedMessage());
+        }
+        count.incrementAndGet();
+        electionLatch.countDown();
+      }
+    });
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
     zkUtils3.registerProcessorAndGetId("processor3");
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
-        "3",
-        zkUtils3,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader3.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
+
+    leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
 
     // Join Leader Election
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
-    leaderElector3.tryBecomeLeader();
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
+    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader3.res = true;
+      }
+    });
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
 
     Assert.assertTrue(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
     Assert.assertFalse(leaderElector3.amILeader());
 
-    List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
     Assert.assertEquals(3, currentActiveProcessors.size());
 
     // Leader Failure
@@ -322,11 +350,12 @@ public class TestZkLeaderElector {
     }
 
     Assert.assertEquals(1, count.get());
-    Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+    Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessors());
 
     // Clean up
     zkUtils2.close();
     zkUtils3.close();
+
   }
 
   /**
@@ -347,100 +376,101 @@ public class TestZkLeaderElector {
     BooleanResult isLeader3 = new BooleanResult();
 
     // Processor-1
-    ZkUtils zkUtils1 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
     zkUtils1.registerProcessorAndGetId("processor1");
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        zkUtils1,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader1.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
+
+    leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
+
 
     // Processor-2
-    ZkUtils zkUtils2 = getZkUtilsWithNewClient();
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
     zkUtils2.registerProcessorAndGetId("processor2");
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        zkUtils2,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            count.incrementAndGet();
-          }
-        });
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
+
+    leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        count.incrementAndGet();
+      }
+    });
 
     // Processor-3
-    ZkUtils zkUtils3  = getZkUtilsWithNewClient();
+    ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
     final String path3 = zkUtils3.registerProcessorAndGetId("processor3");
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
-        "3",
-        zkUtils3,
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader3.res = true;
-          }
-        },
-        new IZkDataListener() {
-          @Override
-          public void handleDataChange(String dataPath, Object data) throws Exception {
-
-          }
-
-          @Override
-          public void handleDataDeleted(String dataPath) throws Exception {
-            String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
-            Assert.assertNotNull(registeredIdStr);
-
-            String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
-            Assert.assertNotNull(predecessorIdStr);
-
-            try {
-              int selfId = Integer.parseInt(registeredIdStr);
-              int predecessorId = Integer.parseInt(predecessorIdStr);
-              Assert.assertEquals(1, selfId - predecessorId);
-            } catch (Exception e) {
-              Assert.fail("Exception in LeaderElectionListener!");
-            }
-            count.incrementAndGet();
-            electionLatch.countDown();
-          }
-        });
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
+
+    leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3);
+        Assert.assertNotNull(registeredIdStr);
+
+        String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath);
+        Assert.assertNotNull(predecessorIdStr);
+
+        try {
+          int selfId = Integer.parseInt(registeredIdStr);
+          int predecessorId = Integer.parseInt(predecessorIdStr);
+          Assert.assertEquals(1, selfId - predecessorId);
+        } catch (Exception e) {
+          Assert.fail("Exception in LeaderElectionListener!");
+        }
+        count.incrementAndGet();
+        electionLatch.countDown();
+      }
+    });
 
     // Join Leader Election
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
-    leaderElector3.tryBecomeLeader();
-    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
-    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
-
-    List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
+    leaderElector3.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader3.res = true;
+      }
+    });
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
+
+    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
     Assert.assertEquals(3, currentActiveProcessors.size());
 
     zkUtils2.close();
@@ -453,7 +483,7 @@ public class TestZkLeaderElector {
     }
 
     Assert.assertEquals(1, count.get());
-    Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors());
+    Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessors());
 
     // Clean up
     zkUtils1.close();
@@ -465,43 +495,43 @@ public class TestZkLeaderElector {
     BooleanResult isLeader1 = new BooleanResult();
     BooleanResult isLeader2 = new BooleanResult();
     // Processor-1
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        getZkUtilsWithNewClient(),
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader1.res = true;
-          }
-        });
+
+    ZkUtils zkUtils1 = getZkUtilsWithNewClient("1");
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null);
 
     // Processor-2
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        getZkUtilsWithNewClient(),
-        new ZkLeaderElector.ZkLeaderElectorListener() {
-          @Override
-          public void onBecomingLeader() {
-            isLeader2.res = true;
-          }
-        });
+    ZkUtils zkUtils2 = getZkUtilsWithNewClient("2");
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null);
 
     // Before Leader Election
     Assert.assertFalse(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
 
-    leaderElector1.tryBecomeLeader();
-    leaderElector2.tryBecomeLeader();
+    leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader1.res = true;
+      }
+    });
+    leaderElector2.tryBecomeLeader(new LeaderElectorListener() {
+      @Override
+      public void onBecomingLeader() {
+        isLeader2.res = true;
+      }
+    });
 
     // After Leader Election
     Assert.assertTrue(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
+
+    zkUtils1.close();
+    zkUtils2.close();
   }
 
-  private ZkUtils getZkUtilsWithNewClient() {
+  private ZkUtils getZkUtilsWithNewClient(String processorId) {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
-        "processorId1",
+        processorId,
         KEY_BUILDER,
         ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
         CONNECTION_TIMEOUT_MS);

http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
new file mode 100644
index 0000000..ec7e830
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationServiceFactory;
+import org.apache.samza.coordinator.Latch;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+
+
+public class TestZkProcessorLatch {
+  private static EmbeddedZookeeper zkServer = null;
+  private static String zkConnectionString;
+  private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory();
+  private CoordinationUtils coordinationUtils;
+
+  @BeforeClass
+  public static void setup() throws InterruptedException {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+
+    zkConnectionString = "localhost:" + zkServer.getPort();
+    System.out.println("ZK port = " + zkServer.getPort());
+  }
+
+  @Before
+  public void testSetup() {
+    String groupId = "group1";
+    String processorId = "p1";
+    Map<String, String> map = new HashMap<>();
+    map.put(ZkConfig.ZK_CONNECT, zkConnectionString);
+    Config config = new MapConfig(map);
+
+
+    coordinationUtils = factory.getCoordinationService(groupId, processorId, config);
+    coordinationUtils.reset();
+  }
+
+  @After
+  public void testTearDown() {
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  @Test
+  public void testSingleLatch1() {
+    System.out.println("Started 1");
+    int latchSize = 1;
+    String latchId = "l2";
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(30000, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("failed to get future." + e.getLocalizedMessage());
+    }
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testSingleLatch2() {
+    System.out.println("Started 1");
+    int latchSize = 1;
+    String latchId = "l2";
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        //latch.countDown(); only one thread counts down
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(30000, TimeUnit.MILLISECONDS);
+      f2.get(30000, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("failed to get future." + e.getLocalizedMessage());
+    }
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testNSizeLatch() {
+    System.out.println("Started N");
+    String latchId = "l1";
+    int latchSize = 3;
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out " + e.getLocalizedMessage());
+        }
+      });
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f3 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(300, TimeUnit.MILLISECONDS);
+      f2.get(300, TimeUnit.MILLISECONDS);
+      f3.get(300, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("failed to get future. " + e.getLocalizedMessage());
+    }
+  }
+
+  @Test
+  public void testLatchExpires() {
+    System.out.println("Started expiring");
+    String latchId = "l4";
+
+    int latchSize = 3;
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f3 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        // This processor never completes its task
+        //latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+
+    try {
+      f1.get(300, TimeUnit.MILLISECONDS);
+      f2.get(300, TimeUnit.MILLISECONDS);
+      f3.get(300, TimeUnit.MILLISECONDS);
+      Assert.fail("Latch should've timeout.");
+    } catch (Exception e) {
+      f1.cancel(true);
+      f2.cancel(true);
+      f3.cancel(true);
+      // expected
+    }
+    pool.shutdownNow();
+  }
+
+  @Test
+  public void testSingleCountdown() {
+    System.out.println("Started single countdown");
+    String latchId = "l1";
+    int latchSize = 3;
+
+    ExecutorService pool = Executors.newFixedThreadPool(3);
+    // Only one thread invokes countDown
+    Future f1 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        latch.countDown();
+        TestZkUtils.sleepMs(100);
+        latch.countDown();
+        TestZkUtils.sleepMs(100);
+        latch.countDown();
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f2 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    Future f3 = pool.submit(
+      () -> {
+        Latch latch = coordinationUtils.getLatch(latchSize, latchId);
+        try {
+          latch.await(100000, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          Assert.fail("await timed out. " + e.getLocalizedMessage());
+        }
+      });
+    try {
+      f1.get(600, TimeUnit.MILLISECONDS);
+      f2.get(600, TimeUnit.MILLISECONDS);
+      f3.get(600, TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      Assert.fail("Failed to get.");
+    }
+    pool.shutdownNow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
index a1ad363..2c44aea 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java
@@ -19,6 +19,16 @@
 
 package org.apache.samza.test.processor;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import kafka.utils.TestUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -28,6 +38,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.ZkConfig;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.task.AsyncStreamTaskAdapter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
@@ -37,17 +48,6 @@ import org.apache.samza.test.StandaloneTestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 import static org.apache.samza.test.processor.IdentityStreamTask.endLatch;
 
 public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
@@ -144,21 +144,14 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness {
   private Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic, int messageCount) {
     Map<String, String> configs = new HashMap<>();
     configs.putAll(
-        StandaloneTestUtils.getStandaloneConfigs(
-            "test-job",
-            "org.apache.samza.test.processor.IdentityStreamTask"));
-    configs.putAll(
-        StandaloneTestUtils.getKafkaSystemConfigs(
-            testSystem,
-            bootstrapServers(),
-            zkConnect(),
-            null,
-            StandaloneTestUtils.SerdeAlias.STRING,
-            true));
+        StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask"));
+    configs.putAll(StandaloneTestUtils.getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null,
+            StandaloneTestUtils.SerdeAlias.STRING, true));
     configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic));
     configs.put("app.messageCount", String.valueOf(messageCount));
     configs.put("app.outputTopic", outputTopic);
     configs.put("app.outputSystem", testSystem);
+    configs.put(ZkConfig.ZK_CONNECT, zkConnect());
     return configs;
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 5de30d8..417ada4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 include \
   'samza-api',
   'samza-elasticsearch',

Reply via email to