Repository: samza Updated Branches: refs/heads/master b0ce5db9b -> 8f8e7bcb9
SAMZA-1621: Delete the ephemeral processor node in StreamProcessor shutdown phase. Author: Shanthoosh Venkataraman <[email protected]> Author: Shanthoosh Venkataraman <[email protected]> Reviewers: Jagadish <[email protected]> Closes #451 from shanthoosh/SAMZA-1621 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8f8e7bcb Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8f8e7bcb Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8f8e7bcb Branch: refs/heads/master Commit: 8f8e7bcb9edaaaa67bd12b5ecd42269bfa6d90a4 Parents: b0ce5db Author: Shanthoosh Venkataraman <[email protected]> Authored: Tue May 22 15:19:55 2018 -0700 Committer: Jagadish <[email protected]> Committed: Tue May 22 15:19:55 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/zk/ZkControllerImpl.java | 20 +++---- .../samza/zk/ZkCoordinationUtilsFactory.java | 3 +- .../samza/zk/ZkJobCoordinatorFactory.java | 2 +- .../main/java/org/apache/samza/zk/ZkUtils.java | 58 ++++++++++++++++---- .../zk/TestZkBarrierForVersionUpgrade.java | 4 +- .../apache/samza/zk/TestZkLeaderElector.java | 2 +- .../apache/samza/zk/TestZkProcessorLatch.java | 6 +- .../java/org/apache/samza/zk/TestZkUtils.java | 27 +++++++-- .../processor/TestZkLocalApplicationRunner.java | 3 +- 9 files changed, 89 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java index bdbdcbc..87d7177 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java @@ -29,14 +29,14 @@ import java.util.List; public class ZkControllerImpl implements ZkController { private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class); - private final String processorIdStr; + private final String processorId; private final ZkUtils zkUtils; private final ZkControllerListener zkControllerListener; private final LeaderElector zkLeaderElector; - public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, + public ZkControllerImpl(String processorId, ZkUtils zkUtils, ZkControllerListener zkControllerListener, LeaderElector zkLeaderElector) { - this.processorIdStr = processorIdStr; + this.processorId = processorId; this.zkUtils = zkUtils; this.zkControllerListener = zkControllerListener; this.zkLeaderElector = zkLeaderElector; @@ -83,12 +83,12 @@ public class ZkControllerImpl implements ZkController { @Override public void stop() { - if (isLeader()) { - zkLeaderElector.resignLeadership(); - } - - // close zk connection - if (zkUtils != null) { + try { + if (isLeader()) { + zkLeaderElector.resignLeadership(); + } + } finally { + zkUtils.deleteProcessorNode(processorId); zkUtils.close(); } } @@ -147,7 +147,7 @@ public class ZkControllerImpl implements ZkController { if (notAValidEvent()) return; - LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data=" + LOG.info("pid=" + processorId + ". Got notification on version update change. path=" + dataPath + "; data=" + data); zkControllerListener.onNewJobModelAvailable((String) data); } http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java index 072a2f5..0cae225 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java @@ -41,7 +41,8 @@ public class ZkCoordinationUtilsFactory implements CoordinationUtilsFactory { ZkClient zkClient = createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry()); + ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(), + new NoOpMetricsRegistry()); return new ZkCoordinationUtils(participantId, zkConfig, zkUtils); } http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java index c967a21..6888df0 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java @@ -58,7 +58,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory { ZkKeyBuilder keyBuilder = new ZkKeyBuilder(getJobCoordinationZkPath(config)); ZkClient zkClient = ZkCoordinationUtilsFactory .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs()); - return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry); + return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(), metricsRegistry); } public static String getJobCoordinationZkPath(Config config) { http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java index 6511603..fead167 100644 --- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java +++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java @@ -84,6 +84,7 @@ public class ZkUtils { private final int connectionTimeoutMs; private final AtomicInteger currentGeneration; private final ZkUtilsMetrics metrics; + private final int sessionTimeoutMs; public void incGeneration() { currentGeneration.incrementAndGet(); @@ -93,12 +94,13 @@ public class ZkUtils { return currentGeneration.get(); } - public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, MetricsRegistry metricsRegistry) { + public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, int sessionTimeOutMs, MetricsRegistry metricsRegistry) { this.keyBuilder = zkKeyBuilder; this.connectionTimeoutMs = connectionTimeoutMs; this.zkClient = zkClient; this.metrics = new ZkUtilsMetrics(metricsRegistry); this.currentGeneration = new AtomicInteger(0); + this.sessionTimeoutMs = sessionTimeOutMs; } public void connect() throws ZkInterruptedException { @@ -110,7 +112,7 @@ public class ZkUtils { } // reset all zk-session specific state - public void unregister() { + public synchronized void unregister() { ephemeralPath = null; } @@ -132,17 +134,29 @@ public class ZkUtils { * @return String representing the absolute ephemeralPath of this client in the current session */ public synchronized String registerProcessorAndGetId(final ProcessorData data) { + final long startTimeMs = System.currentTimeMillis(); + final long retryTimeOutMs = 2 * sessionTimeoutMs; String processorId = data.getProcessorId(); if (ephemeralPath == null) { ephemeralPath = zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/", data.toString()); LOG.info("Created ephemeral path: {} for processor: {} in zookeeper.", ephemeralPath, data); - ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath); - // Determine if there are duplicate processors with this.processorId after registration. - if (!isValidRegisteredProcessor(processorNode)) { - LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, ephemeralPath); - zkClient.delete(ephemeralPath); - metrics.deletions.inc(); - throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId)); + while (true) { + ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath); + // Determine if there are duplicate processors with this.processorId after registration. + if (!isValidRegisteredProcessor(processorNode)) { + long currentTimeMs = System.currentTimeMillis(); + if ((currentTimeMs - startTimeMs) < retryTimeOutMs) { + LOG.info("Processor: {} is duplicate. Retrying registration again.", processorId); + timeDelay(5000); + } else { + LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: {}.", processorId, ephemeralPath); + zkClient.delete(ephemeralPath); + metrics.deletions.inc(); + throw new SamzaException(String.format("Processor: %s is duplicate in the group. Registration failed.", processorId)); + } + } else { + break; + } } } else { LOG.info("Ephemeral path: {} exists for processor: {} in zookeeper.", ephemeralPath, data); @@ -150,6 +164,30 @@ public class ZkUtils { return ephemeralPath; } + public void timeDelay(int sleepTimeInMillis) { + try { + Thread.sleep(sleepTimeInMillis); + } catch (InterruptedException e) { + LOG.error("Interrupted exception on wait.", e); + Thread.interrupted(); + } + } + + /** + * Deletes the ephemeral node of a processor in zookeeper. + * @param processorId uniqueId identifying the stream processor to delete. + */ + public synchronized void deleteProcessorNode(String processorId) { + try { + if (ephemeralPath != null) { + LOG.info("Deleting the ephemeral node: {} of the processor: {} in zookeeper.", ephemeralPath, processorId); + zkClient.delete(ephemeralPath); + } + } catch (Exception e) { + LOG.error("Exception occurred on deletion of ephemeral node: {}.", ephemeralPath, e); + } + } + /** * Determines the validity of processor registered with zookeeper. * @@ -291,7 +329,7 @@ public class ZkUtils { return zkClient.exists(path); } - public void close() throws ZkInterruptedException { + public void close() { try { zkClient.close(); } catch (ZkInterruptedException e) { http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java index bd84b57..592470a 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java @@ -56,9 +56,9 @@ public class TestZkBarrierForVersionUpgrade { @Before public void testSetup() { ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); - this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); + this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS); - this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); + this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); } @After http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java index 74b9abd..52f30d8 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java @@ -438,6 +438,6 @@ public class TestZkLeaderElector { return new ZkUtils( KEY_BUILDER, zkClient, - CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); + CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java index 674287b..7876679 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java @@ -217,10 +217,6 @@ public class TestZkProcessorLatch { private ZkUtils getZkUtilsWithNewClient(String processorId) { ZkClient zkClient = ZkCoordinationUtilsFactory .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, CONNECTION_TIMEOUT_MS); - return new ZkUtils( - KEY_BUILDER, - zkClient, - CONNECTION_TIMEOUT_MS, - new NoOpMetricsRegistry()); + return new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); } } http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java index 1d6ff86..e49dc13 100644 --- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java +++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java @@ -101,7 +101,7 @@ public class TestZkUtils { } private ZkUtils getZkUtils() { - return new ZkUtils(KEY_BUILDER, zkClient, + return new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); } @@ -192,7 +192,7 @@ public class TestZkUtils { zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1")); List<String> l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(1, l.size()); - new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId( + new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId( new ProcessorData("host2", "2")); l = zkUtils.getSortedActiveProcessorsIDs(); Assert.assertEquals(2, l.size()); @@ -460,12 +460,29 @@ public class TestZkUtils { Assert.assertEquals("3", zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion())); } + @Test + public void testDeleteProcessorNodeShouldDeleteTheCorrectProcessorNode() { + String testProcessorId1 = "processorId1"; + String testProcessorId2 = "processorId2"; + + ZkUtils zkUtils = getZkUtils(); + ZkUtils zkUtils1 = getZkUtils(); + + zkUtils.registerProcessorAndGetId(new ProcessorData("host1", testProcessorId1)); + zkUtils1.registerProcessorAndGetId(new ProcessorData("host2", testProcessorId2)); + + zkUtils.deleteProcessorNode(testProcessorId1); + + List<String> expectedProcessors = ImmutableList.of(testProcessorId2); + List<String> actualProcessors = zkUtils.getSortedActiveProcessorsIDs(); + + Assert.assertEquals(expectedProcessors, actualProcessors); + } @Test public void testCloseShouldRetryOnceOnInterruptedException() { ZkClient zkClient = Mockito.mock(ZkClient.class); - ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, - SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); Mockito.doThrow(new ZkInterruptedException(new InterruptedException())) .doAnswer(invocation -> null) @@ -481,7 +498,7 @@ public class TestZkUtils { CountDownLatch latch = new CountDownLatch(1); // Establish connection with the zookeeper server. ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort()); - ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); + ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); Thread threadToInterrupt = new Thread(() -> { try { http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index e7dff83..9d2cd92 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -84,6 +84,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne private static final int NUM_KAFKA_EVENTS = 300; private static final int ZK_CONNECTION_TIMEOUT_MS = 5000; + private static final int ZK_SESSION_TIMEOUT_MS = 10000; private static final String TEST_SYSTEM = "TestSystemName"; private static final String TEST_SSP_GROUPER_FACTORY = "org.apache.samza.container.grouper.stream.GroupByPartitionFactory"; private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory"; @@ -137,7 +138,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne ZkClient zkClient = new ZkClient(zkConnect()); ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(ZkJobCoordinatorFactory.getJobCoordinationZkPath(applicationConfig1)); - zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry()); + zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()); zkUtils.connect(); // Create local application runners.
