Repository: samza Updated Branches: refs/heads/master 61cf4e4df -> 553ce33b1
http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index b48bc70..fb31054 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -19,7 +19,9 @@ package org.apache.samza.zk; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -27,6 +29,12 @@ import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkConnection; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.CoordinationServiceFactory; +import org.apache.samza.coordinator.LeaderElectorListener; import org.apache.samza.testUtils.EmbeddedZookeeper; import org.junit.After; import org.junit.AfterClass; @@ -34,12 +42,15 @@ import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Mockito; +import org.slf4j.Logger; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class TestZkLeaderElector { + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(TestZkLeaderElector.class); private static EmbeddedZookeeper zkServer = null; private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test"); @@ -47,6 +58,7 @@ public class TestZkLeaderElector { private ZkUtils testZkUtils = null; private static final int SESSION_TIMEOUT_MS = 20000; private static final int CONNECTION_TIMEOUT_MS = 10000; + private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory(); @BeforeClass public static void setup() throws InterruptedException { @@ -58,7 +70,7 @@ public class TestZkLeaderElector { public void testSetup() { testZkConnectionString = "127.0.0.1:" + zkServer.getPort(); try { - testZkUtils = getZkUtilsWithNewClient(); + testZkUtils = getZkUtilsWithNewClient("testProcessorId"); } catch (Exception e) { Assert.fail("Client connection setup failed. Aborting tests.."); } @@ -96,18 +108,22 @@ public class TestZkLeaderElector { when(mockZkUtils.registerProcessorAndGetId(any())). thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000"); when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors); + Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class)); + ZkKeyBuilder kb = mock(ZkKeyBuilder.class); + when(kb.getProcessorsPath()).thenReturn(""); + when(mockZkUtils.getKeyBuilder()).thenReturn(kb); + + ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, null); BooleanResult isLeader = new BooleanResult(); - ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader.res = true; - } + + leaderElector.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader.res = true; } - ); - leaderElector.tryBecomeLeader(); - Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100)); + }); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader.res, 2, 100)); } @Test @@ -115,22 +131,37 @@ public class TestZkLeaderElector { String processorId = "1"; ZkUtils mockZkUtils = mock(ZkUtils.class); when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>()); + Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class)); + + ZkKeyBuilder kb = mock(ZkKeyBuilder.class); + when(kb.getProcessorsPath()).thenReturn(""); + when(mockZkUtils.getKeyBuilder()).thenReturn(kb); + + ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, null); - ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - } - } - ); try { - leaderElector.tryBecomeLeader(); + leaderElector.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + } + }); Assert.fail("Was expecting leader election to fail!"); } catch (SamzaException e) { // No-op Expected } } + private CoordinationUtils getZkCoordinationService(String groupId, String processorId) { + + Map<String, String> map = new HashMap<>(); + map.put(ZkConfig.ZK_CONNECT, testZkConnectionString); + Config config = new MapConfig(map); + + CoordinationUtils coordinationUtils = factory.getCoordinationService(groupId, processorId, config); + + return coordinationUtils; + } + /** * Test starts 3 processors and verifies the state of the Zk tree after all processors participate in LeaderElection */ @@ -139,50 +170,49 @@ public class TestZkLeaderElector { BooleanResult isLeader1 = new BooleanResult(); BooleanResult isLeader2 = new BooleanResult(); BooleanResult isLeader3 = new BooleanResult(); + + // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient(); - ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - } - ); + ZkUtils zkUtils1 = getZkUtilsWithNewClient("1"); + ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null); // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient(); - ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - } - ); + ZkUtils zkUtils2 = getZkUtilsWithNewClient("2"); + ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null); + // Processor-3 - ZkUtils zkUtils3 = getZkUtilsWithNewClient(); - ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader3.res = true; - } - }); + ZkUtils zkUtils3 = getZkUtilsWithNewClient("3"); + ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null); Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size()); - leaderElector1.tryBecomeLeader(); - leaderElector2.tryBecomeLeader(); - leaderElector3.tryBecomeLeader(); + leaderElector1.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }); + leaderElector2.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }); + leaderElector3.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader3.res = true; + } + }); - Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100)); - Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100)); - Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100)); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100)); Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size()); + // Clean up zkUtils1.close(); zkUtils2.close(); @@ -211,104 +241,102 @@ public class TestZkLeaderElector { // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient(); + ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1"); zkUtils1.registerProcessorAndGetId("processor1"); - ZkLeaderElector leaderElector1 = new ZkLeaderElector( - "1", - zkUtils1, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }, - new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - count.incrementAndGet(); - } - }); + ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null); + leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + count.incrementAndGet(); + } + }); // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient(); + ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2"); final String path2 = zkUtils2.registerProcessorAndGetId("processor2"); - ZkLeaderElector leaderElector2 = new ZkLeaderElector( - "2", - zkUtils2, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }, - new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2); - Assert.assertNotNull(registeredIdStr); - - String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath); - Assert.assertNotNull(predecessorIdStr); - - try { - int selfId = Integer.parseInt(registeredIdStr); - int predecessorId = Integer.parseInt(predecessorIdStr); - Assert.assertEquals(1, selfId - predecessorId); - } catch (Exception e) { - System.out.println(e.getMessage()); - } - count.incrementAndGet(); - electionLatch.countDown(); - } - }); + ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null); + + leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path2); + Assert.assertNotNull(registeredIdStr); + + String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath); + Assert.assertNotNull(predecessorIdStr); + + try { + int selfId = Integer.parseInt(registeredIdStr); + int predecessorId = Integer.parseInt(predecessorIdStr); + Assert.assertEquals(1, selfId - predecessorId); + } catch (Exception e) { + LOG.error(e.getLocalizedMessage()); + } + count.incrementAndGet(); + electionLatch.countDown(); + } + }); // Processor-3 - ZkUtils zkUtils3 = getZkUtilsWithNewClient(); + ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3"); zkUtils3.registerProcessorAndGetId("processor3"); - ZkLeaderElector leaderElector3 = new ZkLeaderElector( - "3", - zkUtils3, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader3.res = true; - } - }, - new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - count.incrementAndGet(); - } - }); + ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null); + + leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + count.incrementAndGet(); + } + }); // Join Leader Election - leaderElector1.tryBecomeLeader(); - leaderElector2.tryBecomeLeader(); - leaderElector3.tryBecomeLeader(); - Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100)); - Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100)); - Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100)); + leaderElector1.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }); + leaderElector2.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }); + leaderElector3.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader3.res = true; + } + }); + + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100)); Assert.assertTrue(leaderElector1.amILeader()); Assert.assertFalse(leaderElector2.amILeader()); Assert.assertFalse(leaderElector3.amILeader()); - List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors(); + List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors(); Assert.assertEquals(3, currentActiveProcessors.size()); // Leader Failure @@ -322,11 +350,12 @@ public class TestZkLeaderElector { } Assert.assertEquals(1, count.get()); - Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors()); + Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessors()); // Clean up zkUtils2.close(); zkUtils3.close(); + } /** @@ -347,100 +376,101 @@ public class TestZkLeaderElector { BooleanResult isLeader3 = new BooleanResult(); // Processor-1 - ZkUtils zkUtils1 = getZkUtilsWithNewClient(); + ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1"); zkUtils1.registerProcessorAndGetId("processor1"); - ZkLeaderElector leaderElector1 = new ZkLeaderElector( - "1", - zkUtils1, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }, - new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - count.incrementAndGet(); - } - }); + ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null); + + leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + count.incrementAndGet(); + } + }); + // Processor-2 - ZkUtils zkUtils2 = getZkUtilsWithNewClient(); + ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2"); zkUtils2.registerProcessorAndGetId("processor2"); - ZkLeaderElector leaderElector2 = new ZkLeaderElector( - "2", - zkUtils2, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }, - new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - count.incrementAndGet(); - } - }); + ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null); + + leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + count.incrementAndGet(); + } + }); // Processor-3 - ZkUtils zkUtils3 = getZkUtilsWithNewClient(); + ZkUtils zkUtils3 = getZkUtilsWithNewClient("processor3"); final String path3 = zkUtils3.registerProcessorAndGetId("processor3"); - ZkLeaderElector leaderElector3 = new ZkLeaderElector( - "3", - zkUtils3, - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader3.res = true; - } - }, - new IZkDataListener() { - @Override - public void handleDataChange(String dataPath, Object data) throws Exception { - - } - - @Override - public void handleDataDeleted(String dataPath) throws Exception { - String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3); - Assert.assertNotNull(registeredIdStr); - - String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath); - Assert.assertNotNull(predecessorIdStr); - - try { - int selfId = Integer.parseInt(registeredIdStr); - int predecessorId = Integer.parseInt(predecessorIdStr); - Assert.assertEquals(1, selfId - predecessorId); - } catch (Exception e) { - Assert.fail("Exception in LeaderElectionListener!"); - } - count.incrementAndGet(); - electionLatch.countDown(); - } - }); + ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null); + + leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() { + @Override + public void handleDataChange(String dataPath, Object data) + throws Exception { + + } + + @Override + public void handleDataDeleted(String dataPath) + throws Exception { + String registeredIdStr = ZkKeyBuilder.parseIdFromPath(path3); + Assert.assertNotNull(registeredIdStr); + + String predecessorIdStr = ZkKeyBuilder.parseIdFromPath(dataPath); + Assert.assertNotNull(predecessorIdStr); + + try { + int selfId = Integer.parseInt(registeredIdStr); + int predecessorId = Integer.parseInt(predecessorIdStr); + Assert.assertEquals(1, selfId - predecessorId); + } catch (Exception e) { + Assert.fail("Exception in LeaderElectionListener!"); + } + count.incrementAndGet(); + electionLatch.countDown(); + } + }); // Join Leader Election - leaderElector1.tryBecomeLeader(); - leaderElector2.tryBecomeLeader(); - leaderElector3.tryBecomeLeader(); - Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100)); - Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100)); - Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100)); - - List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors(); + leaderElector1.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }); + leaderElector2.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }); + leaderElector3.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader3.res = true; + } + }); + Assert.assertTrue(TestZkUtils.testWithDelayBackOff(() -> isLeader1.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100)); + Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100)); + + List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors(); Assert.assertEquals(3, currentActiveProcessors.size()); zkUtils2.close(); @@ -453,7 +483,7 @@ public class TestZkLeaderElector { } Assert.assertEquals(1, count.get()); - Assert.assertEquals(currentActiveProcessors, testZkUtils.getSortedActiveProcessors()); + Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessors()); // Clean up zkUtils1.close(); @@ -465,43 +495,43 @@ public class TestZkLeaderElector { BooleanResult isLeader1 = new BooleanResult(); BooleanResult isLeader2 = new BooleanResult(); // Processor-1 - ZkLeaderElector leaderElector1 = new ZkLeaderElector( - "1", - getZkUtilsWithNewClient(), - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader1.res = true; - } - }); + + ZkUtils zkUtils1 = getZkUtilsWithNewClient("1"); + ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1, null); // Processor-2 - ZkLeaderElector leaderElector2 = new ZkLeaderElector( - "2", - getZkUtilsWithNewClient(), - new ZkLeaderElector.ZkLeaderElectorListener() { - @Override - public void onBecomingLeader() { - isLeader2.res = true; - } - }); + ZkUtils zkUtils2 = getZkUtilsWithNewClient("2"); + ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2, null); // Before Leader Election Assert.assertFalse(leaderElector1.amILeader()); Assert.assertFalse(leaderElector2.amILeader()); - leaderElector1.tryBecomeLeader(); - leaderElector2.tryBecomeLeader(); + leaderElector1.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader1.res = true; + } + }); + leaderElector2.tryBecomeLeader(new LeaderElectorListener() { + @Override + public void onBecomingLeader() { + isLeader2.res = true; + } + }); // After Leader Election Assert.assertTrue(leaderElector1.amILeader()); Assert.assertFalse(leaderElector2.amILeader()); + + zkUtils1.close(); + zkUtils2.close(); } - private ZkUtils getZkUtilsWithNewClient() { + private ZkUtils getZkUtilsWithNewClient(String processorId) { ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS); return new ZkUtils( - "processorId1", + processorId, KEY_BUILDER, ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS), CONNECTION_TIMEOUT_MS); http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java new file mode 100644 index 0000000..ec7e830 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.zk; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.samza.config.Config; +import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ZkConfig; +import org.apache.samza.coordinator.CoordinationUtils; +import org.apache.samza.coordinator.CoordinationServiceFactory; +import org.apache.samza.coordinator.Latch; +import org.apache.samza.testUtils.EmbeddedZookeeper; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + + + +public class TestZkProcessorLatch { + private static EmbeddedZookeeper zkServer = null; + private static String zkConnectionString; + private final CoordinationServiceFactory factory = new ZkCoordinationServiceFactory(); + private CoordinationUtils coordinationUtils; + + @BeforeClass + public static void setup() throws InterruptedException { + zkServer = new EmbeddedZookeeper(); + zkServer.setup(); + + zkConnectionString = "localhost:" + zkServer.getPort(); + System.out.println("ZK port = " + zkServer.getPort()); + } + + @Before + public void testSetup() { + String groupId = "group1"; + String processorId = "p1"; + Map<String, String> map = new HashMap<>(); + map.put(ZkConfig.ZK_CONNECT, zkConnectionString); + Config config = new MapConfig(map); + + + coordinationUtils = factory.getCoordinationService(groupId, processorId, config); + coordinationUtils.reset(); + } + + @After + public void testTearDown() { + } + + @AfterClass + public static void teardown() { + zkServer.teardown(); + } + + @Test + public void testSingleLatch1() { + System.out.println("Started 1"); + int latchSize = 1; + String latchId = "l2"; + ExecutorService pool = Executors.newFixedThreadPool(3); + Future f1 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + + try { + f1.get(30000, TimeUnit.MILLISECONDS); + } catch (Exception e) { + Assert.fail("failed to get future." + e.getLocalizedMessage()); + } + pool.shutdownNow(); + } + + @Test + public void testSingleLatch2() { + System.out.println("Started 1"); + int latchSize = 1; + String latchId = "l2"; + + ExecutorService pool = Executors.newFixedThreadPool(3); + Future f1 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + //latch.countDown(); only one thread counts down + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + + Future f2 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + + try { + f1.get(30000, TimeUnit.MILLISECONDS); + f2.get(30000, TimeUnit.MILLISECONDS); + } catch (Exception e) { + Assert.fail("failed to get future." + e.getLocalizedMessage()); + } + pool.shutdownNow(); + } + + @Test + public void testNSizeLatch() { + System.out.println("Started N"); + String latchId = "l1"; + int latchSize = 3; + + ExecutorService pool = Executors.newFixedThreadPool(3); + Future f1 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out " + e.getLocalizedMessage()); + } + }); + Future f2 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + Future f3 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + + try { + f1.get(300, TimeUnit.MILLISECONDS); + f2.get(300, TimeUnit.MILLISECONDS); + f3.get(300, TimeUnit.MILLISECONDS); + } catch (Exception e) { + Assert.fail("failed to get future. " + e.getLocalizedMessage()); + } + } + + @Test + public void testLatchExpires() { + System.out.println("Started expiring"); + String latchId = "l4"; + + int latchSize = 3; + + ExecutorService pool = Executors.newFixedThreadPool(3); + Future f1 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + Future f2 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + Future f3 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + // This processor never completes its task + //latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + + try { + f1.get(300, TimeUnit.MILLISECONDS); + f2.get(300, TimeUnit.MILLISECONDS); + f3.get(300, TimeUnit.MILLISECONDS); + Assert.fail("Latch should've timeout."); + } catch (Exception e) { + f1.cancel(true); + f2.cancel(true); + f3.cancel(true); + // expected + } + pool.shutdownNow(); + } + + @Test + public void testSingleCountdown() { + System.out.println("Started single countdown"); + String latchId = "l1"; + int latchSize = 3; + + ExecutorService pool = Executors.newFixedThreadPool(3); + // Only one thread invokes countDown + Future f1 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + latch.countDown(); + TestZkUtils.sleepMs(100); + latch.countDown(); + TestZkUtils.sleepMs(100); + latch.countDown(); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + Future f2 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + Future f3 = pool.submit( + () -> { + Latch latch = coordinationUtils.getLatch(latchSize, latchId); + try { + latch.await(100000, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + Assert.fail("await timed out. " + e.getLocalizedMessage()); + } + }); + try { + f1.get(600, TimeUnit.MILLISECONDS); + f2.get(600, TimeUnit.MILLISECONDS); + f3.get(600, TimeUnit.MILLISECONDS); + } catch (Exception e) { + Assert.fail("Failed to get."); + } + pool.shutdownNow(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java index a1ad363..2c44aea 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestStreamProcessor.java @@ -19,6 +19,16 @@ package org.apache.samza.test.processor; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import kafka.utils.TestUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -28,6 +38,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.MapConfig; +import org.apache.samza.config.ZkConfig; import org.apache.samza.processor.StreamProcessor; import org.apache.samza.task.AsyncStreamTaskAdapter; import org.apache.samza.task.AsyncStreamTaskFactory; @@ -37,17 +48,6 @@ import org.apache.samza.test.StandaloneTestUtils; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - import static org.apache.samza.test.processor.IdentityStreamTask.endLatch; public class TestStreamProcessor extends StandaloneIntegrationTestHarness { @@ -144,21 +144,14 @@ public class TestStreamProcessor extends StandaloneIntegrationTestHarness { private Map<String, String> createConfigs(String testSystem, String inputTopic, String outputTopic, int messageCount) { Map<String, String> configs = new HashMap<>(); configs.putAll( - StandaloneTestUtils.getStandaloneConfigs( - "test-job", - "org.apache.samza.test.processor.IdentityStreamTask")); - configs.putAll( - StandaloneTestUtils.getKafkaSystemConfigs( - testSystem, - bootstrapServers(), - zkConnect(), - null, - StandaloneTestUtils.SerdeAlias.STRING, - true)); + StandaloneTestUtils.getStandaloneConfigs("test-job", "org.apache.samza.test.processor.IdentityStreamTask")); + configs.putAll(StandaloneTestUtils.getKafkaSystemConfigs(testSystem, bootstrapServers(), zkConnect(), null, + StandaloneTestUtils.SerdeAlias.STRING, true)); configs.put("task.inputs", String.format("%s.%s", testSystem, inputTopic)); configs.put("app.messageCount", String.valueOf(messageCount)); configs.put("app.outputTopic", outputTopic); configs.put("app.outputSystem", testSystem); + configs.put(ZkConfig.ZK_CONNECT, zkConnect()); return configs; } http://git-wip-us.apache.org/repos/asf/samza/blob/553ce33b/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 5de30d8..417ada4 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + include \ 'samza-api', 'samza-elasticsearch',
