Repository: kafka Updated Branches: refs/heads/trunk 39f62ddce -> c9e99f297
KAFKA-4306: Shutdown distributed herder with a timeout. Resolves KAFKA-4306: Connect workers won't shut down if brokers are not available KAFKA-4154: Kafka Connect fails to shutdown if it has not completed startup Author: Konstantine Karantasis <[email protected]> Reviewers: Shikhar Bhushan <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #2201 from kkonstantine/KAFKA-4306-Connect-workers-will-not-shut-down-if-brokers-are-not-available Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9e99f29 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9e99f29 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9e99f29 Branch: refs/heads/trunk Commit: c9e99f297f3090cd348e231dbeaf69c388de1234 Parents: 39f62dd Author: Konstantine Karantasis <[email protected]> Authored: Tue Dec 6 14:34:52 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Dec 6 14:34:52 2016 -0800 ---------------------------------------------------------------------- .../runtime/distributed/DistributedHerder.java | 42 +++++++++++--------- 1 file changed, 23 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c9e99f29/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index ce2e72a..a8799c6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -59,9 +59,11 @@ import java.util.NoSuchElementException; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -105,13 +107,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private final String workerGroupId; private final int workerSyncTimeoutMs; + private final long workerTasksShutdownTimeoutMs; private final int workerUnsyncBackoffMs; + private final ExecutorService herderExecutor; private final ExecutorService forwardRequestExecutor; private final ExecutorService startAndStopExecutor; private final WorkerGroupMember member; private final AtomicBoolean stopping; - private final CountDownLatch stopLatch = new CountDownLatch(1); // Track enough information about the current membership state to be able to determine which requests via the API // and the from other nodes are safe to process @@ -156,8 +159,16 @@ public class DistributedHerder extends AbstractHerder implements Runnable { this.time = time; this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG); this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG); + this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG); this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG); this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time); + this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1), + new ThreadFactory() { + @Override + public Thread newThread(Runnable herder) { + return new Thread(herder, "DistributedHerder"); + } + }); this.forwardRequestExecutor = Executors.newSingleThreadExecutor(); this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE); @@ -170,8 +181,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { @Override public void start() { - Thread thread = new Thread(this, "DistributedHerder"); - thread.start(); + this.herderExecutor.submit(this); } @Override @@ -192,10 +202,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { log.info("Herder stopped"); } catch (Throwable t) { log.error("Uncaught exception in herder work thread, exiting: ", t); - stopLatch.countDown(); System.exit(1); - } finally { - stopLatch.countDown(); } } @@ -369,20 +376,17 @@ public class DistributedHerder extends AbstractHerder implements Runnable { stopping.set(true); member.wakeup(); - while (stopLatch.getCount() > 0) { - try { - stopLatch.await(); - } catch (InterruptedException e) { - // ignore, should not happen - } - } - - forwardRequestExecutor.shutdown(); - startAndStopExecutor.shutdown(); + herderExecutor.shutdown(); try { - if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS)) + if (!herderExecutor.awaitTermination(workerTasksShutdownTimeoutMs, TimeUnit.MILLISECONDS)) + herderExecutor.shutdownNow(); + + forwardRequestExecutor.shutdown(); + startAndStopExecutor.shutdown(); + + if (!forwardRequestExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS)) forwardRequestExecutor.shutdownNow(); - if (!startAndStopExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS)) + if (!startAndStopExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS)) startAndStopExecutor.shutdownNow(); } catch (InterruptedException e) { // ignore
