Repository: flink
Updated Branches:
  refs/heads/master 607892314 -> 35892ed14


[hotfix][Kafka 0.9] Avoid committing offsets to closed consumer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35892ed1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35892ed1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35892ed1

Branch: refs/heads/master
Commit: 35892ed148afdb217a61fbacea1a9cb0eacb5c48
Parents: 6078923
Author: Robert Metzger <[email protected]>
Authored: Mon Mar 21 12:31:17 2016 +0100
Committer: Robert Metzger <[email protected]>
Committed: Mon Mar 21 14:38:05 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java   | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35892ed1/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index 62ba3c4..3b780bd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -387,6 +387,10 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
        @Override
        protected void commitOffsets(HashMap<KafkaTopicPartition, Long> 
checkpointOffsets) {
+               if(!running) {
+                       LOG.warn("Unable to commit offsets on closed consumer");
+                       return;
+               }
                Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = 
convertToCommitMap(checkpointOffsets);
                synchronized (this.consumer) {
                        this.consumer.commitSync(kafkaCheckpointOffsets);

Reply via email to