Repository: flink
Updated Branches:
  refs/heads/release-0.10 d6e118b8e -> 961adea59


[FLINK-3081] Properly stop periodic Kafka committer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/961adea5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/961adea5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/961adea5

Branch: refs/heads/release-0.10
Commit: 961adea591054ef6dae2e06f76cef7409eff8204
Parents: d6e118b
Author: Robert Metzger <rmetz...@apache.org>
Authored: Sun Nov 29 16:05:00 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Sun Nov 29 16:05:00 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumer.java  | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/961adea5/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e701639..8791fc8 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -390,14 +390,18 @@ public class FlinkKafkaConsumer<T> extends 
RichParallelSourceFunction<T>
                                // same here.
                                long commitInterval = 
Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
                                offsetCommitter = new 
PeriodicOffsetCommitter(commitInterval, this);
+                               offsetCommitter.setDaemon(true);
                                offsetCommitter.start();
                                LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", commitInterval);
                        }
 
-                       fetcher.run(sourceContext, valueDeserializer, 
lastOffsets);
-
-                       if (offsetCommitter != null) {
-                               offsetCommitter.close();
+                       try {
+                               fetcher.run(sourceContext, valueDeserializer, 
lastOffsets);
+                       } finally {
+                               if (offsetCommitter != null) {
+                                       offsetCommitter.close();
+                                       offsetCommitter.join();
+                               }
                        }
                }
                else {

Reply via email to