Repository: samza Updated Branches: refs/heads/master 161d1c47a -> f249e71a2
SAMZA-1741: fix issue where the EH consumer takes too long to shut down: 1. lower the shutdown timeout from 1 min to 15 seconds; 2. make sure EventHubManagers are shut down in parallel; 3. print a thread dump when shutdown fails. Author: Hai Lu <[email protected]> Reviewers: Jagadish <[email protected]>, Prateek <[email protected]> Closes #548 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f249e71a Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f249e71a Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f249e71a Branch: refs/heads/master Commit: f249e71a2281cb91f8d2ba8ef3aa03e0e8efd791 Parents: 161d1c4 Author: Hai Lu <[email protected]> Authored: Fri Jun 8 10:05:36 2018 -0700 Committer: Jagadish <[email protected]> Committed: Fri Jun 8 10:05:36 2018 -0700 ---------------------------------------------------------------------- .../consumer/EventHubSystemConsumer.java | 44 +++++++----- .../org/apache/samza/util/ShutdownUtil.java | 74 ++++++++++++++++++++ .../org/apache/samza/util/TestShutdownUtil.java | 63 +++++++++++++++++ 3 files changed, 165 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f249e71a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java index 04e361f..3fa95c2 100644 --- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java +++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java @@ -28,17 +28,13 @@ import 
com.microsoft.azure.eventhubs.PartitionReceiver; import com.microsoft.azure.eventhubs.impl.ClientConstants; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -57,6 +53,7 @@ import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin; import org.apache.samza.system.eventhub.metrics.SamzaHistogram; import org.apache.samza.system.eventhub.producer.EventHubSystemProducer; import org.apache.samza.util.BlockingEnvelopeMap; +import org.apache.samza.util.ShutdownUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,7 +99,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { // Overall timeout for EventHubClient exponential backoff policy private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L); - private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis(); + private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis(); public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1 public static final String END_OF_STREAM = "-2"; @@ -352,17 +349,32 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap { @Override public void stop() { LOG.info("Stopping event hub system consumer..."); - List<CompletableFuture<Void>> futures = new ArrayList<>(); - streamPartitionReceivers.values().forEach((receiver) -> futures.add(receiver.close())); - CompletableFuture<Void> future = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); - try { - future.get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - } catch (ExecutionException | InterruptedException | TimeoutException e) { - LOG.warn("Failed to close receivers", e); - } - perPartitionEventHubManagers.values() - .parallelStream() - .forEach(ehClientManager -> ehClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS)); + + // There could be potentially many Receivers and EventHubManagers, so close the managers in parallel + LOG.info("Start shutting down eventhubs receivers"); + ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() { + @Override + public void run() { + try { + receiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (Exception e) { + LOG.error("Failed to shutdown receiver.", e); + } + } + }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS); + + LOG.info("Start shutting down eventhubs managers"); + ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager -> new Runnable() { + @Override + public void run() { + try { + manager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS); + } catch (Exception e) { + LOG.error("Failed to shutdown eventhubs manager.", e); + } + } + }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS); + perPartitionEventHubManagers.clear(); perStreamEventHubManagers.clear(); isStarted = false; http://git-wip-us.apache.org/repos/asf/samza/blob/f249e71a/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java b/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java new file mode 100644 index 0000000..3d75654 --- /dev/null +++ 
b/samza-core/src/main/java/org/apache/samza/util/ShutdownUtil.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + + +/** + * Shutdown related utils + */ +public class ShutdownUtil { + private static final Logger LOG = LoggerFactory.getLogger(ShutdownUtil.class); + + /** + * A helper to facilitate shutting down a set of resources in parallel to enforce a bounded shutdown time. + * The helper function instantiates an {@link ExecutorService} to execute a list of shutdown tasks, and will + * await the termination for given timeout. If shutdown remains unfinished in the end, the whole thread dump + * will be printed to help debugging. + * + * The shutdown is performed with best-effort. Depending on the implementation of the shutdown function, resource + * leak might be possible. 
+ * + * @param shutdownTasks the list of shutdown tasks that need to be executed in parallel + * @param message message that will show in the thread name and the thread dump + * @param timeoutMs timeout in ms + * @return true if all tasks terminate in the end + */ + public static boolean boundedShutdown(List<Runnable> shutdownTasks, String message, long timeoutMs) { + ExecutorService shutdownExecutorService = Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat(message + "-%d").setDaemon(true).build()); + shutdownTasks.forEach(shutdownExecutorService::submit); + shutdownExecutorService.shutdown(); + try { + shutdownExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + LOG.error("Shutdown was interrupted for " + message, e); + } + + if (shutdownExecutorService.isTerminated()) { + LOG.info("Shutdown complete for {}", message); + return true; + } else { + LOG.error("Shutdown function for {} remains unfinished after timeout({}ms) or interruption", message, timeoutMs); + Util.logThreadDump(message); + shutdownExecutorService.shutdownNow(); + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/f249e71a/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java b/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java new file mode 100644 index 0000000..d02619a --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/util/TestShutdownUtil.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.util; + +import java.time.Duration; +import java.util.Collections; + +import org.junit.Assert; +import org.junit.Test; + + +public class TestShutdownUtil { + @Test + public void testBoundedShutdown() throws Exception { + long longTimeout = Duration.ofSeconds(60).toMillis(); + long shortTimeout = Duration.ofMillis(100).toMillis(); + + Runnable shortRunnable = () -> { + try { + Thread.sleep(shortTimeout); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + }; + long start = System.currentTimeMillis(); + Assert.assertTrue("expect the shutdown task to terminate", + ShutdownUtil.boundedShutdown(Collections.singletonList(shortRunnable), "testLongTimeout", longTimeout)); + long end = System.currentTimeMillis(); + Assert.assertTrue("boundedShutdown should complete if the shutdown function completes earlier", + (end - start) < longTimeout / 2); + + Runnable longRunnable = () -> { + try { + Thread.sleep(longTimeout); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + }; + start = System.currentTimeMillis(); + Assert.assertFalse("expect the shutdown task to be unfinished", + ShutdownUtil.boundedShutdown(Collections.singletonList(longRunnable), "testShortTimeout", shortTimeout)); + end = System.currentTimeMillis(); + Assert.assertTrue("boundedShutdown should complete even if the shutdown function takes long time", + (end - start) < 
longTimeout / 2); + } +}
