Repository: flink
Updated Branches:
  refs/heads/release-0.10 d6e118b8e -> 961adea59
[FLINK-3081] Properly stop periodic Kafka committer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/961adea5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/961adea5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/961adea5

Branch: refs/heads/release-0.10
Commit: 961adea591054ef6dae2e06f76cef7409eff8204
Parents: d6e118b
Author: Robert Metzger <rmetz...@apache.org>
Authored: Sun Nov 29 16:05:00 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Sun Nov 29 16:05:00 2015 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumer.java | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/961adea5/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
index e701639..8791fc8 100644
--- a/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
+++ b/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java
@@ -390,14 +390,18 @@ public class FlinkKafkaConsumer<T> extends RichParallelSourceFunction<T>
 				// same here.
 				long commitInterval = Long.valueOf(props.getProperty("auto.commit.interval.ms", "60000"));
 				offsetCommitter = new PeriodicOffsetCommitter(commitInterval, this);
+				offsetCommitter.setDaemon(true);
 				offsetCommitter.start();
 				LOG.info("Starting periodic offset committer, with commit interval of {}ms", commitInterval);
 			}
 
-			fetcher.run(sourceContext, valueDeserializer, lastOffsets);
-
-			if (offsetCommitter != null) {
-				offsetCommitter.close();
+			try {
+				fetcher.run(sourceContext, valueDeserializer, lastOffsets);
+			} finally {
+				if (offsetCommitter != null) {
+					offsetCommitter.close();
+					offsetCommitter.join();
+				}
 			}
 		}
 		else {