Repository: samza
Updated Branches:
  refs/heads/master 161d1c47a -> f249e71a2


SAMZA-1741: fix issue where the EH consumer takes too long to shut down

1. Lower the shutdown timeout from 1 minute to 15 seconds.
2. Make sure EventHubManagers are shut down in parallel.
3. Print a thread dump when shutdown does not finish within the timeout.

Author: Hai Lu <[email protected]>

Reviewers: Jagadish <[email protected]>, Prateek <[email protected]>

Closes #548 from lhaiesp/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f249e71a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f249e71a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f249e71a

Branch: refs/heads/master
Commit: f249e71a2281cb91f8d2ba8ef3aa03e0e8efd791
Parents: 161d1c4
Author: Hai Lu <[email protected]>
Authored: Fri Jun 8 10:05:36 2018 -0700
Committer: Jagadish <[email protected]>
Committed: Fri Jun 8 10:05:36 2018 -0700

----------------------------------------------------------------------
 .../consumer/EventHubSystemConsumer.java        | 44 +++++++-----
 .../org/apache/samza/util/ShutdownUtil.java     | 74 ++++++++++++++++++++
 .../org/apache/samza/util/TestShutdownUtil.java | 63 +++++++++++++++++
 3 files changed, 165 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f249e71a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
index 04e361f..3fa95c2 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
@@ -28,17 +28,13 @@ import com.microsoft.azure.eventhubs.PartitionReceiver;
 import com.microsoft.azure.eventhubs.impl.ClientConstants;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -57,6 +53,7 @@ import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
 import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.ShutdownUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,7 +99,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
   // Overall timeout for EventHubClient exponential backoff policy
   private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
-  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
 
   public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
   public static final String END_OF_STREAM = "-2";
@@ -352,17 +349,32 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   @Override
   public void stop() {
     LOG.info("Stopping event hub system consumer...");
-    List<CompletableFuture<Void>> futures = new ArrayList<>();
-    streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close()));
-    CompletableFuture<Void> future = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
-    try {
-      future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
-    } catch (ExecutionException | InterruptedException | TimeoutException e) {
-      LOG.warn("Failed to close receivers", e);
-    }
-    perPartitionEventHubManagers.values()
-        .parallelStream()
-        .forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
+
+    // There could be potentially many Receivers and EventHubManagers, so close the managers in parallel
+    LOG.info("Start shutting down eventhubs receivers");
+    ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() {
+      @Override
+      public void run() {
+        try {
+          receiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+          LOG.error("Failed to shutdown receiver.", e);
+        }
+      }
+    }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+
+    LOG.info("Start shutting down eventhubs managers");
+    ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager -> new Runnable() {
+      @Override
+      public void run() {
+        try {
+          manager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+        } catch (Exception e) {
+          LOG.error("Failed to shutdown eventhubs manager.", e);
+        }
+      }
+    }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+
     perPartitionEventHubManagers.clear();
     perStreamEventHubManagers.clear();
     isStarted = false;

http://git-wip-us.apache.org/repos/asf/samza/blob/f249e71a/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java b/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java
new file mode 100644
index 0000000..3d75654
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * Shutdown related utils
+ */
+public class ShutdownUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(ShutdownUtil.class);
+
+  /**
+   * A helper to facilitate shutting down a set of resources in parallel to enforce a bounded shutdown time.
+   * The helper function instantiates an {@link ExecutorService} to execute a list of shutdown tasks, and will
+   * await the termination for given timeout. If shutdown remains unfinished in the end, the whole thread dump
+   * will be printed to help debugging.
+   *
+   * The shutdown is performed with best-effort. Depending on the implementation of the shutdown function, resource
+   * leak might be possible.
+   *
+   * @param shutdownTasks the list of shutdown tasks that need to be executed in parallel
+   * @param message message that will show in the thread name and the thread dump
+   * @param timeoutMs timeout in ms
+   * @return true if all tasks terminate in the end
+   */
+  public static boolean boundedShutdown(List<Runnable> shutdownTasks, String message, long timeoutMs) {
+    ExecutorService shutdownExecutorService = Executors.newCachedThreadPool(
+        new ThreadFactoryBuilder().setNameFormat(message + "-%d").setDaemon(true).build());
+    shutdownTasks.forEach(shutdownExecutorService::submit);
+    shutdownExecutorService.shutdown();
+    try {
+      shutdownExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      LOG.error("Shutdown was interrupted for " + message, e);
+    }
+
+    if (shutdownExecutorService.isTerminated()) {
+      LOG.info("Shutdown complete for {}", message);
+      return true;
+    } else {
+      LOG.error("Shutdown function for {} remains unfinished after timeout({}ms) or interruption", message, timeoutMs);
+      Util.logThreadDump(message);
+      shutdownExecutorService.shutdownNow();
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f249e71a/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java
new file mode 100644
index 0000000..d02619a
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import java.time.Duration;
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestShutdownUtil {
+  @Test
+  public void testBoundedShutdown() throws Exception {
+    long longTimeout = Duration.ofSeconds(60).toMillis();
+    long shortTimeout = Duration.ofMillis(100).toMillis();
+
+    Runnable shortRunnable = () -> {
+      try {
+        Thread.sleep(shortTimeout);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+    };
+    long start = System.currentTimeMillis();
+    Assert.assertTrue("expect the shutdown task to terminate",
+        ShutdownUtil.boundedShutdown(Collections.singletonList(shortRunnable), "testLongTimeout", longTimeout));
+    long end = System.currentTimeMillis();
+    Assert.assertTrue("boundedShutdown should complete if the shutdown function completes earlier",
+        (end - start) < longTimeout / 2);
+
+    Runnable longRunnable = () -> {
+      try {
+        Thread.sleep(longTimeout);
+      } catch (Exception e) {
+        Assert.fail(e.getMessage());
+      }
+    };
+    start = System.currentTimeMillis();
+    Assert.assertFalse("expect the shutdown task to be unfinished",
+        ShutdownUtil.boundedShutdown(Collections.singletonList(longRunnable), "testShortTimeout", shortTimeout));
+    end = System.currentTimeMillis();
+    Assert.assertTrue("boundedShutdown should complete even if the shutdown function takes long time",
+        (end - start) < longTimeout / 2);
+  }
+}

Reply via email to