Repository: kafka Updated Branches: refs/heads/trunk c1f8f689a -> 5092e7f83
MINOR: Connect hangs on startup failure Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #902 from hachikuji/hotfix-connect-startup Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5092e7f8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5092e7f8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5092e7f8 Branch: refs/heads/trunk Commit: 5092e7f8347d17d1b6e509424cbebf2406d8d4ba Parents: c1f8f68 Author: Jason Gustafson <[email protected]> Authored: Wed Feb 10 17:08:23 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Wed Feb 10 17:08:23 2016 -0800 ---------------------------------------------------------------------- .../apache/kafka/connect/runtime/Connect.java | 38 +++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5092e7f8/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java index 6611e5d..49cf4bc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -50,31 +50,35 @@ public class Connect { } public void start() { - log.info("Kafka Connect starting"); - Runtime.getRuntime().addShutdownHook(shutdownHook); - - worker.start(); - herder.start(); - rest.start(herder); + try { + log.info("Kafka Connect starting"); + Runtime.getRuntime().addShutdownHook(shutdownHook); - log.info("Kafka Connect started"); + worker.start(); + herder.start(); + rest.start(herder); - startLatch.countDown(); + log.info("Kafka Connect started"); + } finally { + startLatch.countDown(); + } } public void stop() { - boolean wasShuttingDown = shutdown.getAndSet(true); - if (!wasShuttingDown) { - log.info("Kafka Connect stopping"); + try { + boolean wasShuttingDown = shutdown.getAndSet(true); + if (!wasShuttingDown) { + log.info("Kafka Connect stopping"); - rest.stop(); - herder.stop(); - worker.stop(); + rest.stop(); + herder.stop(); + worker.stop(); - log.info("Kafka Connect stopped"); + log.info("Kafka Connect stopped"); + } + } finally { + stopLatch.countDown(); } - - stopLatch.countDown(); } public void awaitStop() {
