Repository: samza
Updated Branches:
  refs/heads/master 7a2b4650c -> 72ad7523f


SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.

**Problem:**

Currently, if the thread executing zkClient.close() is interrupted, we swallow
the ZkInterruptedException and proceed without closing the ZooKeeper connection.

As a result, the StreamProcessor's ephemeral nodes linger in ZooKeeper after
the StreamProcessor has shut down.

Users had to wait for the ZooKeeper server session timeout to expire before the
ephemeral nodes were deleted.

**Change:**

Retry zkClient.close() once on ZkInterruptedException: clear the thread's
interrupted status, call close() again, and then restore the interrupted status.

Misc changes:
* Remove unnecessary null checks.
* Remove unnecessary typecasts.

Author: Shanthoosh Venkataraman <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #519 from shanthoosh/handle_interrupted_exception_in_zkclient_close


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72ad7523
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72ad7523
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72ad7523

Branch: refs/heads/master
Commit: 72ad7523fffdcafdc01a0c6922fc94ccd1e482a5
Parents: 7a2b465
Author: Shanthoosh Venkataraman <[email protected]>
Authored: Fri May 18 17:29:00 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Fri May 18 17:29:00 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 42 ++++++--------
 .../org/apache/samza/zk/ZkUtilsMetrics.java     |  6 ++
 .../java/org/apache/samza/zk/TestZkUtils.java   | 59 +++++++++++++++++---
 3 files changed, 73 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 43f7d9c..6511603 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -78,7 +78,6 @@ public class ZkUtils {
   private static final Logger LOG = LoggerFactory.getLogger(ZkUtils.class);
   /* package private */static final String ZK_PROTOCOL_VERSION = "1.0";
 
-
   private final ZkClient zkClient;
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
@@ -105,9 +104,7 @@ public class ZkUtils {
   public void connect() throws ZkInterruptedException {
     boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, 
TimeUnit.MILLISECONDS);
     if (!isConnected) {
-      if (metrics != null) {
-        metrics.zkConnectionError.inc();
-      }
+      metrics.zkConnectionError.inc();
       throw new RuntimeException("Unable to connect to Zookeeper within 
connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
     }
   }
@@ -144,6 +141,7 @@ public class ZkUtils {
       if (!isValidRegisteredProcessor(processorNode)) {
         LOG.info("Processor: {} is duplicate. Deleting zookeeper node at path: 
{}.", processorId, ephemeralPath);
         zkClient.delete(ephemeralPath);
+        metrics.deletions.inc();
         throw new SamzaException(String.format("Processor: %s is duplicate in 
the group. Registration failed.", processorId));
       }
     } else {
@@ -272,16 +270,12 @@ public class ZkUtils {
 
   public void subscribeDataChanges(String path, IZkDataListener dataListener) {
     zkClient.subscribeDataChanges(path, dataListener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
 
   public void subscribeChildChanges(String path, IZkChildListener listener) {
     zkClient.subscribeChildChanges(path, listener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
 
   public void unsubscribeChildChanges(String path, IZkChildListener 
childListener) {
@@ -290,9 +284,7 @@ public class ZkUtils {
 
   public void writeData(String path, Object object) {
     zkClient.writeData(path, object);
-    if (metrics != null) {
-      metrics.writes.inc();
-    }
+    metrics.writes.inc();
   }
 
   public boolean exists(String path) {
@@ -303,9 +295,10 @@ public class ZkUtils {
     try {
       zkClient.close();
     } catch (ZkInterruptedException e) {
-      // Swallowing due to occurrence in the last stage of lifecycle (Not 
actionable) and clear the interrupted status.
+      LOG.warn("Interrupted when closing zkClient. Clearing the interrupted 
status and retrying.", e);
       Thread.interrupted();
-      LOG.warn("Ignoring the exception when closing the zookeeper client.", e);
+      zkClient.close();
+      Thread.currentThread().interrupt();
     }
   }
 
@@ -366,9 +359,7 @@ public class ZkUtils {
   public void subscribeToJobModelVersionChange(GenIZkDataListener 
dataListener) {
     LOG.info(" subscribing for jm version change at:" + 
keyBuilder.getJobModelVersionPath());
     zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), 
dataListener);
-    if (metrics != null) {
-      metrics.subscriptions.inc();
-    }
+    metrics.subscriptions.inc();
   }
 
   /**
@@ -397,7 +388,7 @@ public class ZkUtils {
    * @return job model for this version
    */
   public JobModel getJobModel(String jobModelVersion) {
-    LOG.info("read the model ver=" + jobModelVersion + " from " + 
keyBuilder.getJobModelPath(jobModelVersion));
+    LOG.info("Read the model ver=" + jobModelVersion + " from " + 
keyBuilder.getJobModelPath(jobModelVersion));
     Object data = 
zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
     metrics.reads.inc();
     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
@@ -450,7 +441,7 @@ public class ZkUtils {
    */
   public void publishJobModelVersion(String oldVersion, String newVersion) {
     Stat stat = new Stat();
-    String currentVersion = 
zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    String currentVersion = 
zkClient.readData(keyBuilder.getJobModelVersionPath(), stat);
     metrics.reads.inc();
     LOG.info("publishing new version: " + newVersion + "; oldVersion = " + 
oldVersion + "(" + stat
         .getVersion() + ")");
@@ -491,7 +482,7 @@ public class ZkUtils {
     }
     // if exists, verify the version
     Stat stat = new Stat();
-    String version = (String) zkClient.readData(rootPath, stat);
+    String version = zkClient.readData(rootPath, stat);
     if (version == null) {
       // for backward compatibility, if no value - assume 1.0
       try {
@@ -500,7 +491,7 @@ public class ZkUtils {
         // if the write failed with ZkBadVersionException it means someone 
else already wrote a version, so we can ignore it.
       }
       // re-read the updated version
-      version = (String) zkClient.readData(rootPath);
+      version = zkClient.readData(rootPath);
     }
     LOG.info("Current version for zk root node: " + rootPath + " is " + 
version + ", expected version is " + ZK_PROTOCOL_VERSION);
     if (!version.equals(ZK_PROTOCOL_VERSION)) {
@@ -525,7 +516,7 @@ public class ZkUtils {
    * @param listener - will be called when a processor is added or removed.
    */
   public void subscribeToProcessorChange(IZkChildListener listener) {
-    LOG.info("subscribing for child change at:" + 
keyBuilder.getProcessorsPath());
+    LOG.info("Subscribing for child change at:" + 
keyBuilder.getProcessorsPath());
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
     metrics.subscriptions.inc();
   }
@@ -542,7 +533,7 @@ public class ZkUtils {
   void deleteOldJobModels(int numVersionsToLeave) {
     // read current list of JMs
     String path = keyBuilder.getJobModelPathPrefix();
-    LOG.info("about to delete jm path=" + path);
+    LOG.info("About to delete jm path=" + path);
     List<String> znodeIds = zkClient.getChildren(path);
     deleteOldVersionPath(path, znodeIds, numVersionsToLeave, new 
Comparator<String>() {
       @Override
@@ -556,7 +547,7 @@ public class ZkUtils {
   void deleteOldBarrierVersions(int numVersionsToLeave) {
     // read current list of barriers
     String path = keyBuilder.getJobModelVersionBarrierPrefix();
-    LOG.info("about to delete old barrier paths from " + path);
+    LOG.info("About to delete old barrier paths from " + path);
     List<String> znodeIds = zkClient.getChildren(path);
     LOG.info("List of all zkNodes: " + znodeIds);
     deleteOldVersionPath(path, znodeIds, numVersionsToLeave,  new 
Comparator<String>() {
@@ -584,6 +575,7 @@ public class ZkUtils {
         try {
           LOG.info("deleting " + pathToDelete);
           zkClient.deleteRecursive(pathToDelete);
+          metrics.deletions.inc();
         } catch (Exception e) {
           LOG.warn("delete of node " + pathToDelete + " failed.", e);
         }

http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
index b9f4aa8..335fbb1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtilsMetrics.java
@@ -46,10 +46,16 @@ public class ZkUtilsMetrics extends MetricsBase {
    */
   public final Counter zkConnectionError;
 
+  /**
+   * Number of zookeeper data node deletions.
+   */
+  public final Counter deletions;
+
   public ZkUtilsMetrics(MetricsRegistry metricsRegistry) {
     super(metricsRegistry);
     this.reads = newCounter("reads");
     this.writes = newCounter("writes");
+    this.deletions = newCounter("deletions");
     this.subscriptions = newCounter("subscriptions");
     this.zkConnectionError = newCounter("zk-connection-errors");
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/72ad7523/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 1dfb414..ee523aa 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -26,6 +26,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.BooleanSupplier;
 import com.google.common.collect.ImmutableList;
 import org.I0Itec.zkclient.IZkDataListener;
@@ -48,6 +49,7 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
 import org.mockito.Mockito;
 
 public class TestZkUtils {
@@ -62,6 +64,9 @@ public class TestZkUtils {
   // Declared public to honor junit contract.
   public final ExpectedException expectedException = ExpectedException.none();
 
+  @Rule
+  public Timeout testTimeOutInMillis = new Timeout(120000);
+
   @BeforeClass
   public static void setup() throws InterruptedException {
     zkServer = new EmbeddedZookeeper();
@@ -410,15 +415,6 @@ public class TestZkUtils {
 
   }
 
-  @Test
-  public void testCloseShouldNotThrowZkInterruptedExceptionToCaller() {
-    ZkClient zkClient = Mockito.mock(ZkClient.class);
-    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
-            SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
-    Mockito.doThrow(new ZkInterruptedException(new 
InterruptedException())).when(zkClient).close();
-    zkUtils.close();
-  }
-
   public static boolean testWithDelayBackOff(BooleanSupplier cond, long 
startDelayMs, long maxDelayMs) {
     long delay = startDelayMs;
     while (delay < maxDelayMs) {
@@ -463,4 +459,49 @@ public class TestZkUtils {
     // Get on the JobModel version should return 2, taking into account the 
published version 2.
     Assert.assertEquals("3", 
zkUtils.getNextJobModelVersion(zkUtils.getJobModelVersion()));
   }
+
+
+  @Test
+  public void testCloseShouldRetryOnceOnInterruptedException() {
+    ZkClient zkClient = Mockito.mock(ZkClient.class);
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
+        SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+
+    Mockito.doThrow(new ZkInterruptedException(new InterruptedException()))
+           .doAnswer(invocation -> null)
+           .when(zkClient).close();
+
+    zkUtils.close();
+
+    Mockito.verify(zkClient, Mockito.times(2)).close();
+  }
+
+  @Test
+  public void testCloseShouldTearDownZkConnectionOnInterruptedException() 
throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    // Establish connection with the zookeeper server.
+    ZkClient zkClient = new ZkClient("127.0.0.1:" + zkServer.getPort());
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS, 
new NoOpMetricsRegistry());
+
+    Thread threadToInterrupt = new Thread(() -> {
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+        zkUtils.close();
+      });
+
+    threadToInterrupt.start();
+
+    Field field = ZkClient.class.getDeclaredField("_closed");
+    field.setAccessible(true);
+
+    Assert.assertFalse(field.getBoolean(zkClient));
+
+    threadToInterrupt.interrupt();
+    threadToInterrupt.join();
+
+    Assert.assertTrue(field.getBoolean(zkClient));
+  }
 }

Reply via email to