Repository: flink
Updated Branches:
  refs/heads/master b4bf99dfc -> 223b0aa0e


[FLINK-4945] FlinkKafkaConsumer logs wrong warning about confirmation for 
unknown checkpoint

This closes #2706


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/223b0aa0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/223b0aa0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/223b0aa0

Branch: refs/heads/master
Commit: 223b0aa0ec9fa0fc53ca823d70e91010643b1ef2
Parents: b4bf99d
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Thu Oct 27 18:48:11 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Nov 2 12:03:38 2016 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/223b0aa0/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index db092f0..5161b35 100644
--- 
a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ 
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -346,9 +346,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                                // originally restored offsets or the assigned 
partitions
 
                                if (restoreToOffset != null) {
-                                       // the map cannot be asynchronously 
updated, because only one checkpoint call can happen
-                                       // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
-                                       
pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
 
                                        for (Map.Entry<KafkaTopicPartition, 
Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
                                                offsetsStateForCheckpoint.add(
@@ -360,6 +357,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends 
RichParallelSourceFuncti
                                                                
Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
                                        }
                                }
+
+                               // the map cannot be asynchronously updated, 
because only one checkpoint call can happen
+                               // on this function at a time: either 
snapshotState() or notifyCheckpointComplete()
+                               
pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
                        } else {
                                HashMap<KafkaTopicPartition, Long> 
currentOffsets = fetcher.snapshotCurrentState();
 

Reply via email to