Repository: samza
Updated Branches:
  refs/heads/master 002e13178 -> 4123b160d


SAMZA-1568: Handle ZkInterruptedException in zkclient.close.

When ZooKeeper session failures occur in a stream processor, it leaves the
group (zkClient is closed) and then joins the group again.

The last step in that shutdown sequence is zkClient.close(). In some scenarios,
it throws the following exception:

    org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
    at org.I0Itec.zkclient.ZkClient.close(ZkClient.java:1278)
    at org.apache.samza.zk.ZkControllerImpl.stop(ZkControllerImpl.java:92)
    at org.apache.samza.zk.ZkJobCoordinator.stop(ZkJobCoordinator.java:141)

In the existing implementation this exception is not handled, thereby killing
the stream processor. The following code path triggers this exception:

`StreamProcessor.stop() -> ZkJobCoordinator.stop() -> zkController.stop() -> zkUtils.close()`

This exception occasionally causes the integration test to fail, and it can
cause the LocalApplicationRunner.waitForFinish() call to block indefinitely
(because the successful completion of this callback event is what updates the
latch state that waitForFinish needs in order to return).

Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>

Reviewers: Jagadish <jagad...@apache.org>

Closes #416 from shanthoosh/zk_utils_close


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4123b160
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4123b160
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4123b160

Branch: refs/heads/master
Commit: 4123b160d5cb6d92db4848caf8e043f02da94b05
Parents: 002e131
Author: Shanthoosh Venkataraman <svenkatara...@linkedin.com>
Authored: Tue Feb 13 19:30:03 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Feb 13 19:30:03 2018 -0800

----------------------------------------------------------------------
 .../java/org/apache/samza/zk/ZkControllerImpl.java  |  2 +-
 .../src/main/java/org/apache/samza/zk/ZkUtils.java  | 10 +++++++---
 .../test/java/org/apache/samza/zk/TestZkUtils.java  | 16 ++++++++++++++--
 3 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4123b160/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 6305616..bdbdcbc 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -89,7 +89,7 @@ public class ZkControllerImpl implements ZkController {
 
     // close zk connection
     if (zkUtils != null) {
-      zkUtils.getZkClient().close();
+      zkUtils.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4123b160/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index f34ba4e..300fff6 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -94,8 +94,6 @@ public class ZkUtils {
     return currentGeneration.get();
   }
 
-
-
   public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int 
connectionTimeoutMs, MetricsRegistry metricsRegistry) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
@@ -298,7 +296,13 @@ public class ZkUtils {
   }
 
   public void close() throws ZkInterruptedException {
-    zkClient.close();
+    try {
+      zkClient.close();
+    } catch (ZkInterruptedException e) {
+      // Swallowing due to occurrence in the last stage of lifecycle (Not 
actionable) and clear the interrupted status.
+      Thread.interrupted();
+      LOG.warn("Ignoring the exception when closing the zookeeper client.", e);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/4123b160/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java 
b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 5d47dfc..ec04949 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -30,6 +30,7 @@ import java.util.function.BooleanSupplier;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.samza.SamzaException;
@@ -46,6 +47,7 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import org.mockito.Mockito;
 
 
 public class TestZkUtils {
@@ -88,8 +90,9 @@ public class TestZkUtils {
 
   @After
   public void testTeardown() {
-    zkUtils.close();
-    zkClient.close();
+    if (zkClient != null) {
+      zkUtils.close();
+    }
   }
 
   private ZkUtils getZkUtils() {
@@ -392,6 +395,15 @@ public class TestZkUtils {
 
   }
 
+  @Test
+  public void testCloseShouldNotThrowZkInterruptedExceptionToCaller() {
+    ZkClient zkClient = Mockito.mock(ZkClient.class);
+    ZkUtils zkUtils = new ZkUtils(KEY_BUILDER, zkClient,
+            SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    Mockito.doThrow(new ZkInterruptedException(new 
InterruptedException())).when(zkClient).close();
+    zkUtils.close();
+  }
+
   public static boolean testWithDelayBackOff(BooleanSupplier cond, long 
startDelayMs, long maxDelayMs) {
     long delay = startDelayMs;
     while (delay < maxDelayMs) {

Reply via email to