Repository: beam Updated Branches: refs/heads/master 16b9d584c -> aa26f4bf7
[BEAM-2979] Fix a race condition in getWatermark() in KafkaIO. Don't set curRecord to null before updating it. If user deserializers throw, it is OK for curRecord to keep pointing to the old record. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56b512f9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56b512f9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56b512f9 Branch: refs/heads/master Commit: 56b512f9242f17a804f2e8d9adca49c771863e53 Parents: 16b9d58 Author: Raghu Angadi <[email protected]> Authored: Wed Oct 11 15:04:28 2017 -0700 Committer: [email protected] <[email protected]> Committed: Mon Oct 30 15:49:12 2017 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/56b512f9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index af73a8d..17e0e34 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -1255,11 +1255,10 @@ public class KafkaIO { offsetGap = 0; } - curRecord = null; // user coders below might throw. - - // apply user deserializers. + // Apply user deserializers. User deserializers might throw, which will be propagated up + // and 'curRecord' remains unchanged. The runner should close this reader. // TODO: write records that can't be deserialized to a "dead-letter" additional output. 
- KafkaRecord<K, V> record = new KafkaRecord<K, V>( + KafkaRecord<K, V> record = new KafkaRecord<>( rawRecord.topic(), rawRecord.partition(), rawRecord.offset(), @@ -1372,7 +1371,6 @@ public class KafkaIO { return curTimestamp; } - @Override public long getSplitBacklogBytes() { long backlogBytes = 0;
