Repository: incubator-beam
Updated Branches:
  refs/heads/master be98b757b -> b48728101


Fix NPE in UnboundedReadFromBoundedSource


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45ce4979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45ce4979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45ce4979

Branch: refs/heads/master
Commit: 45ce497933ae351493c8e70bee972d91409028af
Parents: be98b75
Author: Pei He <[email protected]>
Authored: Mon Jun 27 18:21:37 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue Jun 28 11:23:26 2016 -0700

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java     | 19 +++++++++----------
 .../core/UnboundedReadFromBoundedSourceTest.java |  9 +++++++++
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ce4979/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 2b3d1c7..f54af3b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -167,10 +167,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
     public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
         throws IOException {
       if (checkpoint == null) {
-        return new Reader(
-            Collections.<TimestampedValue<T>>emptyList() /* residualElements */,
-            boundedSource,
-            options);
+        return new Reader(null /* residualElements */, boundedSource, options);
       } else {
         return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
       }
@@ -189,11 +186,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
 
     @VisibleForTesting
     static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
-      private final List<TimestampedValue<T>> residualElements;
+      private final @Nullable List<TimestampedValue<T>> residualElements;
       private final @Nullable BoundedSource<T> residualSource;
 
       public Checkpoint(
-          List<TimestampedValue<T>> residualElements,
+          @Nullable List<TimestampedValue<T>> residualElements,
           @Nullable BoundedSource<T> residualSource) {
         this.residualElements = residualElements;
         this.residualSource = residualSource;
@@ -203,7 +200,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       public void finalizeCheckpoint() {}
 
       @VisibleForTesting
-      List<TimestampedValue<T>> getResidualElements() {
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
         return residualElements;
       }
 
@@ -286,7 +283,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       private boolean done;
 
       Reader(
-          List<TimestampedValue<T>> residualElementsList,
+          @Nullable List<TimestampedValue<T>> residualElementsList,
           @Nullable BoundedSource<T> residualSource,
           PipelineOptions options) {
         init(residualElementsList, residualSource, options);
@@ -295,10 +292,12 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       }
 
       private void init(
-          List<TimestampedValue<T>> residualElementsList,
+          @Nullable List<TimestampedValue<T>> residualElementsList,
           @Nullable BoundedSource<T> residualSource,
           PipelineOptions options) {
-        this.residualElements = new ResidualElements(residualElementsList);
+        this.residualElements = residualElementsList == null
+            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+                : new ResidualElements(residualElementsList);
         this.residualSource =
             residualSource == null ? null : new ResidualSource(residualSource, options);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ce4979/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index afd0927..dfbc675 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
@@ -169,6 +170,10 @@ public class UnboundedReadFromBoundedSourceTest {
         checkpoint.finalizeCheckpoint();
       }
     }
+    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+    assertTrue(checkpointDone.getResidualElements() == null
+        || checkpointDone.getResidualElements().isEmpty());
+
     assertEquals(expectedElements.size(), actual.size());
     assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
   }
@@ -230,6 +235,10 @@ public class UnboundedReadFromBoundedSourceTest {
         hasNext = reader.advance();
       }
     }
+    Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+    assertTrue(checkpointDone.getResidualElements() == null
+        || checkpointDone.getResidualElements().isEmpty());
+
     assertEquals(expectedElements.size(), actual.size());
     assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
   }

Reply via email to