Repository: flink
Updated Branches:
  refs/heads/master 607892314 -> 35892ed14
[hotfix][Kafka 0.9] Avoid committing offsets to closed consumer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35892ed1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35892ed1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35892ed1

Branch: refs/heads/master
Commit: 35892ed148afdb217a61fbacea1a9cb0eacb5c48
Parents: 6078923
Author: Robert Metzger <[email protected]>
Authored: Mon Mar 21 12:31:17 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Mon Mar 21 14:38:05 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/35892ed1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 62ba3c4..3b780bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -387,6 +387,10 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 	@Override
 	protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
+		if(!running) {
+			LOG.warn("Unable to commit offsets on closed consumer");
+			return;
+		}
 		Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = convertToCommitMap(checkpointOffsets);
 		synchronized (this.consumer) {
 			this.consumer.commitSync(kafkaCheckpointOffsets);
