Repository: samza
Updated Branches:
  refs/heads/master b0ce5db9b -> 8f8e7bcb9


SAMZA-1621: Delete the ephemeral processor node in StreamProcessor shutdown phase.

Author: Shanthoosh Venkataraman <[email protected]>
Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #451 from shanthoosh/SAMZA-1621


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8f8e7bcb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8f8e7bcb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8f8e7bcb

Branch: refs/heads/master
Commit: 8f8e7bcb9edaaaa67bd12b5ecd42269bfa6d90a4
Parents: b0ce5db
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Tue May 22 15:19:55 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Tue May 22 15:19:55 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkControllerImpl.java   | 20 +++----
 .../samza/zk/ZkCoordinationUtilsFactory.java    |  3 +-
 .../samza/zk/ZkJobCoordinatorFactory.java       |  2 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 58 ++++++++++++++++----
 .../zk/TestZkBarrierForVersionUpgrade.java      |  4 +-
 .../apache/samza/zk/TestZkLeaderElector.java    |  2 +-
 .../apache/samza/zk/TestZkProcessorLatch.java   |  6 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   | 27 +++++++--
 .../processor/TestZkLocalApplicationRunner.java |  3 +-
 9 files changed, 89 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index bdbdcbc..87d7177 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -29,14 +29,14 @@ import java.util.List;
 public class ZkControllerImpl implements ZkController {
   private static final Logger LOG = 
LoggerFactory.getLogger(ZkControllerImpl.class);
 
-  private final String processorIdStr;
+  private final String processorId;
   private final ZkUtils zkUtils;
   private final ZkControllerListener zkControllerListener;
   private final LeaderElector zkLeaderElector;
 
-  public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils,
+  public ZkControllerImpl(String processorId, ZkUtils zkUtils,
       ZkControllerListener zkControllerListener, LeaderElector 
zkLeaderElector) {
-    this.processorIdStr = processorIdStr;
+    this.processorId = processorId;
     this.zkUtils = zkUtils;
     this.zkControllerListener = zkControllerListener;
     this.zkLeaderElector = zkLeaderElector;
@@ -83,12 +83,12 @@ public class ZkControllerImpl implements ZkController {
 
   @Override
   public void stop() {
-    if (isLeader()) {
-      zkLeaderElector.resignLeadership();
-    }
-
-    // close zk connection
-    if (zkUtils != null) {
+    try {
+      if (isLeader()) {
+        zkLeaderElector.resignLeadership();
+      }
+    } finally {
+      zkUtils.deleteProcessorNode(processorId);
       zkUtils.close();
     }
   }
@@ -147,7 +147,7 @@ public class ZkControllerImpl implements ZkController {
       if (notAValidEvent())
         return;
 
-      LOG.info("pid=" + processorIdStr + ". Got notification on version update 
change. path=" + dataPath + "; data="
+      LOG.info("pid=" + processorId + ". Got notification on version update 
change. path=" + dataPath + "; data="
           + data);
       zkControllerListener.onNewJobModelAvailable((String) data);
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index 072a2f5..0cae225 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -41,7 +41,8 @@ public class ZkCoordinationUtilsFactory implements 
CoordinationUtilsFactory {
     ZkClient zkClient =
         createZkClient(zkConfig.getZkConnect(), 
zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
 
-    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, 
zkConfig.getZkConnectionTimeoutMs(), new NoOpMetricsRegistry());
+    ZkUtils zkUtils = new ZkUtils(new ZkKeyBuilder(groupId), zkClient, 
zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(),
+        new NoOpMetricsRegistry());
     return new ZkCoordinationUtils(participantId, zkConfig, zkUtils);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index c967a21..6888df0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -58,7 +58,7 @@ public class ZkJobCoordinatorFactory implements 
JobCoordinatorFactory {
     ZkKeyBuilder keyBuilder = new 
ZkKeyBuilder(getJobCoordinationZkPath(config));
     ZkClient zkClient = ZkCoordinationUtilsFactory
         .createZkClient(zkConfig.getZkConnect(), 
zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-    return new ZkUtils(keyBuilder, zkClient, 
zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
+    return new ZkUtils(keyBuilder, zkClient, 
zkConfig.getZkConnectionTimeoutMs(), zkConfig.getZkSessionTimeoutMs(), 
metricsRegistry);
   }
 
   public static String getJobCoordinationZkPath(Config config) {

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 6511603..fead167 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -84,6 +84,7 @@ public class ZkUtils {
   private final int connectionTimeoutMs;
   private final AtomicInteger currentGeneration;
   private final ZkUtilsMetrics metrics;
+  private final int sessionTimeoutMs;
 
   public void incGeneration() {
     currentGeneration.incrementAndGet();
@@ -93,12 +94,13 @@ public class ZkUtils {
     return currentGeneration.get();
   }
 
-  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int 
connectionTimeoutMs, MetricsRegistry metricsRegistry) {
+  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int 
connectionTimeoutMs, int sessionTimeOutMs, MetricsRegistry metricsRegistry) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.zkClient = zkClient;
     this.metrics = new ZkUtilsMetrics(metricsRegistry);
     this.currentGeneration = new AtomicInteger(0);
+    this.sessionTimeoutMs = sessionTimeOutMs;
   }
 
   public void connect() throws ZkInterruptedException {
@@ -110,7 +112,7 @@ public class ZkUtils {
   }
 
   // reset all zk-session specific state
-  public void unregister() {
+  public synchronized void unregister() {
     ephemeralPath = null;
   }
 
@@ -132,17 +134,29 @@ public class ZkUtils {
    * @return String representing the absolute ephemeralPath of this client in 
the current session
    */
   public synchronized String registerProcessorAndGetId(final ProcessorData 
data) {
+    final long startTimeMs = System.currentTimeMillis();
+    final long retryTimeOutMs = 2 * sessionTimeoutMs;
     String processorId = data.getProcessorId();
     if (ephemeralPath == null) {
       ephemeralPath = 
zkClient.createEphemeralSequential(keyBuilder.getProcessorsPath() + "/", 
data.toString());
       LOG.info("Created ephemeral path: {} for processor: {} in zookeeper.", 
ephemeralPath, data);
-      ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath);
-      // Determine if there are duplicate processors with this.processorId 
after registration.
-      if (!isValidRegisteredProcessor(processorNode)) {
-        LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: 
{}.", processorId, ephemeralPath);
-        zkClient.delete(ephemeralPath);
-        metrics.deletions.inc();
-        throw new SamzaException(String.format("Processor: %s is duplicate in 
the group. Registration failed.", processorId));
+      while (true) {
+        ProcessorNode processorNode = new ProcessorNode(data, ephemeralPath);
+        // Determine if there are duplicate processors with this.processorId 
after registration.
+        if (!isValidRegisteredProcessor(processorNode)) {
+          long currentTimeMs = System.currentTimeMillis();
+          if ((currentTimeMs - startTimeMs) < retryTimeOutMs) {
+            LOG.info("Processor: {} is duplicate. Retrying registration 
again.", processorId);
+            timeDelay(5000);
+          } else {
+            LOG.info("Processor: {} is duplicate. Deleting zookeeper node at 
path: {}.", processorId, ephemeralPath);
+            zkClient.delete(ephemeralPath);
+            metrics.deletions.inc();
+            throw new SamzaException(String.format("Processor: %s is duplicate 
in the group. Registration failed.", processorId));
+          }
+        } else {
+          break;
+        }
       }
     } else {
       LOG.info("Ephemeral path: {} exists for processor: {} in zookeeper.", 
ephemeralPath, data);
@@ -150,6 +164,30 @@ public class ZkUtils {
     return ephemeralPath;
   }
 
+  public void timeDelay(int sleepTimeInMillis) {
+    try {
+      Thread.sleep(sleepTimeInMillis);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted exception on wait.", e);
+      Thread.interrupted();
+    }
+  }
+
+  /**
+   * Deletes the ephemeral node of a processor in zookeeper.
+   * @param processorId uniqueId identifying the stream processor to delete.
+   */
+  public synchronized void deleteProcessorNode(String processorId) {
+    try {
+      if (ephemeralPath != null) {
+        LOG.info("Deleting the ephemeral node: {} of the processor: {} in 
zookeeper.", ephemeralPath, processorId);
+        zkClient.delete(ephemeralPath);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception occurred on deletion of ephemeral node: {}.", 
ephemeralPath, e);
+    }
+  }
+
   /**
    * Determines the validity of processor registered with zookeeper.
    *
@@ -291,7 +329,7 @@ public class ZkUtils {
     return zkClient.exists(path);
   }
 
-  public void close() throws ZkInterruptedException {
+  public void close() {
     try {
       zkClient.close();
     } catch (ZkInterruptedException e) {

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index bd84b57..592470a 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -56,9 +56,9 @@ public class TestZkBarrierForVersionUpgrade {
   @Before
   public void testSetup() {
     ZkClient zkClient = new ZkClient(testZkConnectionString, 
ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
-    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, 
new NoOpMetricsRegistry());
     ZkClient zkClient1 = new ZkClient(testZkConnectionString, 
ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
-    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, 
ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, 
new NoOpMetricsRegistry());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 74b9abd..52f30d8 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -438,6 +438,6 @@ public class TestZkLeaderElector {
     return new ZkUtils(
         KEY_BUILDER,
         zkClient,
-        CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
+        CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
index 674287b..7876679 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkProcessorLatch.java
@@ -217,10 +217,6 @@ public class TestZkProcessorLatch {
   private ZkUtils getZkUtilsWithNewClient(String processorId) {
     ZkClient zkClient = ZkCoordinationUtilsFactory
         .createZkClient(testZkConnectionString, SESSION_TIMEOUT_MS, 
CONNECTION_TIMEOUT_MS);
-    return new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        CONNECTION_TIMEOUT_MS,
-        new NoOpMetricsRegistry());
+    return new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, 
SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 1d6ff86..e49dc13 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -101,7 +101,7 @@ public class TestZkUtils {
   }
 
   private ZkUtils getZkUtils() {
-    return new ZkUtils(KEY_BUILDER, zkClient,
+    return new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS,
                        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
   }
 
@@ -192,7 +192,7 @@ public class TestZkUtils {
     zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
     List<String> l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(1, l.size());
-    new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, new 
NoOpMetricsRegistry()).registerProcessorAndGetId(
+    new ZkUtils(KEY_BUILDER, zkClient, CONNECTION_TIMEOUT_MS, 
SESSION_TIMEOUT_MS, new NoOpMetricsRegistry()).registerProcessorAndGetId(
         new ProcessorData("host2", "2"));
     l = zkUtils.getSortedActiveProcessorsIDs();
     Assert.assertEquals(2, l.size());
@@ -460,12 +460,29 @@ public class TestZkUtils {
     Assert.assertEquals("3", 
zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
   }
 
+  @Test
+  public void testDeleteProcessorNodeShouldDeleteTheCorrectProcessorNode() {
+    String testProcessorId1 = "processorId1";
+    String testProcessorId2 = "processorId2";
+
+    ZkUtils zkUtils = getZkUtils();
+    ZkUtils zkUtils1 = getZkUtils();
+
+    zkUtils.registerProcessorAndGetId(new ProcessorData("host1", 
testProcessorId1));
+    zkUtils1.registerProcessorAndGetId(new ProcessorData("host2", 
testProcessorId2));
+
+    zkUtils.deleteProcessorNode(testProcessorId1);
+
+    List<String> expectedProcessors = ImmutableList.of(testProcessorId2);
+    List<String> actualProcessors = zkUtils.getSortedActiveProcessorsIDs();
+
+    Assert.assertEquals(expectedProcessors, actualProcessors);
+  }
 
   @Test
   public void testCloseShouldRetryOnceOnInterruptedException() {
     ZkClient zkClient = Mockito.mock(ZkClient.class);
-    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
-        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, 
CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
 
     Mockito.doThrow(new ZkInterruptedException(new InterruptedException()))
            .doAnswer(invocation -> null)
@@ -481,7 +498,7 @@ public class TestZkUtils {
     CountDownLatch latch = new CountDownLatch(1);
     // Establish connection with the zookeeper server.
     ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort());
-    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, 
new NoOpMetricsRegistry());
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, 
CONNECTION_TIMEOUT_MS, SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
 
     Thread threadToInterrupt = new Thread(() -> {
         try {

http://git-wip-us.apache.org/repos/asf/samza/blob/8f8e7bcb/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e7dff83..9d2cd92 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -84,6 +84,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
   private static final int NUM_KAFKA_EVENTS = 300;
   private static final int ZK_CONNECTION_TIMEOUT_MS = 5000;
+  private static final int ZK_SESSION_TIMEOUT_MS = 10000;
   private static final String TEST_SYSTEM = "TestSystemName";
   private static final String TEST_SSP_GROUPER_FACTORY = 
"org.apache.samza.container.grouper.stream.GroupByPartitionFactory";
   private static final String TEST_TASK_GROUPER_FACTORY = 
"org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
@@ -137,7 +138,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
 
     ZkClient zkClient = new ZkClient(zkConnect());
     ZkKeyBuilder zkKeyBuilder = new 
ZkKeyBuilder(ZkJobCoordinatorFactory.getJobCoordinationZkPath(applicationConfig1));
-    zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, 
new NoOpMetricsRegistry());
+    zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, 
ZK_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
     zkUtils.connect();
 
     // Create local application runners.

Reply via email to