Repository: flink
Updated Branches:
  refs/heads/master e9a2bc9d0 -> a997dd615


[FLINK-3081] Properly stop periodic Kafka committer

This closes #1410


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a997dd61
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a997dd61
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a997dd61

Branch: refs/heads/master
Commit: a997dd615598650934f0b785cbe8a6468ea63481
Parents: e9a2bc9
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Nov 26 14:25:54 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Sun Nov 29 15:56:23 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumer.java  | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a997dd61/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index 446648f..e42faef 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -414,14 +414,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
                                // same here.
                                long commitInterval = 
Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
                                offsetCommitter = new 
PeriodicOffsetCommitter(commitInterval, this);
+                               offsetCommitter.setDaemon(true);
                                offsetCommitter.start();
                                LOG.info("Starting periodic offset committer, 
with commit interval of {}ms", commitInterval);
                        }
 
-                       fetcher.run(sourceContext, deserializer, lastOffsets);
-
-                       if (offsetCommitter != null) {
-                               offsetCommitter.close();
+                       try {
+                               fetcher.run(sourceContext, deserializer, 
lastOffsets);
+                       } finally {
+                               if (offsetCommitter != null) {
+                                       offsetCommitter.close();
+                                       offsetCommitter.join();
+                               }
                        }
                }
                else {

Reply via email to