Repository: incubator-beam
Updated Branches:
  refs/heads/master be98b757b -> b48728101

Fix NPE in UnboundedReadFromBoundedSource

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45ce4979
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45ce4979
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45ce4979

Branch: refs/heads/master
Commit: 45ce497933ae351493c8e70bee972d91409028af
Parents: be98b75
Author: Pei He <[email protected]>
Authored: Mon Jun 27 18:21:37 2016 -0700
Committer: Dan Halperin <[email protected]>
Committed: Tue Jun 28 11:23:26 2016 -0700

----------------------------------------------------------------------
 .../core/UnboundedReadFromBoundedSource.java | 19 +++++++++----------
 .../core/UnboundedReadFromBoundedSourceTest.java | 9 +++++++++
 2 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ce4979/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
index 2b3d1c7..f54af3b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSource.java
@@ -167,10 +167,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
     public Reader createReader(PipelineOptions options, Checkpoint<T> checkpoint)
         throws IOException {
       if (checkpoint == null) {
-        return new Reader(
-            Collections.<TimestampedValue<T>>emptyList() /* residualElements */,
-            boundedSource,
-            options);
+        return new Reader(null /* residualElements */, boundedSource, options);
       } else {
         return new Reader(checkpoint.residualElements, checkpoint.residualSource, options);
       }
@@ -189,11 +186,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
 
     @VisibleForTesting
     static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
-      private final List<TimestampedValue<T>> residualElements;
+      private final @Nullable List<TimestampedValue<T>> residualElements;
       private final @Nullable BoundedSource<T> residualSource;
 
       public Checkpoint(
-          List<TimestampedValue<T>> residualElements,
+          @Nullable List<TimestampedValue<T>> residualElements,
           @Nullable BoundedSource<T> residualSource) {
         this.residualElements = residualElements;
         this.residualSource = residualSource;
@@ -203,7 +200,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       public void finalizeCheckpoint() {}
 
       @VisibleForTesting
-      List<TimestampedValue<T>> getResidualElements() {
+      @Nullable List<TimestampedValue<T>> getResidualElements() {
         return residualElements;
       }
 
@@ -286,7 +283,7 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       private boolean done;
 
       Reader(
-          List<TimestampedValue<T>> residualElementsList,
+          @Nullable List<TimestampedValue<T>> residualElementsList,
           @Nullable BoundedSource<T> residualSource,
           PipelineOptions options) {
         init(residualElementsList, residualSource, options);
@@ -295,10 +292,12 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PInput, PColle
       }
 
       private void init(
-          List<TimestampedValue<T>> residualElementsList,
+          @Nullable List<TimestampedValue<T>> residualElementsList,
           @Nullable BoundedSource<T> residualSource,
           PipelineOptions options) {
-        this.residualElements = new ResidualElements(residualElementsList);
+        this.residualElements = residualElementsList == null
+            ? new ResidualElements(Collections.<TimestampedValue<T>>emptyList())
+            : new ResidualElements(residualElementsList);
         this.residualSource =
             residualSource == null ? null : new ResidualSource(residualSource, options);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/45ce4979/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
index afd0927..dfbc675 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/UnboundedReadFromBoundedSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
@@ -169,6 +170,10 @@ public class UnboundedReadFromBoundedSourceTest {
           checkpoint.finalizeCheckpoint();
         }
       }
+      Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+      assertTrue(checkpointDone.getResidualElements() == null
+          || checkpointDone.getResidualElements().isEmpty());
+
       assertEquals(expectedElements.size(), actual.size());
       assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
     }
@@ -230,6 +235,10 @@ public class UnboundedReadFromBoundedSourceTest {
           hasNext = reader.advance();
         }
       }
+      Checkpoint<T> checkpointDone = reader.getCheckpointMark();
+      assertTrue(checkpointDone.getResidualElements() == null
+          || checkpointDone.getResidualElements().isEmpty());
+
       assertEquals(expectedElements.size(), actual.size());
       assertEquals(Sets.newHashSet(expectedElements), Sets.newHashSet(actual));
     }
