Repository: beam Updated Branches: refs/heads/master caba84171 -> 66283670d
Renamed ParDo.BoundMulti to ParDo.MultiOutput Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f6dd5833 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f6dd5833 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f6dd5833 Branch: refs/heads/master Commit: f6dd5833f0c195c0ecedb4f24d8f2f718354c234 Parents: c6252ab Author: Eugene Kirpichov <[email protected]> Authored: Fri Mar 3 11:13:10 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Mar 28 13:04:37 2017 -0700 ---------------------------------------------------------------------- .../translation/ApexPipelineTranslator.java | 2 +- .../apex/translation/ParDoTranslator.java | 8 ++++---- .../core/construction/PTransformMatchers.java | 20 ++++++++++---------- .../beam/runners/core/SplittableParDo.java | 8 ++++---- .../beam/runners/core/SplittableParDoTest.java | 2 +- .../beam/runners/direct/DirectRunner.java | 5 +++-- .../direct/KeyedPValueTrackingVisitor.java | 4 ++-- .../runners/direct/ParDoEvaluatorFactory.java | 8 ++++---- .../direct/ParDoMultiOverrideFactory.java | 20 ++++++++++---------- .../direct/TransformEvaluatorRegistry.java | 2 +- .../flink/FlinkBatchTransformTranslators.java | 6 +++--- .../FlinkStreamingTransformTranslators.java | 6 +++--- .../dataflow/BatchStatefulParDoOverrides.java | 13 ++++++------- .../dataflow/DataflowPipelineTranslator.java | 8 ++++---- .../spark/translation/TransformTranslator.java | 9 +++++---- .../streaming/StreamingTransformTranslator.java | 8 ++++---- .../streaming/TrackStreamingSourcesTest.java | 4 ++-- .../beam/sdk/AggregatorPipelineExtractor.java | 4 ++-- .../org/apache/beam/sdk/transforms/ParDo.java | 14 +++++++------- .../transforms/windowing/WindowMappingFn.java | 4 ++-- .../org/apache/beam/sdk/values/TypedPValue.java | 2 +- .../sdk/AggregatorPipelineExtractorTest.java | 8 ++++---- .../sdk/runners/TransformHierarchyTest.java | 6 +++--- .../apache/beam/sdk/transforms/ParDoTest.java | 6 +++--- 24 files changed, 89 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java index 7eb9551..42ff144 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ApexPipelineTranslator.java @@ -59,7 +59,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { static { // register TransformTranslators - registerTransformTranslator(ParDo.BoundMulti.class, new ParDoTranslator<>()); + registerTransformTranslator(ParDo.MultiOutput.class, new ParDoTranslator<>()); registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index 5ffc3c3..75722c7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -44,15 +44,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * {@link ParDo.BoundMulti} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. + * {@link ParDo.MultiOutput} is translated to {@link ApexParDoOperator} that wraps the {@link DoFn}. */ class ParDoTranslator<InputT, OutputT> - implements TransformTranslator<ParDo.BoundMulti<InputT, OutputT>> { + implements TransformTranslator<ParDo.MultiOutput<InputT, OutputT>> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(ParDoTranslator.class); @Override - public void translate(ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { + public void translate(ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); @@ -105,7 +105,7 @@ class ParDoTranslator<InputT, OutputT> checkArgument( output.getValue() instanceof PCollection, "%s %s outputs non-PCollection %s of type %s", - ParDo.BoundMulti.class.getSimpleName(), + ParDo.MultiOutput.class.getSimpleName(), context.getFullName(), output.getValue(), output.getValue().getClass().getSimpleName()); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index f803a9f..d5a91a7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -109,7 +109,7 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn} + * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn} * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}. */ public static PTransformMatcher splittableParDoMulti() { @@ -117,8 +117,8 @@ public class PTransformMatchers { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { PTransform<?, ?> transform = application.getTransform(); - if (transform instanceof ParDo.BoundMulti) { - DoFn<?, ?> fn = ((ParDo.BoundMulti<?, ?>) transform).getFn(); + if (transform instanceof ParDo.MultiOutput) { + DoFn<?, ?> fn = ((ParDo.MultiOutput<?, ?>) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.processElement().isSplittable(); } @@ -128,7 +128,7 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn} + * A {@link PTransformMatcher} that matches a {@link ParDo.MultiOutput} containing a {@link DoFn} * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and * {@link DoFnSignature#usesTimers()}. */ @@ -137,8 +137,8 @@ public class PTransformMatchers { @Override public boolean matches(AppliedPTransform<?, ?, ?> application) { PTransform<?, ?> transform = application.getTransform(); - if (transform instanceof ParDo.BoundMulti) { - DoFn<?, ?> fn = ((ParDo.BoundMulti<?, ?>) transform).getFn(); + if (transform instanceof ParDo.MultiOutput) { + DoFn<?, ?> fn = ((ParDo.MultiOutput<?, ?>) transform).getFn(); DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn); return signature.usesState() || signature.usesTimers(); } @@ -148,8 +148,8 @@ public class PTransformMatchers { } /** - * A {@link PTransformMatcher} which matches a {@link ParDo.SingleOutput} or {@link ParDo.BoundMulti} - * where the {@link DoFn} is of the provided type. + * A {@link PTransformMatcher} which matches a {@link ParDo.SingleOutput} or {@link + * ParDo.MultiOutput} where the {@link DoFn} is of the provided type. */ public static PTransformMatcher parDoWithFnType(final Class<? extends DoFn> fnType) { return new PTransformMatcher() { @@ -158,8 +158,8 @@ public class PTransformMatchers { DoFn<?, ?> fn; if (application.getTransform() instanceof ParDo.SingleOutput) { fn = ((ParDo.SingleOutput) application.getTransform()).getFn(); - } else if (application.getTransform() instanceof ParDo.BoundMulti) { - fn = ((ParDo.BoundMulti) application.getTransform()).getFn(); + } else if (application.getTransform() instanceof ParDo.MultiOutput) { + fn = ((ParDo.MultiOutput) application.getTransform()).getFn(); } else { return false; } http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 544bfa0..0b311c7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -82,14 +82,14 @@ import org.joda.time.Instant; @Experimental(Experimental.Kind.SPLITTABLE_DO_FN) public class SplittableParDo<InputT, OutputT, RestrictionT> extends PTransform<PCollection<InputT>, PCollectionTuple> { - private final ParDo.BoundMulti<InputT, OutputT> parDo; + private final ParDo.MultiOutput<InputT, OutputT> parDo; /** * Creates the transform for the given original multi-output {@link ParDo}. * * @param parDo The splittable {@link ParDo} transform. */ - public SplittableParDo(ParDo.BoundMulti<InputT, OutputT> parDo) { + public SplittableParDo(ParDo.MultiOutput<InputT, OutputT> parDo) { checkNotNull(parDo, "parDo must not be null"); this.parDo = parDo; checkArgument( @@ -248,7 +248,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> windowingStrategy, input.isBounded().and(signature.isBoundedPerElement())); - // Set output type descriptor similarly to how ParDo.BoundMulti does it. + // Set output type descriptor similarly to how ParDo.MultiOutput does it. outputs.get(mainOutputTag).setTypeDescriptor(fn.getOutputTypeDescriptor()); return outputs; @@ -260,7 +260,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> input, TypedPValue<T> output) throws CannotProvideCoderException { - // Similar logic to ParDo.BoundMulti.getDefaultOutputCoder. + // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder. @SuppressWarnings("unchecked") KeyedWorkItemCoder<String, ElementAndRestriction<InputT, RestrictionT>> kwiCoder = (KeyedWorkItemCoder) input.getCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index af547c2..ee94ee0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -135,7 +135,7 @@ public class SplittableParDoTest { private static final TupleTag<String> MAIN_OUTPUT_TAG = new TupleTag<String>() {}; - private ParDo.BoundMulti<Integer, String> makeParDo(DoFn<Integer, String> fn) { + private ParDo.MultiOutput<Integer, String> makeParDo(DoFn<Integer, String> fn) { return ParDo.of(fn).withOutputTags(MAIN_OUTPUT_TAG, TupleTagList.empty()); } http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java index 4ee364f..11fe3f5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; @@ -168,7 +169,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { /** The set of {@link PTransform PTransforms} that execute a UDF. Useful for some enforcements. */ private static final Set<Class<? extends PTransform>> CONTAINS_UDF = ImmutableSet.of( - Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, ParDo.BoundMulti.class); + Read.Bounded.class, Read.Unbounded.class, ParDo.SingleOutput.class, MultiOutput.class); enum Enforcement { ENCODABILITY { @@ -222,7 +223,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> { } Collection<ModelEnforcementFactory> parDoEnforcements = enabledParDoEnforcements.build(); enforcements.put(ParDo.SingleOutput.class, parDoEnforcements); - enforcements.put(ParDo.BoundMulti.class, parDoEnforcements); + enforcements.put(MultiOutput.class, parDoEnforcements); return enforcements.build(); } http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 32eb692..02b1bed 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -116,8 +116,8 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { // The most obvious alternative would be a package-private marker interface, but // better to make this obviously hacky so it is less likely to proliferate. Meanwhile // we intend to allow explicit expression of key-preserving DoFn in the model. - if (transform instanceof ParDo.BoundMulti) { - ParDo.BoundMulti<?, ?> parDo = (ParDo.BoundMulti<?, ?>) transform; + if (transform instanceof ParDo.MultiOutput) { + ParDo.MultiOutput<?, ?> parDo = (ParDo.MultiOutput<?, ?>) transform; return parDo.getFn() instanceof ParDoMultiOverrideFactory.ToKeyedWorkItem; } else { return false; http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 7d6a8ea..b0e97fb 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** A {@link TransformEvaluatorFactory} for {@link ParDo.BoundMulti}. */ +/** A {@link TransformEvaluatorFactory} for {@link ParDo.MultiOutput}. */ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); @@ -62,13 +62,13 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception { @SuppressWarnings("unchecked") - AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>> + AppliedPTransform<PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>> parDoApplication = (AppliedPTransform< - PCollection<InputT>, PCollectionTuple, ParDo.BoundMulti<InputT, OutputT>>) + PCollection<InputT>, PCollectionTuple, ParDo.MultiOutput<InputT, OutputT>>) application; - ParDo.BoundMulti<InputT, OutputT> transform = parDoApplication.getTransform(); + ParDo.MultiOutput<InputT, OutputT> transform = parDoApplication.getTransform(); final DoFn<InputT, OutputT> doFn = transform.getFn(); @SuppressWarnings({"unchecked", "rawtypes"}) http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index c999093..4604fcc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -36,7 +36,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -61,11 +61,11 @@ import org.apache.beam.sdk.values.TypedPValue; */ class ParDoMultiOverrideFactory<InputT, OutputT> implements PTransformOverrideFactory< - PCollection<? extends InputT>, PCollectionTuple, BoundMulti<InputT, OutputT>> { + PCollection<? extends InputT>, PCollectionTuple, MultiOutput<InputT, OutputT>> { @Override @SuppressWarnings("unchecked") public PTransform<PCollection<? extends InputT>, PCollectionTuple> getReplacementTransform( - BoundMulti<InputT, OutputT> transform) { + MultiOutput<InputT, OutputT> transform) { DoFn<InputT, OutputT> fn = transform.getFn(); DoFnSignature signature = DoFnSignatures.getSignature(fn.getClass()); @@ -75,8 +75,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT> || signature.timerDeclarations().size() > 0) { // Based on the fact that the signature is stateful, DoFnSignatures ensures // that it is also keyed - ParDo.BoundMulti<KV<?, ?>, OutputT> keyedTransform = - (ParDo.BoundMulti<KV<?, ?>, OutputT>) transform; + MultiOutput<KV<?, ?>, OutputT> keyedTransform = + (MultiOutput<KV<?, ?>, OutputT>) transform; return new GbkThenStatefulParDo(keyedTransform); } else { @@ -98,9 +98,9 @@ class ParDoMultiOverrideFactory<InputT, OutputT> static class GbkThenStatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> { - private final ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo; + private final MultiOutput<KV<K, InputT>, OutputT> underlyingParDo; - public GbkThenStatefulParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo) { + public GbkThenStatefulParDo(MultiOutput<KV<K, InputT>, OutputT> underlyingParDo) { this.underlyingParDo = underlyingParDo; } @@ -165,17 +165,17 @@ class ParDoMultiOverrideFactory<InputT, OutputT> static class StatefulParDo<K, InputT, OutputT> extends PTransform<PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>>, PCollectionTuple> { - private final transient ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo; + private final transient MultiOutput<KV<K, InputT>, OutputT> underlyingParDo; private final transient PCollection<KV<K, InputT>> originalInput; public StatefulParDo( - ParDo.BoundMulti<KV<K, InputT>, OutputT> underlyingParDo, + MultiOutput<KV<K, InputT>, OutputT> underlyingParDo, PCollection<KV<K, InputT>> originalInput) { this.underlyingParDo = underlyingParDo; this.originalInput = originalInput; } - public ParDo.BoundMulti<KV<K, InputT>, OutputT> getUnderlyingParDo() { + public MultiOutput<KV<K, InputT>, OutputT> getUnderlyingParDo() { return underlyingParDo; } http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 62fee53..5ad8709 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -51,7 +51,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) - .put(ParDo.BoundMulti.class, new ParDoEvaluatorFactory<>(ctxt)) + .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt)) .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) .put(PCollections.class, new FlattenEvaluatorFactory(ctxt)) .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory(ctxt)) http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 31a6bda..1d6728b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -112,7 +112,7 @@ class FlinkBatchTransformTranslators { TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslatorBatch()); - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoTranslatorBatch()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoTranslatorBatch()); TRANSLATORS.put(Read.Bounded.class, new ReadSourceTranslatorBatch()); } @@ -499,12 +499,12 @@ class FlinkBatchTransformTranslators { private static class ParDoTranslatorBatch<InputT, OutputT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator< - ParDo.BoundMulti<InputT, OutputT>> { + ParDo.MultiOutput<InputT, OutputT>> { @Override @SuppressWarnings("unchecked") public void translateNode( - ParDo.BoundMulti<InputT, OutputT> transform, + ParDo.MultiOutput<InputT, OutputT> transform, FlinkBatchTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 7227dce..00b0412 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -121,7 +121,7 @@ class FlinkStreamingTransformTranslators { TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator()); TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoStreamingTranslator()); + TRANSLATORS.put(ParDo.MultiOutput.class, new ParDoStreamingTranslator()); TRANSLATORS.put(Window.Assign.class, new WindowAssignTranslator()); TRANSLATORS.put(Flatten.PCollections.class, new FlattenPCollectionTranslator()); @@ -398,11 +398,11 @@ class FlinkStreamingTransformTranslators { private static class ParDoStreamingTranslator<InputT, OutputT> extends FlinkStreamingPipelineTranslator.StreamTransformTranslator< - ParDo.BoundMulti<InputT, OutputT>> { + ParDo.MultiOutput<InputT, OutputT>> { @Override public void translateNode( - ParDo.BoundMulti<InputT, OutputT> transform, + ParDo.MultiOutput<InputT, OutputT> transform, FlinkStreamingTranslationContext context) { DoFn<InputT, OutputT> doFn = transform.getFn(); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 82629db..1d19d64 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -33,7 +33,6 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -76,7 +75,7 @@ public class BatchStatefulParDoOverrides { public static <K, InputT, OutputT> PTransformOverrideFactory< PCollection<KV<K, InputT>>, PCollectionTuple, - ParDo.BoundMulti<KV<K, InputT>, OutputT>> + ParDo.MultiOutput<KV<K, InputT>, OutputT>> multiOutputOverrideFactory() { return new MultiOutputOverrideFactory<>(); } @@ -107,12 +106,12 @@ public class BatchStatefulParDoOverrides { private static class MultiOutputOverrideFactory<K, InputT, OutputT> implements PTransformOverrideFactory< - PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.BoundMulti<KV<K, InputT>, OutputT>> { + PCollection<KV<K, InputT>>, PCollectionTuple, ParDo.MultiOutput<KV<K, InputT>, OutputT>> { @Override @SuppressWarnings("unchecked") public PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> getReplacementTransform( - BoundMulti<KV<K, InputT>, OutputT> originalParDo) { + ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo) { return new StatefulMultiOutputParDo<>(originalParDo); } @@ -159,9 +158,9 @@ public class BatchStatefulParDoOverrides { static class StatefulMultiOutputParDo<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollectionTuple> { - private final BoundMulti<KV<K, InputT>, OutputT> originalParDo; + private final ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo; - StatefulMultiOutputParDo(ParDo.BoundMulti<KV<K, InputT>, OutputT> originalParDo) { + StatefulMultiOutputParDo(ParDo.MultiOutput<KV<K, InputT>, OutputT> originalParDo) { this.originalParDo = originalParDo; } @@ -182,7 +181,7 @@ public class BatchStatefulParDoOverrides { return input.apply(new GbkBeforeStatefulParDo<K, InputT>()).apply(statefulParDo); } - public BoundMulti<KV<K, InputT>, OutputT> getOriginalParDo() { + public ParDo.MultiOutput<KV<K, InputT>, OutputT> getOriginalParDo() { return originalParDo; } } http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index db96594..6d231b9 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -821,15 +821,15 @@ public class DataflowPipelineTranslator { }); registerTransformTranslator( - ParDo.BoundMulti.class, - new TransformTranslator<ParDo.BoundMulti>() { + ParDo.MultiOutput.class, + new TransformTranslator<ParDo.MultiOutput>() { @Override - public void translate(ParDo.BoundMulti transform, TranslationContext context) { + public void translate(ParDo.MultiOutput transform, TranslationContext context) { translateMultiHelper(transform, context); } private <InputT, OutputT> void translateMultiHelper( - ParDo.BoundMulti<InputT, OutputT> transform, TranslationContext context) { + ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) { StepTranslationContext stepContext = context.addStep(transform, "ParallelDo"); translateInputs( http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index b57860a..d88ef7e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -355,11 +355,12 @@ public final class TransformTranslator { }; } - private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> + private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>> parDo() { - return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() { + return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() { @Override - public void evaluate(ParDo.BoundMulti<InputT, OutputT> transform, EvaluationContext context) { + public void evaluate( + ParDo.MultiOutput<InputT, OutputT> transform, EvaluationContext context) { String stepName = context.getCurrentTransform().getFullName(); DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); @@ -848,7 +849,7 @@ public final class TransformTranslator { EVALUATORS.put(Read.Bounded.class, readBounded()); EVALUATORS.put(HadoopIO.Read.Bound.class, readHadoop()); EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop()); - EVALUATORS.put(ParDo.BoundMulti.class, parDo()); + EVALUATORS.put(ParDo.MultiOutput.class, parDo()); EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); EVALUATORS.put(Combine.Globally.class, combineGlobally()); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index b88731c..2d2854f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -368,11 +368,11 @@ public final class StreamingTransformTranslator { }; } - private static <InputT, OutputT> TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>> + private static <InputT, OutputT> TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>> multiDo() { - return new TransformEvaluator<ParDo.BoundMulti<InputT, OutputT>>() { + return new TransformEvaluator<ParDo.MultiOutput<InputT, OutputT>>() { public void evaluate( - final ParDo.BoundMulti<InputT, OutputT> transform, final EvaluationContext context) { + final ParDo.MultiOutput<InputT, OutputT> transform, final EvaluationContext context) { final DoFn<InputT, OutputT> doFn = transform.getFn(); rejectSplittable(doFn); rejectStateAndTimers(doFn); @@ -525,7 +525,7 @@ public final class StreamingTransformTranslator { EVALUATORS.put(Read.Unbounded.class, readUnbounded()); EVALUATORS.put(GroupByKey.class, groupByKey()); EVALUATORS.put(Combine.GroupedValues.class, combineGrouped()); - EVALUATORS.put(ParDo.BoundMulti.class, multiDo()); + EVALUATORS.put(ParDo.MultiOutput.class, multiDo()); EVALUATORS.put(ConsoleIO.Write.Unbound.class, print()); EVALUATORS.put(CreateStream.class, createFromQueue()); EVALUATORS.put(Window.Assign.class, window()); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java index d66633b..41ccd08 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/TrackStreamingSourcesTest.java @@ -83,7 +83,7 @@ public class TrackStreamingSourcesTest { p.apply(emptyStream).apply(ParDo.of(new PassthroughFn<>())); - p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.BoundMulti.class, 0)); + p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.MultiOutput.class, 0)); assertThat(StreamingSourceTracker.numAssertions, equalTo(1)); } @@ -111,7 +111,7 @@ public class TrackStreamingSourcesTest { PCollectionList.of(pcol1).and(pcol2).apply(Flatten.<Integer>pCollections()); flattened.apply(ParDo.of(new PassthroughFn<>())); - p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.BoundMulti.class, 0, 1)); + p.traverseTopologically(new StreamingSourceTracker(jssc, p, ParDo.MultiOutput.class, 0, 1)); assertThat(StreamingSourceTracker.numAssertions, equalTo(1)); } http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java index 7a422b8..8804f55 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/AggregatorPipelineExtractor.java @@ -71,9 +71,9 @@ class AggregatorPipelineExtractor { if (transform != null) { if (transform instanceof ParDo.SingleOutput) { return AggregatorRetriever.getAggregators(((ParDo.SingleOutput<?, ?>) transform).getFn()); - } else if (transform instanceof ParDo.BoundMulti) { + } else if (transform instanceof ParDo.MultiOutput) { return AggregatorRetriever.getAggregators( - ((ParDo.BoundMulti<?, ?>) transform).getFn()); + ((ParDo.MultiOutput<?, ?>) transform).getFn()); } } return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 0b25aa1..664fbc3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -550,9 +550,9 @@ public class ParDo { * * <p>See the discussion of Side Outputs above for more explanation. */ - public BoundMulti<InputT, OutputT> withOutputTags( + public MultiOutput<InputT, OutputT> withOutputTags( TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) { - return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); + return new MultiOutput<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); } @Override @@ -623,7 +623,7 @@ public class ParDo { * @param <InputT> the type of the (main) input {@code PCollection} elements * @param <OutputT> the type of the main output {@code PCollection} elements */ - public static class BoundMulti<InputT, OutputT> + public static class MultiOutput<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollectionTuple> { private final List<PCollectionView<?>> sideInputs; private final TupleTag<OutputT> mainOutputTag; @@ -631,7 +631,7 @@ public class ParDo { private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final DoFn<InputT, OutputT> fn; - BoundMulti( + MultiOutput( DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, @@ -651,7 +651,7 @@ public class ParDo { * * <p>See the discussion of Side Inputs above for more explanation. */ - public BoundMulti<InputT, OutputT> withSideInputs( + public MultiOutput<InputT, OutputT> withSideInputs( PCollectionView<?>... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); } @@ -663,9 +663,9 @@ public class ParDo { * * <p>See the discussion of Side Inputs above for more explanation. */ - public BoundMulti<InputT, OutputT> withSideInputs( + public MultiOutput<InputT, OutputT> withSideInputs( Iterable<? extends PCollectionView<?>> sideInputs) { - return new BoundMulti<>( + return new MultiOutput<>( fn, ImmutableList.<PCollectionView<?>>builder() .addAll(this.sideInputs) http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java index 910ed98..c144aba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java @@ -19,14 +19,14 @@ package org.apache.beam.sdk.transforms.windowing; import java.io.Serializable; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; /** * A function that takes the windows of elements in a main input and maps them to the appropriate * window in a {@link PCollectionView} consumed as a - * {@link BoundMulti#withSideInputs(PCollectionView[]) side input}. + * {@link MultiOutput#withSideInputs(PCollectionView[]) side input}. */ public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> implements Serializable { private final Duration maximumLookback; http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java index de1b99c..d353835 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java @@ -152,7 +152,7 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { // and provide a better error message if so. Unfortunately, this information is not // directly available from the TypeDescriptor, so infer based on the type of the PTransform // and the error message itself. - if (transform instanceof ParDo.BoundMulti + if (transform instanceof ParDo.MultiOutput && exc.getReason() == ReasonCode.TYPE_ERASURE) { inferFromTokenException = new CannotProvideCoderException(exc.getMessage() + " If this error occurs for a side output of the producing ParDo, verify that the " http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java index 52bcc93..0d18840 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/AggregatorPipelineExtractorTest.java @@ -92,9 +92,9 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("unchecked") @Test - public void testGetAggregatorStepsWithParDoBoundMultiExtractsSteps() { + public void testGetAggregatorStepsWithParDoMultiOutputExtractsSteps() { @SuppressWarnings("rawtypes") - ParDo.BoundMulti parDo = mock(ParDo.BoundMulti.class, "parDo"); + ParDo.MultiOutput parDo = mock(ParDo.MultiOutput.class, "parDo"); AggregatorProvidingDoFn<Object, Void> fn = new AggregatorProvidingDoFn<>(); when(parDo.getFn()).thenReturn(fn); @@ -124,7 +124,7 @@ public class AggregatorPipelineExtractorTest { @SuppressWarnings("rawtypes") ParDo.SingleOutput parDo = mock(ParDo.SingleOutput.class, "parDo"); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); + ParDo.MultiOutput otherParDo = mock(ParDo.MultiOutput.class, "otherParDo"); AggregatorProvidingDoFn<String, Math> fn = new AggregatorProvidingDoFn<>(); when(parDo.getFn()).thenReturn(fn); when(otherParDo.getFn()).thenReturn(fn); @@ -165,7 +165,7 @@ public class AggregatorPipelineExtractorTest { when(parDo.getFn()).thenReturn(fn); @SuppressWarnings("rawtypes") - ParDo.BoundMulti otherParDo = mock(ParDo.BoundMulti.class, "otherParDo"); + ParDo.MultiOutput otherParDo = mock(ParDo.MultiOutput.class, "otherParDo"); AggregatorProvidingDoFn<Long, Long> otherFn = new AggregatorProvidingDoFn<>(); Aggregator<Double, Double> aggregatorTwo = otherFn.addAggregator(Sum.ofDoubles()); http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java index 56dc743..f62b320 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java @@ -46,8 +46,8 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ParDo.MultiOutput; import org.apache.beam.sdk.transforms.ParDo.SingleOutput; -import org.apache.beam.sdk.transforms.ParDo.BoundMulti; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -268,7 +268,7 @@ public class TransformHierarchyTest implements Serializable { hierarchy.popNode(); final TupleTag<Long> longs = new TupleTag<>(); - final ParDo.BoundMulti<Long, Long> replacementParDo = + final MultiOutput<Long, Long> replacementParDo = ParDo.of( new DoFn<Long, Long>() { @ProcessElement @@ -431,7 +431,7 @@ public class TransformHierarchyTest implements Serializable { hierarchy.popNode(); final TupleTag<Long> longs = new TupleTag<>(); - final BoundMulti<Long, Long> replacementParDo = + final MultiOutput<Long, Long> replacementParDo = ParDo.of( new DoFn<Long, Long>() { @ProcessElement http://git-wip-us.apache.org/repos/asf/beam/blob/f6dd5833/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 9f621f8..cbbbe5f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -898,7 +898,7 @@ public class ParDoTest implements Serializable { } }; - ParDo.BoundMulti<Long, Long> parDo = + ParDo.MultiOutput<Long, Long> parDo = ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(sideOutOne).and(sideOutTwo)); PCollectionTuple firstApplication = longs.apply("first", parDo); PCollectionTuple secondApplication = longs.apply("second", parDo); @@ -1161,7 +1161,7 @@ public class ParDoTest implements Serializable { final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"); final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unregisteredSide"); - ParDo.BoundMulti<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag)) + ParDo.MultiOutput<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag)) .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)); PCollectionTuple outputTuple = input.apply(pardo); @@ -2301,7 +2301,7 @@ public class ParDoTest implements Serializable { } }; - ParDo.BoundMulti<String, String> parDo = ParDo + ParDo.MultiOutput<String, String> parDo = ParDo .of(fn) .withOutputTags(new TupleTag<String>(), TupleTagList.empty());
