Repository: flink
Updated Branches:
  refs/heads/master b4bf99dfc -> 223b0aa0e

[FLINK-4945] FlinkKafkaConsumer logs wrong warning about confirmation for unknown checkpoint

This closes #2706

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/223b0aa0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/223b0aa0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/223b0aa0

Branch: refs/heads/master
Commit: 223b0aa0ec9fa0fc53ca823d70e91010643b1ef2
Parents: b4bf99d
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Thu Oct 27 18:48:11 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Nov 2 12:03:38 2016 +0100

----------------------------------------------------------------------
 .../streaming/connectors/kafka/FlinkKafkaConsumerBase.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/223b0aa0/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
index db092f0..5161b35 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
@@ -346,9 +346,6 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 				// originally restored offsets or the assigned partitions
 
 				if (restoreToOffset != null) {
-					// the map cannot be asynchronously updated, because only one checkpoint call can happen
-					// on this function at a time: either snapshotState() or notifyCheckpointComplete()
-					pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
 
 					for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : restoreToOffset.entrySet()) {
 						offsetsStateForCheckpoint.add(
@@ -360,6 +357,10 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
 								Tuple2.of(subscribedPartition, KafkaTopicPartitionState.OFFSET_NOT_SET));
 					}
 				}
+
+				// the map cannot be asynchronously updated, because only one checkpoint call can happen
+				// on this function at a time: either snapshotState() or notifyCheckpointComplete()
+				pendingOffsetsToCommit.put(context.getCheckpointId(), restoreToOffset);
 			} else {
 				HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();