Repository: flink Updated Branches: refs/heads/master e9a2bc9d0 -> a997dd615
[FLINK-3081] Properly stop periodic Kafka committer This closes #1410 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a997dd61 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a997dd61 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a997dd61 Branch: refs/heads/master Commit: a997dd615598650934f0b785cbe8a6468ea63481 Parents: e9a2bc9 Author: Robert Metzger <rmetz...@apache.org> Authored: Thu Nov 26 14:25:54 2015 +0100 Committer: Robert Metzger <rmetz...@apache.org> Committed: Sun Nov 29 15:56:23 2015 +0100 ---------------------------------------------------------------------- .../streaming/connectors/kafka/FlinkKafkaConsumer.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a997dd61/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index 446648f..e42faef 100644 --- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -414,14 +414,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T> // same here. long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000")); offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this); + offsetCommitter.setDaemon(true); offsetCommitter.start(); LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval); } - fetcher.run(sourceContext, deserializer, lastOffsets); - - if (offsetCommitter != null) { - offsetCommitter.close(); + try { + fetcher.run(sourceContext, deserializer, lastOffsets); + } finally { + if (offsetCommitter != null) { + offsetCommitter.close(); + offsetCommitter.join(); + } } } else {