Repository: flink Updated Branches: refs/heads/master 1db14fc06 -> fa1498616
[hotfix] [kafka consumer] Improve logging For Kafka Consumer 0.8 shutdown This does not log InterruptedExceptions on shutdown any more, because those always come per the Task's cancelation strategy. This also slightly improves the behavior of joining on the spawned simple consumer threads on shutdown. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa149861 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa149861 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa149861 Branch: refs/heads/master Commit: fa14986161fe2fae2d8ea62f7c079f0c76b9361b Parents: 1d46c9d Author: Stephan Ewen <[email protected]> Authored: Mon Oct 24 16:00:59 2016 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Oct 26 12:36:12 2016 +0200 ---------------------------------------------------------------------- .../connectors/kafka/internals/Kafka08Fetcher.java | 10 ++++++++++ 1 file changed, 10 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fa149861/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java index fbcb19c..d015157 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -270,6 +270,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { periodicCommitter.shutdown(); } + // clear the interruption flag + // this allows the joining on consumer threads (on best effort) to happen in + // case the initial interrupt already + Thread.interrupted(); + // make sure that in any case (completion, abort, error), all spawned threads are stopped try { int runningThreads; @@ -296,6 +301,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> { } while (runningThreads > 0); } + catch (InterruptedException ignored) { + // waiting for the thread shutdown apparently got interrupted + // restore interrupted state and continue + Thread.currentThread().interrupt(); + } catch (Throwable t) { // we catch all here to preserve the original exception LOG.error("Exception while shutting down consumer threads", t);
