Repository: incubator-beam Updated Branches: refs/heads/master 40bd27602 -> ddb59125a
Add support for having an empty CountingInput/CountingSource Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/30ff1ee1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/30ff1ee1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/30ff1ee1 Branch: refs/heads/master Commit: 30ff1ee17bb290f2b50fd082d8cb63d48280c5c2 Parents: 40bd276 Author: Luke Cwik <lc...@google.com> Authored: Thu Dec 8 15:22:35 2016 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Thu Dec 8 18:41:17 2016 -0800 ---------------------------------------------------------------------- .../org/apache/beam/sdk/io/CountingInput.java | 12 ++++++---- .../org/apache/beam/sdk/io/CountingSource.java | 12 ++++++---- .../apache/beam/sdk/io/CountingInputTest.java | 23 +++++++++++++++++++- .../apache/beam/sdk/io/CountingSourceTest.java | 10 +++++++++ 4 files changed, 48 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java index 3148d8d..ac70aca 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingInput.java @@ -75,17 +75,21 @@ public class CountingInput { * from {@code 0} to {@code numElements - 1}. */ public static BoundedCountingInput upTo(long numElements) { - checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); + checkArgument(numElements >= 0, + "numElements (%s) must be greater than or equal to 0", + numElements); return new BoundedCountingInput(numElements); } /** * Creates a {@link BoundedCountingInput} that will produce elements - * starting from {@code startIndex} to {@code endIndex - 1}. + * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive). + * If {@code startIndex == endIndex}, then no elements will be produced. */ public static BoundedCountingInput forSubrange(long startIndex, long endIndex) { - checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)", - endIndex, startIndex); + checkArgument(endIndex >= startIndex, + "endIndex (%s) must be greater than or equal to startIndex (%s)", + endIndex, startIndex); return new BoundedCountingInput(startIndex, endIndex); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index bc7fb78..9752dba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -78,17 +78,21 @@ public class CountingSource { */ @Deprecated public static BoundedSource<Long> upTo(long numElements) { - checkArgument(numElements > 0, "numElements (%s) must be greater than 0", numElements); + checkArgument(numElements >= 0, + "numElements (%s) must be greater than or equal to 0", + numElements); return new BoundedCountingSource(0, numElements); } /** * Creates a {@link BoundedSource} that will produce elements - * from {@code startIndex} to {@code endIndex - 1}. + * starting from {@code startIndex} (inclusive) to {@code endIndex} (exclusive). + * If {@code startIndex == endIndex}, then no elements will be produced. */ static BoundedSource<Long> createSourceForSubrange(long startIndex, long endIndex) { - checkArgument(endIndex > startIndex, "endIndex (%s) must be greater than startIndex (%s)", - endIndex, startIndex); + checkArgument(endIndex >= startIndex, + "endIndex (%s) must be greater than or equal to startIndex (%s)", + endIndex, startIndex); return new BoundedCountingSource(startIndex, endIndex); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java index 02b4ba0..4349f66 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java @@ -71,7 +71,7 @@ public class CountingInputTest { public void testBoundedInput() { Pipeline p = TestPipeline.create(); long numElements = 1000; - PCollection<Long> input = p.apply(CountingInput.upTo(numElements)); + PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(numElements))); addCountingAsserts(input, 0, numElements); p.run(); @@ -79,6 +79,27 @@ public class CountingInputTest { @Test @Category(RunnableOnService.class) + public void testEmptyBoundedSource() { + Pipeline p = TestPipeline.create(); + PCollection<Long> input = p.apply(CountingInput.upTo(0)); + + PAssert.that(input).empty(); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testEmptyBoundedSourceUsingRange() { + Pipeline p = TestPipeline.create(); + PCollection<Long> input = p.apply(CountingInput.forSubrange(42, 42)); + + PAssert.that(input).empty(); + p.run(); + } + + + @Test + @Category(RunnableOnService.class) public void testBoundedInputSubrange() { Pipeline p = TestPipeline.create(); long start = 10; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/30ff1ee1/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java index 88c68d3..5eccde6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java @@ -92,6 +92,16 @@ public class CountingSourceTest { @Test @Category(RunnableOnService.class) + public void testEmptyBoundedSource() { + Pipeline p = TestPipeline.create(); + PCollection<Long> input = p.apply(Read.from(CountingSource.upTo(0))); + + PAssert.that(input).empty(); + p.run(); + } + + @Test + @Category(RunnableOnService.class) public void testBoundedSourceSplits() throws Exception { Pipeline p = TestPipeline.create(); long numElements = 1000;