Replaces all usages of CountingInput with GenerateSequence
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a9a24c0 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a9a24c0 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a9a24c0 Branch: refs/heads/master Commit: 6a9a24c064518bb83a7383babdff9b263dc61346 Parents: 88c6612 Author: Eugene Kirpichov <[email protected]> Authored: Wed Apr 19 15:32:08 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Apr 21 16:53:50 2017 -0700 ---------------------------------------------------------------------- .../translation/ReadUnboundTranslatorTest.java | 6 +- .../EmptyFlattenAsCreateFactoryTest.java | 10 ++-- .../core/construction/PCollectionsTest.java | 6 +- .../PTransformReplacementsTest.java | 4 +- .../core/construction/PTransformsTest.java | 17 +++--- .../core/construction/SdkComponentsTest.java | 16 +++--- .../runners/direct/DirectGraphVisitorTest.java | 6 +- .../beam/runners/direct/DirectRunnerTest.java | 10 ++-- .../runners/direct/EvaluationContextTest.java | 4 +- .../beam/runners/flink/ReadSourceITCase.java | 4 +- .../flink/ReadSourceStreamingITCase.java | 4 +- .../streaming/StreamingSourceMetricsTest.java | 7 ++- .../org/apache/beam/sdk/io/CountingSource.java | 43 ++++----------- .../apache/beam/sdk/io/GenerateSequence.java | 5 ++ .../org/apache/beam/sdk/values/PCollection.java | 9 +-- .../java/org/apache/beam/sdk/PipelineTest.java | 58 +++++++++++--------- .../beam/sdk/io/GenerateSequenceTest.java | 18 +++--- .../apache/beam/sdk/metrics/MetricsTest.java | 12 ++-- .../sdk/runners/TransformHierarchyTest.java | 7 +-- .../beam/sdk/testing/GatherAllPanesTest.java | 8 +-- .../apache/beam/sdk/testing/PAssertTest.java | 12 ++-- .../apache/beam/sdk/transforms/FlattenTest.java | 8 +-- .../apache/beam/sdk/transforms/ParDoTest.java | 4 +- .../sdk/transforms/windowing/WindowTest.java | 4 +- .../beam/sdk/values/PCollectionListTest.java | 25 +++++---- .../beam/sdk/values/PCollectionTupleTest.java | 6 +- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 6 +- .../sdk/io/gcp/bigtable/BigtableWriteIT.java | 4 +- .../beam/sdk/io/gcp/datastore/V1WriteIT.java | 4 +- 29 files changed, 162 insertions(+), 165 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java index 6d19bb9..e0cc251 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ReadUnboundTranslatorTest.java @@ -35,7 +35,7 @@ import org.apache.beam.runners.apex.translation.operators.ApexReadUnboundedInput import org.apache.beam.runners.apex.translation.utils.CollectionSource; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -92,12 +92,12 @@ public class ReadUnboundTranslatorTest { Pipeline p = Pipeline.create(options); Set<Long> expected = ContiguousSet.create(Range.closedOpen(0L, 10L), DiscreteDomain.longs()); - p.apply(CountingInput.upTo(10)) + p.apply(GenerateSequence.fromTo(0, 10)) .apply(ParDo.of(new EmbeddedCollector())); ApexRunnerResult result = (ApexRunnerResult) p.run(); DAG dag = result.getApexDAG(); - String operatorName = "CountingInput.BoundedCountingInput/Read(BoundedCountingSource)"; + String operatorName = "GenerateSequence/Read(BoundedCountingSource)"; DAG.OperatorMeta om = dag.getOperatorMeta(operatorName); Assert.assertNotNull(om); Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java index ae2d0a9..bfa3190 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.assertThat; import java.util.Collections; import java.util.Map; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.testing.NeedsRunner; @@ -71,8 +71,8 @@ public class EmptyFlattenAsCreateFactoryTest { @Test public void getInputNonEmptyThrows() { PCollectionList<Long> nonEmpty = - PCollectionList.of(pipeline.apply(CountingInput.unbounded())) - .and(pipeline.apply(CountingInput.upTo(100L))); + PCollectionList.of(pipeline.apply("unbounded", GenerateSequence.from(0))) + .and(pipeline.apply("bounded", GenerateSequence.fromTo(0, 100))); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(nonEmpty.expand().toString()); thrown.expectMessage(EmptyFlattenAsCreateFactory.class.getSimpleName()); @@ -87,8 +87,8 @@ public class EmptyFlattenAsCreateFactoryTest { @Test public void mapOutputsSucceeds() { - PCollection<Long> original = pipeline.apply("Original", CountingInput.unbounded()); - PCollection<Long> replacement = pipeline.apply("Replacement", CountingInput.unbounded()); + PCollection<Long> original = pipeline.apply("Original", GenerateSequence.from(0)); + PCollection<Long> replacement = pipeline.apply("Replacement", GenerateSequence.from(0)); Map<PValue, ReplacementOutput> mapping = factory.mapOutputs(original.expand(), replacement); assertThat( http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java index 636d245..48aa1f1 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java @@ -33,7 +33,7 @@ import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.GroupByKey; @@ -71,7 +71,7 @@ public class PCollectionsTest { public static Iterable<PCollection<?>> data() { Pipeline pipeline = TestPipeline.create(); PCollection<Integer> ints = pipeline.apply("ints", Create.of(1, 2, 3)); - PCollection<Long> longs = pipeline.apply("unbounded longs", CountingInput.unbounded()); + PCollection<Long> longs = pipeline.apply("unbounded longs", GenerateSequence.from(0)); PCollection<Long> windowedLongs = longs.apply( "into fixed windows", @@ -83,7 +83,7 @@ public class PCollectionsTest { .apply("group", GroupByKey.<String, String>create()); PCollection<Long> coderLongs = pipeline - .apply("counts with alternative coder", CountingInput.upTo(10L)) + .apply("counts with alternative coder", GenerateSequence.fromTo(0, 10)) .setCoder(BigEndianLongCoder.of()); PCollection<Integer> allCustomInts = pipeline http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java index b065617..c6c50f0 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformReplacementsTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableMap; import java.util.Collections; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -47,7 +47,7 @@ import org.junit.runners.JUnit4; public class PTransformReplacementsTest { @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); @Rule public ExpectedException thrown = ExpectedException.none(); - private PCollection<Long> mainInput = pipeline.apply(CountingInput.unbounded()); + private PCollection<Long> mainInput = pipeline.apply(GenerateSequence.from(0)); private PCollectionView<String> sideInput = pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton()); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java index 4e3cdb6..4ef70c0 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java @@ -35,9 +35,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -75,7 +74,7 @@ public class PTransformsTest { TestPipeline compositeReadPipeline = TestPipeline.create(); ToAndFromProtoSpec compositeRead = ToAndFromProtoSpec.composite( - countingInput(compositeReadPipeline), + generateSequence(compositeReadPipeline), ToAndFromProtoSpec.leaf(read(compositeReadPipeline))); return ImmutableList.<ToAndFromProtoSpec>builder() .add(readLeaf) @@ -152,11 +151,11 @@ public class PTransformsTest { @ProcessElement public void process(ProcessContext context) {} } - private static AppliedPTransform<?, ?, ?> countingInput(Pipeline pipeline) { - UnboundedCountingInput input = CountingInput.unbounded(); - PCollection<Long> pcollection = pipeline.apply(input); - return AppliedPTransform.<PBegin, PCollection<Long>, UnboundedCountingInput>of( - "Count", pipeline.begin().expand(), pcollection.expand(), input, pipeline); + private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) { + GenerateSequence sequence = GenerateSequence.from(0); + PCollection<Long> pcollection = pipeline.apply(sequence); + return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of( + "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline); } private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) { @@ -169,7 +168,7 @@ public class PTransformsTest { private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) { PCollectionView<String> view = pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton()); - PCollection<Long> input = pipeline.apply(CountingInput.unbounded()); + PCollection<Long> input = pipeline.apply(GenerateSequence.from(0)); ParDo.MultiOutput<Long, KV<Long, String>> parDo = ParDo.of(new TestDoFn()) .withSideInputs(view) http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java index 895aec4..021f19c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.Create; @@ -106,7 +106,7 @@ public class SdkComponentsTest { @Test public void registerTransformAfterChildren() throws IOException { Create.Values<Long> create = Create.of(1L, 2L, 3L); - CountingInput.UnboundedCountingInput createChild = CountingInput.unbounded(); + GenerateSequence createChild = GenerateSequence.from(0); PCollection<Long> pt = pipeline.apply(create); String userName = "my_transform"; @@ -115,7 +115,7 @@ public class SdkComponentsTest { AppliedPTransform.<PBegin, PCollection<Long>, Create.Values<Long>>of( userName, pipeline.begin().expand(), pt.expand(), create, pipeline); AppliedPTransform<?, ?, ?> childTransform = - AppliedPTransform.<PBegin, PCollection<Long>, CountingInput.UnboundedCountingInput>of( + AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of( childUserName, pipeline.begin().expand(), pt.expand(), createChild, pipeline); String childId = components.registerPTransform(childTransform, @@ -159,7 +159,7 @@ public class SdkComponentsTest { @Test public void registerTransformWithUnregisteredChildren() throws IOException { Create.Values<Long> create = Create.of(1L, 2L, 3L); - CountingInput.UnboundedCountingInput createChild = CountingInput.unbounded(); + GenerateSequence createChild = GenerateSequence.from(0); PCollection<Long> pt = pipeline.apply(create); String userName = "my_transform"; @@ -168,7 +168,7 @@ public class SdkComponentsTest { AppliedPTransform.<PBegin, PCollection<Long>, Create.Values<Long>>of( userName, pipeline.begin().expand(), pt.expand(), create, pipeline); AppliedPTransform<?, ?, ?> childTransform = - AppliedPTransform.<PBegin, PCollection<Long>, CountingInput.UnboundedCountingInput>of( + AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of( childUserName, pipeline.begin().expand(), pt.expand(), createChild, pipeline); thrown.expect(IllegalArgumentException.class); @@ -179,7 +179,7 @@ public class SdkComponentsTest { @Test public void registerPCollection() throws IOException { - PCollection<Long> pCollection = pipeline.apply(CountingInput.unbounded()).setName("foo"); + PCollection<Long> pCollection = pipeline.apply(GenerateSequence.from(0)).setName("foo"); String id = components.registerPCollection(pCollection); assertThat(id, equalTo("foo")); components.toComponents().getPcollectionsOrThrow(id); @@ -188,10 +188,10 @@ public class SdkComponentsTest { @Test public void registerPCollectionExistingNameCollision() throws IOException { PCollection<Long> pCollection = - pipeline.apply("FirstCount", CountingInput.unbounded()).setName("foo"); + pipeline.apply("FirstCount", GenerateSequence.from(0)).setName("foo"); String firstId = components.registerPCollection(pCollection); PCollection<Long> duplicate = - pipeline.apply("SecondCount", CountingInput.unbounded()).setName("foo"); + pipeline.apply("SecondCount", GenerateSequence.from(0)).setName("foo"); String secondId = components.registerPCollection(duplicate); assertThat(firstId, equalTo("foo")); assertThat(secondId, containsString("foo")); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java index b44c890..2afb6c3 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java @@ -24,11 +24,10 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.Iterables; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -89,11 +88,10 @@ public class DirectGraphVisitorTest implements Serializable { public void getRootTransformsContainsRootTransforms() { PCollection<String> created = p.apply(Create.of("foo", "bar")); PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L))); - PCollection<Long> unCounted = p.apply(CountingInput.unbounded()); + PCollection<Long> unCounted = p.apply(GenerateSequence.from(0)); p.traverseTopologically(visitor); DirectGraph graph = visitor.getGraph(); assertThat(graph.getRootTransforms(), hasSize(3)); - List<PTransform<?, ?>> unapplied = new ArrayList<>(); assertThat( graph.getRootTransforms(), Matchers.<AppliedPTransform<?, ?, ?>>containsInAnyOrder( http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java index 246c111..f9063f0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -52,8 +52,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -246,7 +246,7 @@ public class DirectRunnerTest implements Serializable { opts.setRunner(DirectRunner.class); final Pipeline p = Pipeline.create(opts); - p.apply(CountingInput.unbounded().withRate(1L, Duration.standardSeconds(1))); + p.apply(GenerateSequence.from(0).withRate(1L, Duration.standardSeconds(1))); final BlockingQueue<PipelineResult> resultExchange = new ArrayBlockingQueue<>(1); Runnable cancelRunnable = new Runnable() { @@ -513,8 +513,7 @@ public class DirectRunnerTest implements Serializable { @Test public void testUnencodableOutputFromBoundedRead() throws Exception { Pipeline p = getPipeline(); - PCollection<Long> pCollection = - p.apply(CountingInput.upTo(10)).setCoder(new LongNoDecodeCoder()); + p.apply(GenerateSequence.fromTo(0, 10)).setCoder(new LongNoDecodeCoder()); thrown.expectCause(isA(CoderException.class)); thrown.expectMessage("Cannot decode a long"); @@ -524,8 +523,7 @@ public class DirectRunnerTest implements Serializable { @Test public void testUnencodableOutputFromUnboundedRead() { Pipeline p = getPipeline(); - PCollection<Long> pCollection = - p.apply(CountingInput.unbounded()).setCoder(new LongNoDecodeCoder()); + p.apply(GenerateSequence.from(0)).setCoder(new LongNoDecodeCoder()); thrown.expectCause(isA(CoderException.class)); thrown.expectMessage("Cannot decode a long"); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 7a65493..35b6709 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -44,7 +44,7 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -103,7 +103,7 @@ public class EvaluationContextTest { created = p.apply(Create.of(1, 2, 3)); downstream = created.apply(WithKeys.<String, Integer>of("foo")); view = created.apply(View.<Integer>asIterable()); - unbounded = p.apply(CountingInput.unbounded()); + unbounded = p.apply(GenerateSequence.from(0)); KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create(); p.traverseTopologically(keyedPValueTrackingVisitor); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java index 44c9017..8302983 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java @@ -21,7 +21,7 @@ import com.google.common.base.Joiner; import java.io.File; import java.net.URI; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -68,7 +68,7 @@ public class ReadSourceITCase extends JavaProgramTestBase { Pipeline p = FlinkTestPipeline.createForBatch(); PCollection<String> result = p - .apply(CountingInput.upTo(10)) + .apply(GenerateSequence.fromTo(0, 10)) .apply(ParDo.of(new DoFn<Long, String>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java index 79b7882..c7a044e 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.flink; import com.google.common.base.Joiner; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; @@ -58,7 +58,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase { Pipeline p = FlinkTestPipeline.createForStreaming(); p - .apply(CountingInput.upTo(10)) + .apply(GenerateSequence.fromTo(0, 10)) .apply(ParDo.of(new DoFn<Long, String>() { @ProcessElement public void processElement(ProcessContext c) throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java index ea76d31..85fdf72 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/StreamingSourceMetricsTest.java @@ -27,11 +27,12 @@ import org.apache.beam.runners.spark.PipelineRule; import org.apache.beam.runners.spark.TestSparkPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Source; import org.apache.beam.sdk.metrics.MetricNameFilter; import org.apache.beam.sdk.metrics.MetricQueryResults; import org.apache.beam.sdk.metrics.MetricsFilter; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; @@ -53,7 +54,9 @@ public class StreamingSourceMetricsTest implements Serializable { final long numElements = 1000; - pipeline.apply(CountingInput.unbounded().withMaxNumRecords(numElements)); + pipeline.apply( + // Use maxReadTime to force unbounded mode. + GenerateSequence.fromTo(0, numElements).withMaxReadTime(Duration.standardDays(1))); PipelineResult pipelineResult = pipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java index dd018f4..b66a8b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/CountingSource.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; @@ -39,44 +38,26 @@ import org.joda.time.Duration; import org.joda.time.Instant; /** - * A source that produces longs. When used as a {@link BoundedSource}, {@link CountingSource} - * starts at {@code 0} and counts up to a specified maximum. When used as an - * {@link UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then never produces more - * output. (In practice, this limit should never be reached.) + * Most users should use {@link GenerateSequence} instead. + * + * <p>A source that produces longs. When used as a {@link BoundedSource}, {@link CountingSource} + * starts at {@code 0} and counts up to a specified maximum. When used as an {@link + * UnboundedSource}, it counts up to {@link Long#MAX_VALUE} and then never produces more output. (In + * practice, this limit should never be reached.) * * <p>The bounded {@link CountingSource} is implemented based on {@link OffsetBasedSource} and * {@link OffsetBasedSource.OffsetBasedReader}, so it performs efficient initial splitting and it * supports dynamic work rebalancing. * - * <p>To produce a bounded {@code PCollection<Long>}, use {@link CountingSource#upTo(long)}: - * - * <pre>{@code - * Pipeline p = ... - * PTransform<PBegin, PCollection<Long>> producer = CountingInput.upTo(1000); - * PCollection<Long> bounded = p.apply(producer); - * }</pre> - * - * <p>To produce an unbounded {@code PCollection<Long>}, use {@link CountingInput#unbounded()}, - * calling {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} to provide values - * with timestamps other than {@link Instant#now}. - * - * <pre>{@code - * Pipeline p = ... - * - * // To create an unbounded PCollection that uses processing time as the element timestamp. - * PCollection<Long> unbounded = p.apply(CountingInput.unbounded()); - * // Or, to create an unbounded source that uses a provided function to set the element timestamp. - * PCollection<Long> unboundedWithTimestamps = - * p.apply(CountingInput.unbounded().withTimestampFn(someFn)); - * - * }</pre> + * <p>To produce a bounded source, use {@link #createSourceForSubrange(long, long)}. To produce an + * unbounded source, use {@link #createUnboundedFrom(long)}. */ public class CountingSource { /** * Creates a {@link BoundedSource} that will produce the specified number of elements, * from {@code 0} to {@code numElements - 1}. * - * @deprecated use {@link CountingInput#upTo(long)} instead + * @deprecated use {@link GenerateSequence} instead */ @Deprecated public static BoundedSource<Long> upTo(long numElements) { @@ -117,7 +98,7 @@ public class CountingSource { * <p>Elements in the resulting {@link PCollection PCollection<Long>} will have timestamps * corresponding to processing time at element generation, provided by {@link Instant#now}. * - * @deprecated use {@link CountingInput#unbounded()} instead + * @deprecated use {@link GenerateSequence} instead */ @Deprecated public static UnboundedSource<Long, CounterMark> unbounded() { @@ -133,8 +114,8 @@ public class CountingSource { * * <p>Note that the timestamps produced by {@code timestampFn} may not decrease. * - * @deprecated use {@link CountingInput#unbounded()} and call - * {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)} instead + * @deprecated use {@link GenerateSequence} and call + * {@link GenerateSequence#withTimestampFn(SerializableFunction)} instead */ @Deprecated public static UnboundedSource<Long, CounterMark> unboundedWithTimestampFn( http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java index 189539f..8a83d39 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/GenerateSequence.java @@ -109,6 +109,11 @@ public abstract class GenerateSequence extends PTransform<PBegin, PCollection<Lo .build(); } + /** Specifies to generate the range [from, to). */ + public static GenerateSequence fromTo(long from, long to) { + return from(from).to(to); + } + /** Specifies the maximum number to generate (exclusive). */ public GenerateSequence to(long to) { checkArgument( http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java index b69185b..f89d2aa 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java @@ -20,7 +20,7 @@ package org.apache.beam.sdk.values; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource.BoundedReader; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.Create; @@ -39,9 +39,10 @@ import org.apache.beam.sdk.util.WindowingStrategy; * be passed as the inputs of other PTransforms. * * <p>Some root transforms produce bounded {@code PCollections} and others - * produce unbounded ones. For example, {@link CountingInput#upTo} produces a fixed set of integers, - * so it produces a bounded {@link PCollection}. {@link CountingInput#unbounded} produces all - * integers as an infinite stream, so it produces an unbounded {@link PCollection}. + * produce unbounded ones. For example, {@link GenerateSequence#fromTo} produces a fixed set of + * integers, so it produces a bounded {@link PCollection}. {@link GenerateSequence#from} without + * a {@link GenerateSequence#to} produces all integers as an infinite stream, so it produces an + * unbounded {@link PCollection}. * * <p>Each element in a {@link PCollection} has an associated timestamp. Readers assign timestamps * to elements when they create {@link PCollection PCollections}, and other http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java index 75cabf2..9604c3f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java @@ -36,9 +36,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineExecutionException; import org.apache.beam.sdk.Pipeline.PipelineVisitor; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.io.CountingInput.BoundedCountingInput; -import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -317,8 +315,8 @@ public class PipelineTest { @Test public void testReplaceAll() { pipeline.enableAbandonedNodeEnforcement(false); - pipeline.apply(CountingInput.unbounded()); - pipeline.apply(CountingInput.upTo(100L)); + pipeline.apply("unbounded", GenerateSequence.from(0)); + pipeline.apply("bounded", GenerateSequence.fromTo(0, 100)); pipeline.replaceAll( ImmutableList.of( @@ -326,18 +324,18 @@ public class PipelineTest { new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { - return application.getTransform() instanceof UnboundedCountingInput; + return application.getTransform() instanceof GenerateSequence; } }, - new UnboundedCountingInputOverride()), + new GenerateSequenceToCreateOverride()), PTransformOverride.of( new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { - return application.getTransform() instanceof BoundedCountingInput; + return application.getTransform() instanceof Create.Values; } }, - new BoundedCountingInputOverride()))); + new CreateValuesToEmptyFlattenOverride()))); pipeline.traverseTopologically( new PipelineVisitor.Defaults() { @Override @@ -348,9 +346,9 @@ public class PipelineTest { not( anyOf( Matchers.<Class<? extends PTransform>>equalTo( - UnboundedCountingInput.class), + GenerateSequence.class), Matchers.<Class<? extends PTransform>>equalTo( - BoundedCountingInput.class)))); + Create.Values.class)))); } return CompositeBehavior.ENTER_TRANSFORM; } @@ -364,7 +362,7 @@ public class PipelineTest { @Test public void testReplaceAllIncomplete() { pipeline.enableAbandonedNodeEnforcement(false); - pipeline.apply(CountingInput.unbounded()); + pipeline.apply(GenerateSequence.from(0)); // The order is such that the output of the second will match the first, which is not permitted thrown.expect(IllegalStateException.class); @@ -374,18 +372,18 @@ public class PipelineTest { new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { - return application.getTransform() instanceof BoundedCountingInput; + return application.getTransform() instanceof Create.Values; } }, - new BoundedCountingInputOverride()), + new CreateValuesToEmptyFlattenOverride()), PTransformOverride.of( new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { - return application.getTransform() instanceof UnboundedCountingInput; + return application.getTransform() instanceof GenerateSequence; } }, - new UnboundedCountingInputOverride()))); + new GenerateSequenceToCreateOverride()))); } @Test @@ -455,11 +453,11 @@ public class PipelineTest { assertThat(names, not(hasItem("original_application/custom_name2"))); } - static class BoundedCountingInputOverride - implements PTransformOverrideFactory<PBegin, PCollection<Long>, BoundedCountingInput> { + static class GenerateSequenceToCreateOverride + implements PTransformOverrideFactory<PBegin, PCollection<Long>, GenerateSequence> { @Override public PTransformReplacement<PBegin, PCollection<Long>> getReplacementTransform( - AppliedPTransform<PBegin, PCollection<Long>, BoundedCountingInput> transform) { + AppliedPTransform<PBegin, PCollection<Long>, GenerateSequence> transform) { return PTransformReplacement.of(transform.getPipeline().begin(), Create.of(0L)); } @@ -476,18 +474,28 @@ public class PipelineTest { TaggedPValue.of(replacement.getKey(), replacement.getValue()))); } } - static class UnboundedCountingInputOverride - implements PTransformOverrideFactory<PBegin, PCollection<Long>, UnboundedCountingInput> { + private static class EmptyFlatten<T> extends PTransform<PBegin, PCollection<T>> { @Override - public PTransformReplacement<PBegin, PCollection<Long>> getReplacementTransform( - AppliedPTransform<PBegin, PCollection<Long>, UnboundedCountingInput> transform) { - return PTransformReplacement.of(transform.getPipeline().begin(), CountingInput.upTo(100L)); + public PCollection<T> expand(PBegin input) { + PCollectionList<T> empty = PCollectionList.empty(input.getPipeline()); + return empty.apply(Flatten.<T>pCollections()); + } + } + + static class CreateValuesToEmptyFlattenOverride<T> + implements PTransformOverrideFactory<PBegin, PCollection<T>, Create.Values<T>> { + + @Override + public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform( + AppliedPTransform<PBegin, PCollection<T>, Create.Values<T>> transform) { + return PTransformReplacement.of( + transform.getPipeline().begin(), new EmptyFlatten<T>()); } @Override public Map<PValue, ReplacementOutput> mapOutputs( - Map<TupleTag<?>, PValue> outputs, PCollection<Long> newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) { Map.Entry<TupleTag<?>, PValue> original = Iterables.getOnlyElement(outputs.entrySet()); Map.Entry<TupleTag<?>, PValue> replacement = Iterables.getOnlyElement(newOutput.expand().entrySet()); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java index 49af479..e2c185b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/GenerateSequenceTest.java @@ -65,7 +65,7 @@ public class GenerateSequenceTest { @Category(ValidatesRunner.class) public void testBoundedInput() { long numElements = 1000; - PCollection<Long> input = p.apply(GenerateSequence.from(0).to(numElements)); + PCollection<Long> input = p.apply(GenerateSequence.fromTo(0, numElements)); addCountingAsserts(input, 0, numElements); p.run(); @@ -74,7 +74,7 @@ public class GenerateSequenceTest { @Test @Category(ValidatesRunner.class) public void testEmptyBoundedInput() { - PCollection<Long> input = p.apply(GenerateSequence.from(0).to(0)); + PCollection<Long> input = p.apply(GenerateSequence.fromTo(0, 0)); PAssert.that(input).empty(); p.run(); @@ -83,7 +83,7 @@ public class GenerateSequenceTest { @Test @Category(ValidatesRunner.class) public void testEmptyBoundedInputSubrange() { - PCollection<Long> input = p.apply(GenerateSequence.from(42).to(42)); + PCollection<Long> input = p.apply(GenerateSequence.fromTo(42, 42)); PAssert.that(input).empty(); p.run(); @@ -94,7 +94,7 @@ public class GenerateSequenceTest { public void testBoundedInputSubrange() { long start = 10; long end = 1000; - PCollection<Long> input = p.apply(GenerateSequence.from(start).to(end)); + PCollection<Long> input = p.apply(GenerateSequence.fromTo(start, end)); addCountingAsserts(input, start, end); p.run(); @@ -102,7 +102,7 @@ public class GenerateSequenceTest { @Test public void testBoundedDisplayData() { - PTransform<?, ?> input = GenerateSequence.from(0).to(1234); + PTransform<?, ?> input = GenerateSequence.fromTo(0, 1234); DisplayData displayData = DisplayData.from(input); assertThat(displayData, hasDisplayItem("from", 0)); assertThat(displayData, hasDisplayItem("to", 1234)); @@ -110,7 +110,7 @@ public class GenerateSequenceTest { @Test public void testBoundedDisplayDataSubrange() { - PTransform<?, ?> input = GenerateSequence.from(12).to(1234); + PTransform<?, ?> input = GenerateSequence.fromTo(12, 1234); DisplayData displayData = DisplayData.from(input); assertThat(displayData, hasDisplayItem("from", 12)); assertThat(displayData, hasDisplayItem("to", 1234)); @@ -124,7 +124,7 @@ public class GenerateSequenceTest { long elemsPerPeriod = 10L; Duration periodLength = Duration.millis(8); PCollection<Long> input = - p.apply(GenerateSequence.from(0).to(numElements).withRate(elemsPerPeriod, periodLength)); + p.apply(GenerateSequence.fromTo(0, numElements).withRate(elemsPerPeriod, periodLength)); addCountingAsserts(input, 0, numElements); long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod; @@ -147,7 +147,7 @@ public class GenerateSequenceTest { long numElements = 1000; PCollection<Long> input = - p.apply(GenerateSequence.from(0).to(numElements).withTimestampFn(new ValueAsTimestampFn())); + p.apply(GenerateSequence.fromTo(0, numElements).withTimestampFn(new ValueAsTimestampFn())); addCountingAsserts(input, 0, numElements); PCollection<Long> diffs = @@ -172,7 +172,7 @@ public class GenerateSequenceTest { }; PTransform<?, ?> input = - GenerateSequence.from(0).to(1234).withMaxReadTime(maxReadTime).withTimestampFn(timestampFn); + GenerateSequence.fromTo(0, 1234).withMaxReadTime(maxReadTime).withTimestampFn(timestampFn); DisplayData displayData = DisplayData.from(input); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index afe384d..312a433 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -29,7 +29,7 @@ import static org.junit.Assert.assertThat; import java.io.Serializable; import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.UsesAttemptedMetrics; import org.apache.beam.sdk.testing.UsesCommittedMetrics; @@ -37,10 +37,10 @@ import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.CoreMatchers; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; import org.junit.Rule; @@ -236,7 +236,7 @@ public class MetricsTest implements Serializable { public void testBoundedSourceMetrics() { long numElements = 1000; - PCollection<Long> input = pipeline.apply(CountingInput.upTo(numElements)); + pipeline.apply(GenerateSequence.fromTo(0, numElements)); PipelineResult pipelineResult = pipeline.run(); @@ -257,8 +257,10 @@ public class MetricsTest implements Serializable { public void testUnboundedSourceMetrics() { long numElements = 1000; - PCollection<Long> input = pipeline - .apply((CountingInput.unbounded()).withMaxNumRecords(numElements)); + + // Use withMaxReadTime to force unbounded mode. + pipeline.apply( + GenerateSequence.fromTo(0, numElements).withMaxReadTime(Duration.standardDays(1))); PipelineResult pipelineResult = pipeline.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 3638fc8..1b884e2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -32,9 +32,8 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.runners.TransformHierarchy.Node; @@ -254,7 +253,7 @@ public class TransformHierarchyTest implements Serializable { } }); - UnboundedCountingInput genUpstream = CountingInput.unbounded(); + GenerateSequence genUpstream = GenerateSequence.from(0); PCollection<Long> upstream = pipeline.apply(genUpstream); PCollection<Long> output = upstream.apply("Original", originalParDo); hierarchy.pushNode("Upstream", pipeline.begin(), genUpstream); @@ -417,7 +416,7 @@ public class TransformHierarchyTest implements Serializable { } }); - UnboundedCountingInput genUpstream = CountingInput.unbounded(); + GenerateSequence genUpstream = GenerateSequence.from(0); PCollection<Long> upstream = pipeline.apply(genUpstream); PCollection<Long> output = upstream.apply("Original", originalParDo); Node upstreamNode = hierarchy.pushNode("Upstream", pipeline.begin(), genUpstream); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java index 969bbc4..b546158 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/GatherAllPanesTest.java @@ -21,7 +21,7 @@ import static org.junit.Assert.fail; import com.google.common.collect.Iterables; import java.io.Serializable; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.SerializableFunction; @@ -54,7 +54,7 @@ public class GatherAllPanesTest implements Serializable { @Category(NeedsRunner.class) public void singlePaneSingleReifiedPane() { PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes = - p.apply(CountingInput.upTo(20000)) + p.apply(GenerateSequence.fromTo(0, 20000)) .apply( WithTimestamps.of( new SerializableFunction<Long, Instant>() { @@ -95,8 +95,8 @@ public class GatherAllPanesTest implements Serializable { @Test @Category(NeedsRunner.class) public void multiplePanesMultipleReifiedPane() { - PCollection<Long> someElems = p.apply("someLongs", CountingInput.upTo(20000)); - PCollection<Long> otherElems = p.apply("otherLongs", CountingInput.upTo(20000)); + PCollection<Long> someElems = p.apply("someLongs", GenerateSequence.fromTo(0, 20000)); + PCollection<Long> otherElems = p.apply("otherLongs", GenerateSequence.fromTo(0, 20000)); PCollection<Iterable<ValueInSingleWindow<Iterable<Long>>>> accumulatedPanes = PCollectionList.of(someElems) .and(otherElems) http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java index 3528797..a5362b1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; @@ -425,7 +425,7 @@ public class PAssertTest implements Serializable { @Test @Category(ValidatesRunner.class) public void testEmptyFalse() throws Exception { - PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); + PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5)); PAssert.that("Vals should have been empty", vals).empty(); Throwable thrown = runExpectingAssertionFailure(pipeline); @@ -439,7 +439,7 @@ public class PAssertTest implements Serializable { @Test @Category(ValidatesRunner.class) public void testEmptyFalseDefaultReasonString() throws Exception { - PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); + PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5)); PAssert.that(vals).empty(); Throwable thrown = runExpectingAssertionFailure(pipeline); @@ -447,14 +447,14 @@ public class PAssertTest implements Serializable { String message = thrown.getMessage(); assertThat(message, - containsString("CountingInput.BoundedCountingInput/Read(BoundedCountingSource).out")); + containsString("GenerateSequence/Read(BoundedCountingSource).out")); assertThat(message, containsString("Expected: iterable over [] in any order")); } @Test @Category(ValidatesRunner.class) public void testAssertionSiteIsCapturedWithMessage() throws Exception { - PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); + PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5)); assertThatCollectionIsEmptyWithMessage(vals); Throwable thrown = runExpectingAssertionFailure(pipeline); @@ -473,7 +473,7 @@ public class PAssertTest implements Serializable { @Test @Category(ValidatesRunner.class) public void testAssertionSiteIsCapturedWithoutMessage() throws Exception { - PCollection<Long> vals = pipeline.apply(CountingInput.upTo(5L)); + PCollection<Long> vals = pipeline.apply(GenerateSequence.fromTo(0, 5)); assertThatCollectionIsEmptyWithoutMessage(vals); Throwable thrown = runExpectingAssertionFailure(pipeline); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index a4f2545..fca804b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -45,7 +45,7 @@ import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; @@ -138,9 +138,9 @@ public class FlattenTest implements Serializable { @Category(ValidatesRunner.class) public void testFlattenInputMultipleCopies() { int count = 5; - PCollection<Long> longs = p.apply("mkLines", CountingInput.upTo(count)); + PCollection<Long> longs = p.apply("mkLines", GenerateSequence.fromTo(0, count)); PCollection<Long> biggerLongs = - p.apply("mkOtherLines", CountingInput.upTo(count)) + p.apply("mkOtherLines", GenerateSequence.fromTo(0, count)) .apply( MapElements.via( new SimpleFunction<Long, Long>() { @@ -175,7 +175,7 @@ public class FlattenTest implements Serializable { Create.of(0L, 1L, 2L, 3L, null, 4L, 5L, null, 6L, 7L, 8L, null, 9L) .withCoder(NullableCoder.of(BigEndianLongCoder.of()))); PCollection<Long> varLongs = - p.apply("VarLengthLongs", CountingInput.upTo(5L)).setCoder(VarLongCoder.of()); + p.apply("VarLengthLongs", GenerateSequence.fromTo(0, 5)).setCoder(VarLongCoder.of()); PCollection<Long> flattened = PCollectionList.of(bigEndianLongs) http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 589c744..3424f86 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -64,7 +64,7 @@ import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -889,7 +889,7 @@ public class ParDoTest implements Serializable { @Test public void testMultiOutputAppliedMultipleTimesDifferentOutputs() { pipeline.enableAbandonedNodeEnforcement(false); - PCollection<Long> longs = pipeline.apply(CountingInput.unbounded()); + PCollection<Long> longs = pipeline.apply(GenerateSequence.from(0)); TupleTag<Long> mainOut = new TupleTag<>(); final TupleTag<String> valueAsString = new TupleTag<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 8bf022b..9fe7ec3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -319,7 +319,7 @@ public class WindowTest implements Serializable { final PCollection<Long> initialWindows = pipeline - .apply(CountingInput.upTo(10L)) + .apply(GenerateSequence.fromTo(0, 10)) .apply("AssignWindows", Window.into(new WindowOddEvenBuckets())); // Sanity check the window assignment to demonstrate the baseline http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java index 76cba01..f687637 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionListTest.java @@ -30,10 +30,11 @@ import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -61,34 +62,36 @@ public class PCollectionListTest { public void testIterationOrder() { Pipeline p = TestPipeline.create(); PCollection<Long> createOne = p.apply("CreateOne", Create.of(1L, 2L, 3L)); - PCollection<Long> boundedCount = p.apply("CountBounded", CountingInput.upTo(23L)); - PCollection<Long> unboundedCount = p.apply("CountUnbounded", CountingInput.unbounded()); + PCollection<Long> boundedCount = p.apply("CountBounded", GenerateSequence.fromTo(0, 23)); + PCollection<Long> unboundedCount = p.apply("CountUnbounded", GenerateSequence.from(0)); PCollection<Long> createTwo = p.apply("CreateTwo", Create.of(-1L, -2L)); - PCollection<Long> maxRecordsCount = - p.apply("CountLimited", CountingInput.unbounded().withMaxNumRecords(22L)); + PCollection<Long> maxReadTimeCount = + p.apply( + "CountLimited", GenerateSequence.from(0).withMaxReadTime(Duration.standardSeconds(5))); ImmutableList<PCollection<Long>> counts = - ImmutableList.of(boundedCount, maxRecordsCount, unboundedCount); + ImmutableList.of(boundedCount, maxReadTimeCount, unboundedCount); // Build a PCollectionList from a list. This should have the same order as the input list. PCollectionList<Long> pcList = PCollectionList.of(counts); // Contains is the order-dependent matcher assertThat( pcList.getAll(), - contains(boundedCount, maxRecordsCount, unboundedCount)); + contains(boundedCount, maxReadTimeCount, unboundedCount)); // A list that is expanded with builder methods has the added value at the end PCollectionList<Long> withOneCreate = pcList.and(createTwo); assertThat( - withOneCreate.getAll(), contains(boundedCount, maxRecordsCount, unboundedCount, createTwo)); + withOneCreate.getAll(), + contains(boundedCount, maxReadTimeCount, unboundedCount, createTwo)); // Lists that are built entirely from the builder return outputs in the order they were added PCollectionList<Long> fromEmpty = PCollectionList.<Long>empty(p) .and(unboundedCount) .and(createOne) - .and(ImmutableList.of(boundedCount, maxRecordsCount)); + .and(ImmutableList.of(boundedCount, maxReadTimeCount)); assertThat( - fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxRecordsCount)); + fromEmpty.getAll(), contains(unboundedCount, createOne, boundedCount, maxReadTimeCount)); Map<TupleTag<?>, PValue> expansion = fromEmpty.expand(); // Tag->PValue mappings are stable between expansions. They don't need to be stable across @@ -96,7 +99,7 @@ public class PCollectionListTest { assertThat(expansion, equalTo(fromEmpty.expand())); List<PCollection<Long>> expectedList = - ImmutableList.of(unboundedCount, createOne, boundedCount, maxRecordsCount); + ImmutableList.of(unboundedCount, createOne, boundedCount, maxReadTimeCount); assertThat(expansion.values(), containsInAnyOrder(expectedList.toArray())); } http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java index 9df0512..cc8761b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java @@ -31,7 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; @@ -108,7 +108,7 @@ public final class PCollectionTupleTest implements Serializable { public void testEquals() { TestPipeline p = TestPipeline.create(); TupleTag<Long> longTag = new TupleTag<>(); - PCollection<Long> longs = p.apply(CountingInput.unbounded()); + PCollection<Long> longs = p.apply(GenerateSequence.from(0)); TupleTag<String> strTag = new TupleTag<>(); PCollection<String> strs = p.apply(Create.of("foo", "bar")); @@ -135,7 +135,7 @@ public final class PCollectionTupleTest implements Serializable { TupleTag<Long> longTag = new TupleTag<>(); Pipeline p = TestPipeline.create(); - PCollection<Long> longs = p.apply(CountingInput.upTo(100L)); + PCollection<Long> longs = p.apply(GenerateSequence.fromTo(0, 100)); PCollection<String> strs = p.apply(Create.of("foo", "bar", "baz")); PCollection<Integer> ints = longs.apply(MapElements.via(new SimpleFunction<Long, Integer>() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 8e1632f..e11dd74 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -75,8 +75,8 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.BoundedSource; -import org.apache.beam.sdk.io.CountingInput; import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonSchemaToTableSchema; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; @@ -1040,7 +1040,7 @@ public class BigQueryIOTest implements Serializable { PCollection<TableRow> tableRows; if (unbounded) { tableRows = - p.apply(CountingInput.unbounded()) + p.apply(GenerateSequence.from(0)) .apply( MapElements.via( new SimpleFunction<Long, TableRow>() { @@ -1090,7 +1090,7 @@ public class BigQueryIOTest implements Serializable { tableRef.setTableId("sometable"); PCollection<TableRow> tableRows = - p.apply(CountingInput.unbounded()) + p.apply(GenerateSequence.from(0)) .apply( MapElements.via( new SimpleFunction<Long, TableRow>() { http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java index 240fb31..4ba77c9 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java @@ -43,7 +43,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.TestPipeline; @@ -106,7 +106,7 @@ public class BigtableWriteIT implements Serializable { createEmptyTable(instanceName, tableId); Pipeline p = Pipeline.create(options); - p.apply(CountingInput.upTo(numRows)) + p.apply(GenerateSequence.fromTo(0, numRows)) .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() { @ProcessElement public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/beam/blob/6a9a24c0/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java index 0fcd9d3..322aecd 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java @@ -24,7 +24,7 @@ import static org.junit.Assert.assertEquals; import java.util.UUID; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -66,7 +66,7 @@ public class V1WriteIT { Pipeline p = Pipeline.create(options); // Write to datastore - p.apply(CountingInput.upTo(numEntities)) + p.apply(GenerateSequence.fromTo(0, numEntities)) .apply(ParDo.of(new CreateEntityFn( options.getKind(), options.getNamespace(), ancestor))) .apply(DatastoreIO.v1().write().withProjectId(project));
