Repository: beam
Updated Branches:
  refs/heads/master 16b9d584c -> aa26f4bf7


[BEAM-2979] Fix a race condition in getWatermark() in KafkaIO.

Don't set curRecord to null before updating it. If the user deserializers throw, it is
OK to keep curRecord pointing to the old record.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56b512f9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56b512f9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56b512f9

Branch: refs/heads/master
Commit: 56b512f9242f17a804f2e8d9adca49c771863e53
Parents: 16b9d58
Author: Raghu Angadi <[email protected]>
Authored: Wed Oct 11 15:04:28 2017 -0700
Committer: [email protected] <[email protected]>
Committed: Mon Oct 30 15:49:12 2017 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/56b512f9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index af73a8d..17e0e34 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -1255,11 +1255,10 @@ public class KafkaIO {
             offsetGap = 0;
           }
 
-          curRecord = null; // user coders below might throw.
-
-          // apply user deserializers.
+          // Apply user deserializers. User deserializers might throw, which will be propagated up
+          // and 'curRecord' remains unchanged. The runner should close this reader.
           // TODO: write records that can't be deserialized to a "dead-letter" additional output.
-          KafkaRecord<K, V> record = new KafkaRecord<K, V>(
+          KafkaRecord<K, V> record = new KafkaRecord<>(
               rawRecord.topic(),
               rawRecord.partition(),
               rawRecord.offset(),
@@ -1372,7 +1371,6 @@ public class KafkaIO {
       return curTimestamp;
     }
 
-
     @Override
     public long getSplitBacklogBytes() {
       long backlogBytes = 0;

Reply via email to