OffsetBasedSource: allow empty sources. As one example, empty files exist.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7e9ad995 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7e9ad995 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7e9ad995 Branch: refs/heads/gearpump-runner Commit: 7e9ad9954e50bd01fba4cda84c182af895b2c23f Parents: dfaf2a8 Author: Dan Halperin <[email protected]> Authored: Fri Oct 21 12:21:45 2016 -0700 Committer: Dan Halperin <[email protected]> Committed: Mon Oct 24 13:16:50 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/io/OffsetBasedSource.java | 4 +-- .../beam/sdk/io/OffsetBasedSourceTest.java | 30 +++++++++++++++----- 2 files changed, 25 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e9ad995/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java index 6e49cc3..e9a398d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java @@ -146,8 +146,8 @@ public abstract class OffsetBasedSource<T> extends BoundedSource<T> { this.endOffset >= 0, "End offset has value %s, must be non-negative", this.endOffset); checkArgument( - this.startOffset < this.endOffset, - "Start offset %s must be before end offset %s", + this.startOffset <= this.endOffset, + "Start offset %s may not be larger than end offset %s", this.startOffset, this.endOffset); checkArgument( this.minBundleSize >= 0, 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7e9ad995/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java index 923b4b4..6584e5d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java @@ -62,7 +62,7 @@ public class OffsetBasedSourceTest { } @Override - public boolean producesSortedKeys(PipelineOptions options) throws Exception { + public boolean producesSortedKeys(PipelineOptions options) { return false; } @@ -85,7 +85,7 @@ public class OffsetBasedSourceTest { } @Override - public OffsetBasedReader<Integer> createReader(PipelineOptions options) throws IOException { + public CoarseRangeReader createReader(PipelineOptions options) { return new CoarseRangeReader(this); } } @@ -105,7 +105,7 @@ public class OffsetBasedSourceTest { } @Override - public boolean startImpl() throws IOException { + public boolean startImpl() { current = getCurrentSource().getStartOffset(); while (current % granularity != 0) { ++current; @@ -114,7 +114,7 @@ public class OffsetBasedSourceTest { } @Override - public boolean advanceImpl() throws IOException { + public boolean advanceImpl() { ++current; return true; } @@ -130,7 +130,7 @@ public class OffsetBasedSourceTest { } @Override - public void close() throws IOException { } + public void close() { } } public static void assertSplitsAre(List<? extends OffsetBasedSource<?>> splits, @@ -211,7 +211,7 @@ public class OffsetBasedSourceTest { // in the face of that. 
PipelineOptions options = PipelineOptionsFactory.create(); CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10); - try (BoundedSource.BoundedReader<Integer> reader = source.createReader(options)) { + try (CoarseRangeReader reader = source.createReader(options)) { List<Integer> items = new ArrayList<>(); assertEquals(0.0, reader.getFractionConsumed(), 1e-6); @@ -304,7 +304,7 @@ public class OffsetBasedSourceTest { public void testSplitAtFraction() throws IOException { PipelineOptions options = PipelineOptionsFactory.create(); CoarseRangeSource source = new CoarseRangeSource(13, 35, 1, 10); - try (CoarseRangeReader reader = (CoarseRangeReader) source.createReader(options)) { + try (CoarseRangeReader reader = source.createReader(options)) { List<Integer> originalItems = new ArrayList<>(); assertTrue(reader.start()); originalItems.add(reader.getCurrent()); @@ -341,4 +341,20 @@ public class OffsetBasedSourceTest { CoarseRangeSource original = new CoarseRangeSource(13, 35, 1, 10); assertSplitAtFractionExhaustive(original, options); } + + @Test + public void testEmptyOffsetRange() throws Exception { + CoarseRangeSource empty = new CoarseRangeSource(0, 0, 1, 1); + try (CoarseRangeReader reader = empty.createReader(PipelineOptionsFactory.create())) { + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(OffsetBasedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining()); + assertEquals(0.0, reader.getFractionConsumed(), 0.0001); + + assertFalse(reader.start()); + + assertEquals(0, reader.getSplitPointsConsumed()); + assertEquals(0, reader.getSplitPointsRemaining()); + assertEquals(1.0, reader.getFractionConsumed(), 0.0001); + } + } }
