Repository: flink
Updated Branches:
  refs/heads/master 1db14fc06 -> fa1498616


[hotfix] [kafka consumer] Improve logging for Kafka Consumer 0.8 shutdown

This no longer logs InterruptedExceptions on shutdown, because those
always occur as part of the Task's cancellation strategy.

This also slightly improves the behavior of joining on the spawned simple 
consumer threads on shutdown.
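
The shutdown pattern described above can be sketched in isolation as follows.
This is a minimal illustration, not the Kafka08Fetcher code: the class
ShutdownJoinSketch, the method joinOnShutdown, and the timeout parameter are
hypothetical names chosen for this example, and the worker threads merely
sleep instead of consuming from Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; names are not taken from the Flink code base.
final class ShutdownJoinSketch {

    /**
     * Interrupts all workers and joins on them (best effort).
     * Returns the number of workers still alive afterwards.
     */
    static int joinOnShutdown(List<Thread> workers, long joinTimeoutMillis) {
        // Clear a pending interrupt of the calling thread, so that join()
        // below does not fail immediately if cancellation already interrupted us.
        Thread.interrupted();

        try {
            for (Thread t : workers) {
                t.interrupt();
            }
            for (Thread t : workers) {
                t.join(joinTimeoutMillis);
            }
        } catch (InterruptedException ignored) {
            // Expected during cancellation: do not log, only restore the flag.
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            // Anything else is unexpected and worth reporting.
            System.err.println("Exception while shutting down worker threads: " + t);
        }

        int alive = 0;
        for (Thread t : workers) {
            if (t.isAlive()) {
                alive++;
            }
        }
        return alive;
    }

    public static void main(String[] args) {
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            Thread t = new Thread(() -> {
                try {
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    // interrupted on shutdown: simply exit
                }
            });
            t.setDaemon(true);
            t.start();
            workers.add(t);
        }

        // Simulate a task cancellation that already interrupted this thread.
        Thread.currentThread().interrupt();

        int stillRunning = joinOnShutdown(workers, 1000);
        System.out.println("threads still running: " + stillRunning);
    }
}
```

Catching InterruptedException before the generic Throwable handler is what
keeps the expected cancellation interrupt out of the error log, while the
call to Thread.currentThread().interrupt() keeps the interrupt visible to
the caller.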


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fa149861
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fa149861
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fa149861

Branch: refs/heads/master
Commit: fa14986161fe2fae2d8ea62f7c079f0c76b9361b
Parents: 1d46c9d
Author: Stephan Ewen <[email protected]>
Authored: Mon Oct 24 16:00:59 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Wed Oct 26 12:36:12 2016 +0200

----------------------------------------------------------------------
 .../connectors/kafka/internals/Kafka08Fetcher.java        | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fa149861/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index fbcb19c..d015157 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -270,6 +270,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
                                periodicCommitter.shutdown();
                        }
 
+                       // clear the interruption flag
+                       // this allows the joining on consumer threads (on best effort) to happen in
+                       // case the initial interrupt already happened
+                       Thread.interrupted();
+
                        // make sure that in any case (completion, abort, error), all spawned threads are stopped
                        try {
                                int runningThreads;
@@ -296,6 +301,11 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
                                }
                                while (runningThreads > 0);
                        }
+                       catch (InterruptedException ignored) {
+                               // waiting for the thread shutdown apparently got interrupted
+                               // restore interrupted state and continue
+                               Thread.currentThread().interrupt();
+                       }
                        catch (Throwable t) {
                                // we catch all here to preserve the original exception
                                LOG.error("Exception while shutting down consumer threads", t);
