Repository: flink Updated Branches: refs/heads/master 72e6b760f -> eece0dd05
[hotfix] [kafka] Fix NPE in Kafka09Fetcher Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eece0dd0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eece0dd0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eece0dd0 Branch: refs/heads/master Commit: eece0dd05bc38b88fcb6cbcef15add7f98eab456 Parents: 72e6b76 Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Oct 4 14:43:35 2016 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Oct 4 15:30:32 2016 +0200 ---------------------------------------------------------------------- .../streaming/connectors/kafka/internal/Kafka09Fetcher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/eece0dd0/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index 3c2cca3..ad7efa2 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -291,9 +291,9 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem for (KafkaTopicPartitionState<TopicPartition> partition : partitions) { // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset. // This does not affect Flink's checkpoints/saved state. - Long offsetToCommit = offsets.get(partition.getKafkaTopicPartition()) + 1; + Long offsetToCommit = offsets.get(partition.getKafkaTopicPartition()); if (offsetToCommit != null) { - offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit)); + offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit + 1)); partition.setCommittedOffset(offsetToCommit); } }