Renamed ParDo.Bound to ParDo.SingleOutput
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c6252ab8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c6252ab8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c6252ab8 Branch: refs/heads/master Commit: c6252ab81762f561a2b62b0a22932ad60397085e Parents: 6b5a9af Author: Eugene Kirpichov <[email protected]> Authored: Fri Mar 3 11:06:49 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Mar 28 13:04:37 2017 -0700 ---------------------------------------------------------------------- .../core/construction/PTransformMatchers.java | 24 ++++----- .../construction/PTransformMatchersTest.java | 6 +-- .../org/apache/beam/runners/core/OldDoFn.java | 10 ++-- .../beam/runners/direct/DirectRunner.java | 4 +- .../dataflow/BatchStatefulParDoOverrides.java | 18 ++++--- .../beam/runners/dataflow/DataflowRunner.java | 4 +- .../dataflow/PrimitiveParDoSingleFactory.java | 13 +++-- .../DataflowPipelineTranslatorTest.java | 4 +- .../PrimitiveParDoSingleFactoryTest.java | 8 +-- .../runners/spark/SparkPipelineStateTest.java | 2 +- .../beam/sdk/AggregatorPipelineExtractor.java | 4 +- .../org/apache/beam/sdk/transforms/DoFn.java | 8 +-- .../org/apache/beam/sdk/transforms/ParDo.java | 20 ++++---- .../org/apache/beam/sdk/transforms/View.java | 2 +- .../sdk/AggregatorPipelineExtractorTest.java | 54 ++++++++++---------- .../sdk/runners/TransformHierarchyTest.java | 8 +-- .../apache/beam/sdk/transforms/ParDoTest.java | 7 ++- .../apache/beam/sdk/values/TypedPValueTest.java | 2 +- 18 files changed, 100 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index 38cf76f..f803a9f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -70,16 +70,16 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that - * is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. + * A {@link PTransformMatcher} that matches a {@link ParDo.SingleOutput} containing a {@link DoFn} + * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. */ public static PTransformMatcher splittableParDoSingle() { return new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { PTransform<?, ?> transform = application.getTransform(); - if (transform instanceof ParDo.Bound) { - DoFn<?, ?> fn = ((ParDo.Bound<?, ?>) transform).getFn(); + if (transform instanceof ParDo.SingleOutput) { + DoFn<?, ?> fn = ((ParDo.SingleOutput<?, ?>) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.processElement().isSplittable(); } @@ -89,17 +89,17 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that - * uses state or timers, as specified by {@link DoFnSignature#usesState()} and - * {@link DoFnSignature#usesTimers()}. + * A {@link PTransformMatcher} that matches a {@link ParDo.SingleOutput} containing a {@link DoFn} + * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and {@link + * DoFnSignature#usesTimers()}. */ public static PTransformMatcher stateOrTimerParDoSingle() { return new PTransformMatcher() { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { PTransform<?, ?> transform = application.getTransform(); - if (transform instanceof ParDo.Bound) { - DoFn<?, ?> fn = ((ParDo.Bound<?, ?>) transform).getFn(); + if (transform instanceof ParDo.SingleOutput) { + DoFn<?, ?> fn = ((ParDo.SingleOutput<?, ?>) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.usesState() || signature.usesTimers(); } @@ -148,7 +148,7 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} which matches a {@link ParDo.Bound} or {@link ParDo.BoundMulti} + * A {@link PTransformMatcher} which matches a {@link ParDo.SingleOutput} or {@link ParDo.BoundMulti} * where the {@link DoFn} is of the provided type. */ public static PTransformMatcher parDoWithFnType(final Class<? extends DoFn> fnType) { @@ -156,8 +156,8 @@ public class PTransformMatchers { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { DoFn<?, ?> fn; - if (application.getTransform() instanceof ParDo.Bound) { - fn = ((ParDo.Bound) application.getTransform()).getFn(); + if (application.getTransform() instanceof ParDo.SingleOutput) { + fn = ((ParDo.SingleOutput) application.getTransform()).getFn(); } else if (application.getTransform() instanceof ParDo.BoundMulti) { fn = ((ParDo.BoundMulti) application.getTransform()).getFn(); } else { http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 0fead17..484aba4 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -92,7 +92,7 @@ public class PTransformMatchersTest implements Serializable { @Test public void classEqualToMatchesSameClass() { - PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); + PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class); AppliedPTransform<?, ?, ?> application = getAppliedTransform( ParDo.of( @@ -127,7 +127,7 @@ public class PTransformMatchersTest implements Serializable { @Test public void classEqualToDoesNotMatchUnrelatedClass() { - PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class); + PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.SingleOutput.class); AppliedPTransform<?, ?, ?> application = getAppliedTransform(Window.<KV<String, Integer>>into(new GlobalWindows())); @@ -192,7 +192,7 @@ public class PTransformMatchersTest implements Serializable { }; /** - * Demonstrates that a {@link ParDo.Bound} does not match any ParDo matcher. + * Demonstrates that a {@link ParDo.SingleOutput} does not match any ParDo matcher. */ @Test public void parDoSingle() { http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java index 4033260..e9d4740 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -141,8 +141,8 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl * <p>Once passed to {@code sideOutput} the element should not be modified * in any way. * - * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to - * specify the tags of side outputs that it consumes. Non-consumed side + * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags withOutputTags} + * to specify the tags of side outputs that it consumes. Non-consumed side * outputs, e.g., outputs for monitoring purposes only, don't necessarily * need to be specified. * @@ -157,7 +157,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl * to access any information about the input element. The output element * will have a timestamp of negative infinity. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract <T> void sideOutput(TupleTag<T> tag, T output); @@ -181,7 +181,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl * to access any information about the input element except for the * timestamp. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract <T> void sideOutputWithTimestamp( TupleTag<T> tag, T output, Instant timestamp); @@ -251,7 +251,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl * for how this corresponding window is determined. * * @throws IllegalArgumentException if this is not a side input - * @see ParDo#withSideInputs + * @see ParDo.SingleOutput#withSideInputs */ public abstract <T> T sideInput(PCollectionView<T> view); http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 62df6c8..4ee364f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -168,7 +168,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */ private static final Set<Class<? extends PTransform>> CONTAINS_UDF = ImmutableSet.of( - Read.Bounded.class, Read.Unbounded.class, ParDo.Bound.class, ParDo.BoundMulti.class); + Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, ParDo.BoundMulti.class); enum Enforcement { ENCODABILITY { @@ -221,7 +221,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create()); } Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build(); - enforcements.put(ParDo.Bound.class, parDoEnforcements); + enforcements.put(ParDo.SingleOutput.class, parDoEnforcements); enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); return enforcements.build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 91f84ab..82629db 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -58,12 +58,13 @@ import org.joda.time.Instant; public class BatchStatefulParDoOverrides { /** - * Returns a {@link PTransformOverrideFactory} that replaces a single-output - * {@link ParDo} with a composite transform specialized for the {@link DataflowRunner}. + * Returns a {@link PTransformOverrideFactory} that replaces a single-output {@link ParDo} with a + * composite transform specialized for the {@link DataflowRunner}. */ public static <K, InputT, OutputT> PTransformOverrideFactory< - PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.Bound<KV<K, InputT>, OutputT>> + PCollection<KV<K, InputT>>, PCollection<OutputT>, + ParDo.SingleOutput<KV<K, InputT>, OutputT>> singleOutputOverrideFactory() { return new SingleOutputOverrideFactory<>(); } @@ -82,12 +83,13 @@ public class BatchStatefulParDoOverrides { private static class SingleOutputOverrideFactory<K, InputT, OutputT> implements PTransformOverrideFactory< - PCollection<KV<K, InputT>>, PCollection<OutputT>, ParDo.Bound<KV<K, InputT>, OutputT>> { + PCollection<KV<K, InputT>>, PCollection<OutputT>, + ParDo.SingleOutput<KV<K, InputT>, OutputT>> { @Override @SuppressWarnings("unchecked") public PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> getReplacementTransform( - ParDo.Bound<KV<K, InputT>, OutputT> originalParDo) { + ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo) { return new StatefulSingleOutputParDo<>(originalParDo); } @@ -129,13 +131,13 @@ public class BatchStatefulParDoOverrides { static class StatefulSingleOutputParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollection<OutputT>> { - private final ParDo.Bound<KV<K, InputT>, OutputT> originalParDo; + private final ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo; - StatefulSingleOutputParDo(ParDo.Bound<KV<K, InputT>, OutputT> originalParDo) { + StatefulSingleOutputParDo(ParDo.SingleOutput<KV<K, InputT>, OutputT> originalParDo) { this.originalParDo = originalParDo; } - ParDo.Bound<KV<K, InputT>, OutputT> getOriginalParDo() { + ParDo.SingleOutput<KV<K, InputT>, OutputT> getOriginalParDo() { return originalParDo; } http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index a3249c3..ea96ae8 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -376,7 +376,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { .put( PTransformMatchers.classEqualTo(Combine.GroupedValues.class), new PrimitiveCombineGroupedValuesOverrideFactory()) - .put(PTransformMatchers.classEqualTo(ParDo.Bound.class), new PrimitiveParDoSingleFactory()); + .put( + PTransformMatchers.classEqualTo(ParDo.SingleOutput.class), + new PrimitiveParDoSingleFactory()); return ptoverrides.build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java index a749730..db50cc2 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactory.java @@ -26,21 +26,20 @@ import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.Bound; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * A {@link PTransformOverrideFactory} that produces {@link ParDoSingle} instances from - * {@link ParDo.Bound} instances. {@link ParDoSingle} is a primitive {@link PTransform}, to ensure + * A {@link PTransformOverrideFactory} that produces {@link ParDoSingle} instances from {@link + * ParDo.SingleOutput} instances. {@link ParDoSingle} is a primitive {@link PTransform}, to ensure * that {@link DisplayData} appears on all {@link ParDo ParDos} in the {@link DataflowRunner}. */ public class PrimitiveParDoSingleFactory<InputT, OutputT> extends SingleInputOutputOverrideFactory< - PCollection<? extends InputT>, PCollection<OutputT>, ParDo.Bound<InputT, OutputT>> { + PCollection<? extends InputT>, PCollection<OutputT>, ParDo.SingleOutput<InputT, OutputT>> { @Override public PTransform<PCollection<? extends InputT>, PCollection<OutputT>> getReplacementTransform( - ParDo.Bound<InputT, OutputT> transform) { + ParDo.SingleOutput<InputT, OutputT> transform) { return new ParDoSingle<>(transform); } @@ -49,9 +48,9 @@ public class PrimitiveParDoSingleFactory<InputT, OutputT> */ public static class ParDoSingle<InputT, OutputT> extends ForwardingPTransform<PCollection<? extends InputT>, PCollection<OutputT>> { - private final ParDo.Bound<InputT, OutputT> original; + private final ParDo.SingleOutput<InputT, OutputT> original; - private ParDoSingle(Bound<InputT, OutputT> original) { + private ParDoSingle(ParDo.SingleOutput<InputT, OutputT> original) { this.original = original; } http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 2d63193..eb55566 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -1013,8 +1013,8 @@ public class DataflowPipelineTranslatorTest implements Serializable { } }; - ParDo.Bound<Integer, Integer> parDo1 = ParDo.of(fn1); - ParDo.Bound<Integer, Integer> parDo2 = ParDo.of(fn2); + ParDo.SingleOutput<Integer, Integer> parDo1 = ParDo.of(fn1); + ParDo.SingleOutput<Integer, Integer> parDo2 = ParDo.of(fn2); pipeline .apply(Create.of(1, 2, 3)) .apply(parDo1) http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java index cb1e34e..bff46ea 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/PrimitiveParDoSingleFactoryTest.java @@ -58,11 +58,11 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable { /** * A test that demonstrates that the replacement transform has the Display Data of the - * {@link ParDo.Bound} it replaces. + * {@link ParDo.SingleOutput} it replaces. */ @Test public void getReplacementTransformPopulateDisplayData() { - ParDo.Bound<Integer, Long> originalTransform = ParDo.of(new ToLongFn()); + ParDo.SingleOutput<Integer, Long> originalTransform = ParDo.of(new ToLongFn()); DisplayData originalDisplayData = DisplayData.from(originalTransform); PTransform<PCollection<? extends Integer>, PCollection<Long>> replacement = @@ -88,7 +88,7 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable { pipeline .apply("StringSideInputVals", Create.of("foo", "bar", "baz")) .apply("SideStringsView", View.<String>asList()); - ParDo.Bound<Integer, Long> originalTransform = + ParDo.SingleOutput<Integer, Long> originalTransform = ParDo.of(new ToLongFn()).withSideInputs(sideLong, sideStrings); PTransform<PCollection<? extends Integer>, PCollection<Long>> replacementTransform = @@ -100,7 +100,7 @@ public class PrimitiveParDoSingleFactoryTest implements Serializable { @Test public void getReplacementTransformGetFn() { DoFn<Integer, Long> originalFn = new ToLongFn(); - ParDo.Bound<Integer, Long> originalTransform = ParDo.of(originalFn); + ParDo.SingleOutput<Integer, Long> originalTransform = ParDo.of(originalFn); PTransform<PCollection<? extends Integer>, PCollection<Long>> replacementTransform = factory.getReplacementTransform(originalTransform); ParDoSingle<Integer, Long> parDoSingle = (ParDoSingle<Integer, Long>) replacementTransform; http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java index 3a68d6f..cfbad01 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java @@ -62,7 +62,7 @@ public class SparkPipelineStateTest implements Serializable { private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally"; - private ParDo.Bound<String, String> printParDo(final String prefix) { + private ParDo.SingleOutput<String, String> printParDo(final String prefix) { return ParDo.of(new DoFn<String, String>() { @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index c79f779..7a422b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -69,8 +69,8 @@ class AggregatorPipelineExtractor { private Collection<Aggregator<?, ?>> getAggregators(PTransform<?, ?> transform) { if (transform != null) { - if (transform instanceof ParDo.Bound) { - return AggregatorRetriever.getAggregators(((ParDo.Bound<?, ?>) transform).getFn()); + if (transform instanceof ParDo.SingleOutput) { + return AggregatorRetriever.getAggregators(((ParDo.SingleOutput<?, ?>) transform).getFn()); } else if (transform instanceof ParDo.BoundMulti) { return AggregatorRetriever.getAggregators( ((ParDo.BoundMulti<?, ?>) transform).getFn()); http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 20c66c0..a7730f0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -160,7 +160,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <p>Once passed to {@code sideOutput} the element should not be modified * in any way. * - * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags} to + * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to * specify the tags of side outputs that it consumes. Non-consumed side * outputs, e.g., outputs for monitoring purposes only, don't necessarily * need to be specified. @@ -179,7 +179,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from * {@link StartBundle} or {@link FinishBundle} methods. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract <T> void sideOutput(TupleTag<T> tag, T output); @@ -206,7 +206,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <p><i>Note:</i> A splittable {@link DoFn} is not allowed to output from * {@link StartBundle} or {@link FinishBundle} methods. * - * @see ParDo#withOutputTags + * @see ParDo.SingleOutput#withOutputTags */ public abstract <T> void sideOutputWithTimestamp( TupleTag<T> tag, T output, Instant timestamp); @@ -272,7 +272,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * Returns the value of the side input. * * @throws IllegalArgumentException if this is not a side input - * @see ParDo#withSideInputs + * @see ParDo.SingleOutput#withSideInputs */ public abstract <T> T sideInput(PCollectionView<T> view); http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 1e2e5b8..0b25aa1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -154,7 +154,7 @@ import org.apache.beam.sdk.values.TypedPValue; * {@link PCollectionView PCollectionViews} express styles of accessing * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using - * {@link ParDo.Bound#withSideInputs}, and their contents accessible to each of + * {@link SingleOutput#withSideInputs}, and their contents accessible to each of * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. * For example: * @@ -184,7 +184,7 @@ import org.apache.beam.sdk.values.TypedPValue; * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag}, * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by - * invoking {@link ParDo.Bound#withOutputTags}. Unconsumed side outputs do not + * invoking {@link SingleOutput#withOutputTags}. Unconsumed side outputs do not * necessarily need to be explicitly specified, even if the {@link DoFn} * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using @@ -424,9 +424,9 @@ public class ParDo { * <p>The resulting {@link PTransform PTransform} is ready to be applied, or further * properties can be set on it first. */ - public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { + public static <InputT, OutputT> SingleOutput<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { validate(fn); - return new Bound<InputT, OutputT>( + return new SingleOutput<InputT, OutputT>( fn, Collections.<PCollectionView<?>>emptyList(), displayDataForFn(fn)); } @@ -494,18 +494,18 @@ public class ParDo { * {@code PCollection<OutputT>}. * * <p>A multi-output form of this transform can be created with - * {@link ParDo.Bound#withOutputTags}. + * {@link SingleOutput#withOutputTags}. * * @param <InputT> the type of the (main) input {@link PCollection} elements * @param <OutputT> the type of the (main) output {@link PCollection} elements */ - public static class Bound<InputT, OutputT> + public static class SingleOutput<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { private final List<PCollectionView<?>> sideInputs; private final DoFn<InputT, OutputT> fn; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; - Bound( + SingleOutput( DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { @@ -521,7 +521,7 @@ public class ParDo { * * <p>See the discussion of Side Inputs above for more explanation. */ - public Bound<InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) { + public SingleOutput<InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); } @@ -532,9 +532,9 @@ public class ParDo { * * <p>See the discussion of Side Inputs above for more explanation. */ - public Bound<InputT, OutputT> withSideInputs( + public SingleOutput<InputT, OutputT> withSideInputs( Iterable<? extends PCollectionView<?>> sideInputs) { - return new Bound<>( + return new SingleOutput<>( fn, ImmutableList.<PCollectionView<?>>builder() .addAll(this.sideInputs) http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 767847d..67a41e4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -132,7 +132,7 @@ import org.apache.beam.sdk.values.PCollectionView; * } * </pre> * - * <p>See {@link ParDo#withSideInputs} for details on how to access + * <p>See {@link ParDo.SingleOutput#withSideInputs} for details on how to access * this variable inside a {@link ParDo} over another {@link PCollection}. */ public class View { http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index 22efd85..52bcc93 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -64,17 +64,17 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("unchecked") @Test - public void testGetAggregatorStepsWithParDoBoundExtractsSteps() { + public void testGetAggregatorStepsWithParDoSingleOutputExtractsSteps() { @SuppressWarnings("rawtypes") - ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); + ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); AggregatorProvidingDoFn<ThreadGroup, StrictMath> fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs()); Aggregator<Integer, Integer> aggregatorTwo = fn.addAggregator(Min.ofIntegers()); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode))) .when(p) @@ -85,8 +85,8 @@ public class AggregatorPipelineExtractorTest { Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = extractor.getAggregatorSteps(); - assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne)); - assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorTwo)); + assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorOne)); + assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(aggregatorSteps.size(), 2); } @@ -94,15 +94,15 @@ public class AggregatorPipelineExtractorTest { @Test public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() { @SuppressWarnings("rawtypes") - ParDo.BoundMulti bound = mock(ParDo.BoundMulti.class, "BoundMulti"); + ParDo.BoundMulti parDo = mock(ParDo.BoundMulti.class, "parDo"); AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Max.ofLongs()); Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles()); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode))) .when(p) @@ -113,8 +113,8 @@ public class AggregatorPipelineExtractorTest { Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = extractor.getAggregatorSteps(); - assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne)); - assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorTwo)); + assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorOne)); + assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(2, aggregatorSteps.size()); } @@ -122,20 +122,20 @@ public class AggregatorPipelineExtractorTest { @Test public void testGetAggregatorStepsWithOneAggregatorInMultipleStepsAddsSteps() { @SuppressWarnings("rawtypes") - ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); + ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); + ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>(); - when(bound.getFn()).thenReturn(fn); - when(otherBound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); + when(otherParDo.getFn()).thenReturn(fn); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs()); Aggregator<Double, Double> aggregatorTwo = fn.addAggregator(Min.ofDoubles()); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); TransformHierarchy.Node otherTransformNode = mock(TransformHierarchy.Node.class); - when(otherTransformNode.getTransform()).thenReturn(otherBound); + when(otherTransformNode.getTransform()).thenReturn(otherParDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode))) .when(p) @@ -147,9 +147,9 @@ public class AggregatorPipelineExtractorTest { extractor.getAggregatorSteps(); assertEquals( - ImmutableSet.<PTransform<?, ?>>of(bound, otherBound), aggregatorSteps.get(aggregatorOne)); + ImmutableSet.<PTransform<?, ?>>of(parDo, otherParDo), aggregatorSteps.get(aggregatorOne)); assertEquals( - ImmutableSet.<PTransform<?, ?>>of(bound, otherBound), aggregatorSteps.get(aggregatorTwo)); + ImmutableSet.<PTransform<?, ?>>of(parDo, otherParDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(2, aggregatorSteps.size()); } @@ -157,25 +157,25 @@ public class AggregatorPipelineExtractorTest { @Test public void testGetAggregatorStepsWithDifferentStepsAddsSteps() { @SuppressWarnings("rawtypes") - ParDo.Bound bound = mock(ParDo.Bound.class, "Bound"); + ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); AggregatorProvidingDoFn<ThreadGroup, Void> fn = new AggregatorProvidingDoFn<>(); Aggregator<Long, Long> aggregatorOne = fn.addAggregator(Sum.ofLongs()); - when(bound.getFn()).thenReturn(fn); + when(parDo.getFn()).thenReturn(fn); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherBound = mock(ParDo.BoundMulti.class, "otherBound"); + ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>(); Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles()); - when(otherBound.getFn()).thenReturn(otherFn); + when(otherParDo.getFn()).thenReturn(otherFn); TransformHierarchy.Node transformNode = mock(TransformHierarchy.Node.class); - when(transformNode.getTransform()).thenReturn(bound); + when(transformNode.getTransform()).thenReturn(parDo); TransformHierarchy.Node otherTransformNode = mock(TransformHierarchy.Node.class); - when(otherTransformNode.getTransform()).thenReturn(otherBound); + when(otherTransformNode.getTransform()).thenReturn(otherParDo); doAnswer(new VisitNodesAnswer(ImmutableList.of(transformNode, otherTransformNode))) .when(p) @@ -186,8 +186,8 @@ public class AggregatorPipelineExtractorTest { Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = extractor.getAggregatorSteps(); - assertEquals(ImmutableSet.<PTransform<?, ?>>of(bound), aggregatorSteps.get(aggregatorOne)); - assertEquals(ImmutableSet.<PTransform<?, ?>>of(otherBound), aggregatorSteps.get(aggregatorTwo)); + assertEquals(ImmutableSet.<PTransform<?, ?>>of(parDo), aggregatorSteps.get(aggregatorOne)); + assertEquals(ImmutableSet.<PTransform<?, ?>>of(otherParDo), aggregatorSteps.get(aggregatorTwo)); assertEquals(2, aggregatorSteps.size()); } http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 426fd8b..56dc743 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -46,7 +46,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.Bound; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; @@ -245,7 +245,7 @@ public class TransformHierarchyTest implements Serializable { @Test public void replaceWithCompositeSucceeds() { - final ParDo.Bound<Long, Long> originalParDo = + final SingleOutput<Long, Long> originalParDo = ParDo.of( new DoFn<Long, Long>() { @ProcessElement @@ -324,7 +324,7 @@ public class TransformHierarchyTest implements Serializable { PCollection.createPrimitiveOutputInternal( pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); - ParDo.Bound<Long, Long> pardo = + SingleOutput<Long, Long> pardo = ParDo.of( new DoFn<Long, Long>() { @ProcessElement @@ -408,7 +408,7 @@ public class TransformHierarchyTest implements Serializable { @Test public void visitAfterReplace() { Node root = hierarchy.getCurrent(); - final Bound<Long, Long> originalParDo = + final SingleOutput<Long, Long> originalParDo = ParDo.of( new DoFn<Long, Long>() { @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 19265e1..9f621f8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -72,11 +72,10 @@ import org.apache.beam.sdk.testing.UsesTimersInParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.OnTimer; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; -import org.apache.beam.sdk.transforms.ParDo.Bound; +import org.apache.beam.sdk.transforms.ParDo.SingleOutput; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.DisplayDataMatchers; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; @@ -1511,7 +1510,7 @@ public class ParDoTest implements Serializable { } }; - Bound<String, String> parDo = ParDo.of(fn); + SingleOutput<String, String> parDo = ParDo.of(fn); DisplayData displayData = DisplayData.from(parDo); assertThat(displayData, hasDisplayItem(allOf( @@ -1534,7 +1533,7 @@ public class ParDoTest implements Serializable { } }; - Bound<String, String> parDo = ParDo.of(fn); + SingleOutput<String, String> parDo = ParDo.of(fn); DisplayData displayData = DisplayData.from(parDo); assertThat(displayData, includesDisplayDataFor("fn", fn)); http://git-wip-us.apache.org/repos/asf/beam/blob/c6252ab8/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index 18d550c..211dfd9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -161,7 +161,7 @@ public class TypedPValueTest { public void testFinishSpecifyingShouldFailIfNoCoderInferrable() { p.enableAbandonedNodeEnforcement(false); PCollection<Integer> created = p.apply(Create.of(1, 2, 3)); - ParDo.Bound<Integer, EmptyClass> uninferrableParDo = ParDo.of(new EmptyClassDoFn()); + ParDo.SingleOutput<Integer, EmptyClass> uninferrableParDo = ParDo.of(new EmptyClassDoFn()); PCollection<EmptyClass> unencodable = created.apply(uninferrableParDo);
