Removes ParDo.Unbound and UnboundMulti
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0349eef Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0349eef Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0349eef Branch: refs/heads/master Commit: d0349eef08bfa3ac5992945b9453fa08e8b7d7e4 Parents: caba841 Author: Eugene Kirpichov <[email protected]> Authored: Fri Mar 3 10:53:28 2017 -0800 Committer: Eugene Kirpichov <[email protected]> Committed: Tue Mar 28 13:04:37 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/examples/complete/TfIdf.java | 3 +- .../beam/examples/cookbook/FilterExamples.java | 3 +- .../beam/examples/complete/game/GameStats.java | 8 +- .../apex/translation/ParDoTranslatorTest.java | 11 +- .../StatefulParDoEvaluatorFactoryTest.java | 3 +- .../beam/runners/flink/examples/TFIDF.java | 3 +- .../BatchStatefulParDoOverridesTest.java | 2 +- .../DataflowPipelineTranslatorTest.java | 4 +- .../org/apache/beam/sdk/testing/PAssert.java | 2 +- .../org/apache/beam/sdk/transforms/Combine.java | 4 +- .../org/apache/beam/sdk/transforms/ParDo.java | 297 ++----------------- .../apache/beam/sdk/transforms/Partition.java | 4 +- .../org/apache/beam/sdk/transforms/Sample.java | 7 +- .../apache/beam/sdk/metrics/MetricsTest.java | 5 +- .../apache/beam/sdk/transforms/FlattenTest.java | 4 +- .../apache/beam/sdk/transforms/ParDoTest.java | 54 ++-- .../apache/beam/sdk/transforms/ViewTest.java | 152 +++++----- .../apache/beam/sdk/values/TypedPValueTest.java | 4 +- 18 files changed, 154 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java 
b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index 9de5617..f7904d3 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -323,7 +323,6 @@ public class TfIdf { // presented to each invocation of the DoFn. PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo - .withSideInputs(totalDocuments) .of(new DoFn<KV<String, Long>, KV<String, Double>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -335,7 +334,7 @@ public class TfIdf { c.output(KV.of(word, documentFrequency)); } - })); + }).withSideInputs(totalDocuments)); // Join the term frequency and document frequency // collections, each keyed on the word. http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index 815ac7b..fed9db7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -175,7 +175,6 @@ public class FilterExamples { // We'll only output readings with temperatures below this mean. 
PCollection<TableRow> filteredRows = monthFilteredRows .apply("ParseAndFilter", ParDo - .withSideInputs(globalMeanTemp) .of(new DoFn<TableRow, TableRow>() { @ProcessElement public void processElement(ProcessContext c) { @@ -185,7 +184,7 @@ public class FilterExamples { c.output(c.element()); } } - })); + }).withSideInputs(globalMeanTemp)); return filteredRows; } http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index c880061..93e8254 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -125,7 +125,6 @@ public class GameStats extends LeaderBoard { PCollection<KV<String, Integer>> filtered = sumScores .apply("ProcessAndFilter", ParDo // use the derived mean total score as a side input - .withSideInputs(globalMeanScore) .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() { private final Aggregator<Long, Long> numSpammerUsers = createAggregator("SpammerUsers", Sum.ofLongs()); @@ -140,7 +139,7 @@ public class GameStats extends LeaderBoard { c.output(c.element()); } } - })); + }).withSideInputs(globalMeanScore)); return filtered; } } @@ -288,7 +287,6 @@ public class GameStats extends LeaderBoard { FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out the detected spammer users, using the side input derived above. 
.apply("FilterOutSpammers", ParDo - .withSideInputs(spammersView) .of(new DoFn<GameActionInfo, GameActionInfo>() { @ProcessElement public void processElement(ProcessContext c) { @@ -297,8 +295,8 @@ public class GameStats extends LeaderBoard { c.output(c.element()); } } - })) - // Extract and sum teamname/score pairs from the event data. + }).withSideInputs(spammersView)) + // Extract and sum teamname/score pairs from the event data. .apply("ExtractTeamScore", new ExtractAndSumScore("team")) // [END DocInclude_FilterAndCalc] // Write the result to BigQuery http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java index 83e68f7..3bcba00 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java @@ -281,13 +281,14 @@ public class ParDoTranslatorTest { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new TestMultiOutputWithSideInputsFn( Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList()))); + Arrays.<TupleTag<String>>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); outputs.get(sideOutputTag).setCoder(VoidCoder.of()); 
http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 9bf6bc9..946cd69 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -243,7 +243,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { mainInput .apply( new ParDoMultiOverrideFactory.GbkThenStatefulParDo<>( - ParDo.withSideInputs(sideInput) + ParDo .of( new DoFn<KV<String, Integer>, Integer>() { @StateId(stateId) @@ -253,6 +253,7 @@ public class StatefulParDoEvaluatorFactoryTest implements Serializable { @ProcessElement public void process(ProcessContext c) {} }) + .withSideInputs(sideInput) .withOutputTags(mainOutput, TupleTagList.empty()))) .get(mainOutput) .setCoder(VarIntCoder.of()); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index 89e261b..8e1df08 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -347,7 +347,6 @@ public class TFIDF { // presented to each invocation of the DoFn. 
PCollection<KV<String, Double>> wordToDf = wordToDocCount .apply("ComputeDocFrequencies", ParDo - .withSideInputs(totalDocuments) .of(new DoFn<KV<String, Long>, KV<String, Double>>() { private static final long serialVersionUID = 0; @@ -361,7 +360,7 @@ public class TFIDF { c.output(KV.of(word, documentFrequency)); } - })); + }).withSideInputs(totalDocuments)); // Join the term frequency and document frequency // collections, each keyed on the word. http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java index 899902a..f995ff3 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java @@ -87,7 +87,7 @@ public class BatchStatefulParDoOverridesTest implements Serializable { DummyStatefulDoFn fn = new DummyStatefulDoFn(); pipeline .apply(Create.of(KV.of(1, 2))) - .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of(fn)); + .apply(ParDo.of(fn).withOutputTags(mainOutputTag, TupleTagList.empty())); DataflowRunner runner = DataflowRunner.fromOptions(options); runner.replaceTransforms(pipeline); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 8c8568e..2d63193 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -863,7 +863,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { pipeline .apply(Create.of(KV.of(1, 1))) .apply( - ParDo.withOutputTags(mainOutputTag, TupleTagList.empty()).of( + ParDo.of( new DoFn<KV<Integer, Integer>, Integer>() { @StateId("unused") final StateSpec<Object, ValueState<Integer>> stateSpec = @@ -873,7 +873,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { public void process(ProcessContext c) { // noop } - })); + }).withOutputTags(mainOutputTag, TupleTagList.empty())); runner.replaceTransforms(pipeline); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 412753c..56df449 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -1110,7 +1110,7 @@ public class PAssert { .apply("WindowToken", windowToken) .apply( "RunChecks", - ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual, site))); + ParDo.of(new SideInputCheckerDoFn<>(checkerFn, actual, site)).withSideInputs(actual)); return PDone.in(input.getPipeline()); } 
http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 2c145b4..b403691 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -1503,7 +1503,7 @@ public class Combine { final OutputT defaultValue = fn.defaultValue(); PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline() .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of())) - .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of( + .apply("ProduceDefault", ParDo.of( new DoFn<Void, OutputT>() { @ProcessElement public void processElement(ProcessContext c) { @@ -1512,7 +1512,7 @@ public class Combine { c.output(defaultValue); } } - })) + }).withSideInputs(maybeEmptyView)) .setCoder(maybeEmpty.getCoder()) .setWindowingStrategyInternal(maybeEmpty.getWindowingStrategy()); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 7446737..1e2e5b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import 
org.apache.beam.sdk.Pipeline; @@ -153,7 +154,7 @@ import org.apache.beam.sdk.values.TypedPValue; * {@link PCollectionView PCollectionViews} express styles of accessing * {@link PCollection PCollections} computed by earlier pipeline operations, * passed in to the {@link ParDo} transform using - * {@link #withSideInputs}, and their contents accessible to each of + * {@link ParDo.Bound#withSideInputs}, and their contents accessible to each of * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}. * For example: * @@ -183,7 +184,7 @@ import org.apache.beam.sdk.values.TypedPValue; * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag}, * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by - * invoking {@link #withOutputTags}. Unconsumed side outputs do not + * invoking {@link ParDo.Bound#withOutputTags}. Unconsumed side outputs do not * necessarily need to be explicitly specified, even if the {@link DoFn} * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using @@ -206,11 +207,6 @@ import org.apache.beam.sdk.values.TypedPValue; * PCollectionTuple results = * words.apply( * ParDo - * // Specify the main and consumed side output tags of the - * // PCollectionTuple result: - * .withOutputTags(wordsBelowCutOffTag, - * TupleTagList.of(wordLengthsAboveCutOffTag) - * .and(markedWordsTag)) * .of(new DoFn<String, String>() { * // Create a tag for the unconsumed side output. * final TupleTag<String> specialWordsTag = @@ -233,7 +229,12 @@ import org.apache.beam.sdk.values.TypedPValue; * // Emit this word to the unconsumed side output. 
* c.sideOutput(specialWordsTag, word); * } - * }})); + * }}) + * // Specify the main and consumed side output tags of the + * // PCollectionTuple result: + * .withOutputTags(wordsBelowCutOffTag, + * TupleTagList.of(wordLengthsAboveCutOffTag) + * .and(markedWordsTag))); * // Extract the PCollection results, by tag. * PCollection<String> wordsBelowCutOff = * results.get(wordsBelowCutOffTag); @@ -243,35 +244,6 @@ import org.apache.beam.sdk.values.TypedPValue; * results.get(markedWordsTag); * }</pre> * - * <h2>Properties May Be Specified In Any Order</h2> - * - * <p>Several properties can be specified for a {@link ParDo} - * {@link PTransform}, including side inputs, side output tags, - * and {@link DoFn} to invoke. Only the {@link DoFn} is required; side inputs and side - * output tags are only specified when they're needed. These - * properties can be specified in any order, as long as they're - * specified before the {@link ParDo} {@link PTransform} is applied. - * - * <p>The approach used to allow these properties to be specified in - * any order, with some properties omitted, is to have each of the - * property "setter" methods defined as static factory methods on - * {@link ParDo} itself, which return an instance of either - * {@link ParDo.Unbound} or - * {@link ParDo.Bound} nested classes, each of which offer - * property setter instance methods to enable setting additional - * properties. {@link ParDo.Bound} is used for {@link ParDo} - * transforms whose {@link DoFn} is specified and whose input and - * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used - * for {@link ParDo} transforms that have not yet had their - * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be - * applied. - * - * <p>Another benefit of this approach is that it reduces the number - * of type parameters that need to be specified manually. 
In - * particular, the input and output types of the {@link ParDo} - * {@link PTransform} are inferred automatically from the type - * parameters of the {@link DoFn} argument passed to {@link ParDo#of}. - * * <h2>Output Coders</h2> * * <p>By default, the {@link Coder Coder<OutputT>} for the @@ -446,91 +418,16 @@ import org.apache.beam.sdk.values.TypedPValue; public class ParDo { /** - * Creates a {@link ParDo} {@link PTransform} with the given - * side inputs. - * - * <p>Side inputs are {@link PCollectionView PCollectionViews}, whose contents are - * computed during pipeline execution and then made accessible to - * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each - * invocation of the {@link DoFn} receives the same values for these - * side inputs. - * - * <p>See the discussion of Side Inputs above for more explanation. - * - * <p>The resulting {@link PTransform} is incomplete, and its - * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link DoFn} to - * invoke, which will also bind the input/output types of this - * {@link PTransform}. - */ - public static Unbound withSideInputs(PCollectionView<?>... sideInputs) { - return new Unbound().withSideInputs(sideInputs); - } - - /** - * Creates a {@link ParDo} with the given side inputs. - * - * <p>Side inputs are {@link PCollectionView}s, whose contents are - * computed during pipeline execution and then made accessible to - * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. - * - * <p>See the discussion of Side Inputs above for more explanation. - * - * <p>The resulting {@link PTransform} is incomplete, and its - * input/output types are not yet bound. Use - * {@link ParDo.Unbound#of} to specify the {@link DoFn} to - * invoke, which will also bind the input/output types of this - * {@link PTransform}. - */ - public static Unbound withSideInputs( - Iterable<? 
extends PCollectionView<?>> sideInputs) { - return new Unbound().withSideInputs(sideInputs); - } - - /** - * Creates a multi-output {@link ParDo} {@link PTransform} whose - * output {@link PCollection}s will be referenced using the given main - * output and side output tags. - * - * <p>{@link TupleTag TupleTags} are used to name (with its static element - * type {@code T}) each main and side output {@code PCollection<T>}. - * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main - * output {@link PCollection} as normal, using - * {@link DoFn.Context#output}. The {@link DoFn} emits elements to - * a side output {@code PCollection} using - * {@link DoFn.Context#sideOutput}, passing that side output's tag - * as an argument. The result of invoking this {@link PTransform} - * will be a {@link PCollectionTuple}, and any of the main and - * side output {@code PCollection}s can be retrieved from it via - * {@link PCollectionTuple#get}, passing the output's tag as an - * argument. - * - * <p>See the discussion of Side Outputs above for more explanation. - * - * <p>The resulting {@link PTransform} is incomplete, and its input - * type is not yet bound. Use {@link ParDo.UnboundMulti#of} - * to specify the {@link DoFn} to invoke, which will also bind the - * input type of this {@link PTransform}. - */ - public static <OutputT> UnboundMulti<OutputT> withOutputTags( - TupleTag<OutputT> mainOutputTag, - TupleTagList sideOutputTags) { - return new Unbound().withOutputTags(mainOutputTag, sideOutputTags); - } - - /** * Creates a {@link ParDo} {@link PTransform} that will invoke the * given {@link DoFn} function. * - * <p>The resulting {@link PTransform PTransform's} types have been bound, with the - * input being a {@code PCollection<InputT>} and the output a - * {@code PCollection<OutputT>}, inferred from the types of the argument - * {@code DoFn<InputT, OutputT>}. 
It is ready to be applied, or further + * <p>The resulting {@link PTransform PTransform} is ready to be applied, or further * properties can be set on it first. */ public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { validate(fn); - return new Unbound().of(fn, displayDataForFn(fn)); + return new Bound<InputT, OutputT>( + fn, Collections.<PCollectionView<?>>emptyList(), displayDataForFn(fn)); } private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) { @@ -591,85 +488,6 @@ public class ParDo { } /** - * An incomplete {@link ParDo} transform, with unbound input/output types. - * - * <p>Before being applied, {@link ParDo.Unbound#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also - * bind the input/output types of this {@link PTransform}. - */ - public static class Unbound { - private final List<PCollectionView<?>> sideInputs; - - Unbound() { - this(ImmutableList.<PCollectionView<?>>of()); - } - - Unbound(List<PCollectionView<?>> sideInputs) { - this.sideInputs = sideInputs; - } - - /** - * Returns a new {@link ParDo} transform that's like this - * transform but with the specified additional side inputs. - * Does not modify this transform. The resulting transform is - * still incomplete. - * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public Unbound withSideInputs(PCollectionView<?>... sideInputs) { - return withSideInputs(Arrays.asList(sideInputs)); - } - - /** - * Returns a new {@link ParDo} transform that is like this - * transform but with the specified additional side inputs. Does not modify - * this transform. The resulting transform is still incomplete. - * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public Unbound withSideInputs( - Iterable<? 
extends PCollectionView<?>> sideInputs) { - ImmutableList.Builder<PCollectionView<?>> builder = ImmutableList.builder(); - builder.addAll(this.sideInputs); - builder.addAll(sideInputs); - return new Unbound(builder.build()); - } - - /** - * Returns a new {@link ParDo} {@link PTransform} that's like this - * transform but which will invoke the given {@link DoFn} - * function, and which has its input and output types bound. Does - * not modify this transform. The resulting {@link PTransform} is - * sufficiently specified to be applied, but more properties can - * still be specified. - */ - public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - validate(fn); - return of(fn, displayDataForFn(fn)); - } - - /** - * Returns a new multi-output {@link ParDo} transform that's like this transform but with the - * specified main and side output tags. Does not modify this transform. The resulting transform - * is still incomplete. - * - * <p>See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more - * explanation. - */ - public <OutputT> UnboundMulti<OutputT> withOutputTags( - TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) { - return new UnboundMulti<>(sideInputs, mainOutputTag, sideOutputTags); - } - - private <InputT, OutputT> Bound<InputT, OutputT> of( - DoFn<InputT, OutputT> doFn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { - return new Bound<>(doFn, sideInputs, fnDisplayData); - } - } - - /** * A {@link PTransform} that, when applied to a {@code PCollection<InputT>}, * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements, * with all its outputs collected into an output @@ -701,8 +519,7 @@ public class ParDo { * {@link PTransform} but with the specified additional side inputs. Does not * modify this {@link PTransform}. * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. 
+ * <p>See the discussion of Side Inputs above for more explanation. */ public Bound<InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); @@ -713,8 +530,7 @@ public class ParDo { * {@link PTransform} but with the specified additional side inputs. Does not * modify this {@link PTransform}. * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. + * <p>See the discussion of Side Inputs above for more explanation. */ public Bound<InputT, OutputT> withSideInputs( Iterable<? extends PCollectionView<?>> sideInputs) { @@ -732,8 +548,7 @@ public class ParDo { * PTransform} but with the specified main and side output tags. Does not modify this {@link * PTransform}. * - * <p>See the discussion of Side Outputs above and on {@link ParDo#withOutputTags} for more - * explanation. + * <p>See the discussion of Side Outputs above for more explanation. */ public BoundMulti<InputT, OutputT> withOutputTags( TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) { @@ -798,82 +613,6 @@ public class ParDo { } /** - * An incomplete multi-output {@link ParDo} transform, with unbound - * input type. - * - * <p>Before being applied, {@link ParDo.UnboundMulti#of} must be - * invoked to specify the {@link DoFn} to invoke, which will also - * bind the input type of this {@link PTransform}. 
- * - * @param <OutputT> the type of the main output {@code PCollection} elements - */ - public static class UnboundMulti<OutputT> { - private final List<PCollectionView<?>> sideInputs; - private final TupleTag<OutputT> mainOutputTag; - private final TupleTagList sideOutputTags; - - UnboundMulti(List<PCollectionView<?>> sideInputs, - TupleTag<OutputT> mainOutputTag, - TupleTagList sideOutputTags) { - this.sideInputs = sideInputs; - this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; - } - - /** - * Returns a new multi-output {@link ParDo} transform that's like - * this transform but with the specified side inputs. Does not - * modify this transform. The resulting transform is still - * incomplete. - * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public UnboundMulti<OutputT> withSideInputs( - PCollectionView<?>... sideInputs) { - return withSideInputs(Arrays.asList(sideInputs)); - } - - /** - * Returns a new multi-output {@link ParDo} transform that's like - * this transform but with the specified additional side inputs. Does not - * modify this transform. The resulting transform is still - * incomplete. - * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. - */ - public UnboundMulti<OutputT> withSideInputs( - Iterable<? extends PCollectionView<?>> sideInputs) { - return new UnboundMulti<>( - ImmutableList.<PCollectionView<?>>builder() - .addAll(this.sideInputs) - .addAll(sideInputs) - .build(), - mainOutputTag, - sideOutputTags); - } - - /** - * Returns a new multi-output {@link ParDo} {@link PTransform} - * that's like this transform but which will invoke the given - * {@link DoFn} function, and which has its input type bound. - * Does not modify this transform. The resulting - * {@link PTransform} is sufficiently specified to be applied, but - * more properties can still be specified. 
- */ - public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { - validate(fn); - return of(fn, displayDataForFn(fn)); - } - - private <InputT> BoundMulti<InputT, OutputT> of( - DoFn<InputT, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { - return new BoundMulti<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); - } - } - - /** * A {@link PTransform} that, when applied to a * {@code PCollection<InputT>}, invokes a user-specified * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements @@ -910,8 +649,7 @@ public class ParDo { * that's like this {@link PTransform} but with the specified additional side * inputs. Does not modify this {@link PTransform}. * - * <p>See the discussion of Side Inputs above and on - * {@link ParDo#withSideInputs} for more explanation. + * <p>See the discussion of Side Inputs above for more explanation. */ public BoundMulti<InputT, OutputT> withSideInputs( PCollectionView<?>... sideInputs) { @@ -923,8 +661,7 @@ public class ParDo { * PTransform} but with the specified additional side inputs. Does not modify this {@link * PTransform}. * - * <p>See the discussion of Side Inputs above and on {@link ParDo#withSideInputs} for more - * explanation. + * <p>See the discussion of Side Inputs above for more explanation. */ public BoundMulti<InputT, OutputT> withSideInputs( Iterable<? 
extends PCollectionView<?>> sideInputs) { http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index e0b2b61..2031bc9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -105,8 +105,8 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> PCollectionTuple outputs = in.apply( ParDo - .withOutputTags(new TupleTag<Void>(){}, outputTags) - .of(partitionDoFn)); + .of(partitionDoFn) + .withOutputTags(new TupleTag<Void>(){}, outputTags)); PCollectionList<T> pcs = PCollectionList.empty(in.getPipeline()); Coder<T> coder = in.getCoder(); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java index 3734f7b..3d35c80 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java @@ -146,12 +146,9 @@ public class Sample { @Override public PCollection<T> expand(PCollection<T> in) { PCollectionView<Iterable<T>> iterableView = in.apply(View.<T>asIterable()); - return - in.getPipeline() + return in.getPipeline() .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo - .withSideInputs(iterableView) - .of(new SampleAnyDoFn<>(limit, iterableView))) + .apply(ParDo.of(new SampleAnyDoFn<>(limit, 
iterableView)).withSideInputs(iterableView)) .setCoder(in.getCoder()); } http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 27e8411..3555db3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -209,7 +209,7 @@ public class MetricsTest implements Serializable { bundleDist.update(40L); } })) - .apply("MyStep2", ParDo.withOutputTags(output1, TupleTagList.of(output2)) + .apply("MyStep2", ParDo .of(new DoFn<Integer, Integer>() { @SuppressWarnings("unused") @ProcessElement @@ -223,7 +223,8 @@ public class MetricsTest implements Serializable { c.output(element); c.sideOutput(output2, element); } - })); + }) + .withOutputTags(output1, TupleTagList.of(output2))); PipelineResult result = pipeline.run(); result.waitUntilFinish(); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java index 1753c49..a4f2545 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java @@ -198,14 +198,14 @@ public class FlattenTest implements Serializable { PCollection<String> output = p .apply(Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() { + .apply(ParDo.of(new DoFn<Void, 
String>() { @ProcessElement public void processElement(ProcessContext c) { for (String side : c.sideInput(view)) { c.output(side); } } - })); + }).withSideInputs(view)); PAssert.that(output).empty(); p.run(); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 336f4c0..9a4fd15 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -497,12 +497,13 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new DoFn<Integer, Void>(){ @ProcessElement public void processElement(ProcessContext c) { c.sideOutput(sideOutputTag, c.element()); - }})); + }}) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); PAssert.that(outputs.get(mainOutputTag)).empty(); PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs); @@ -586,10 +587,11 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList()))); + Arrays.<TupleTag<String>>asList())) + .withSideInputs(sideInput1, sideInputUnread, sideInput2)); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput @@ -617,12 +619,13 @@ public class ParDoTest implements Serializable { PCollection<String> output = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - 
.withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList()))); + Arrays.<TupleTag<String>>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2)); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput @@ -653,13 +656,14 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList()))); + Arrays.<TupleTag<String>>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -690,13 +694,14 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) - .apply(ParDo.withSideInputs(sideInput1) - .withSideInputs(sideInputUnread) - .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .apply(ParDo .of(new TestDoFn( Arrays.asList(sideInput1, sideInput2), - Arrays.<TupleTag<String>>asList()))); + Arrays.<TupleTag<String>>asList())) + .withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -1201,7 +1206,6 @@ public class ParDoTest implements Serializable { .apply(Create.of(new TestDummy()) .withCoder(TestDummyCoder.of())) .apply(ParDo - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) .of( new 
DoFn<TestDummy, TestDummy>() { @ProcessElement @@ -1211,7 +1215,8 @@ public class ParDoTest implements Serializable { context.sideOutput(sideOutputTag, element); } }) - ); + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + ); // Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger // tuple.get(sideOutputTag).finishSpecifyingOutput(), which would crash @@ -1263,14 +1268,15 @@ public class ParDoTest implements Serializable { PCollection<String> output = input - .apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)).of( + .apply(ParDo.of( new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.sideOutputWithTimestamp( sideOutputTag, c.element(), new Instant(c.element().longValue())); } - })).get(sideOutputTag) + }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))) + .get(sideOutputTag) .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO))) .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>())); @@ -2297,8 +2303,8 @@ public class ParDoTest implements Serializable { }; ParDo.BoundMulti<String, String> parDo = ParDo - .withOutputTags(new TupleTag<String>(), TupleTagList.empty()) - .of(fn); + .of(fn) + .withOutputTags(new TupleTag<String>(), TupleTagList.empty()); DisplayData displayData = DisplayData.from(parDo); assertThat(displayData, includesDisplayDataFor("fn", fn)); http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 740d808..867fe0a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -99,12 +99,12 
@@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("Create123", Create.of(1, 2, 3)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(47, 47, 47); @@ -129,12 +129,12 @@ public class ViewTest implements Serializable { TimestampedValue.of(3, new Instant(12)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(47, 47, 48); @@ -150,12 +150,12 @@ public class ViewTest implements Serializable { .apply(View.<Integer>asSingleton()); pipeline.apply("Create123", Create.of(1, 2, 3)) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + .apply("OutputSideInputs", ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); thrown.expect(PipelineExecutionException.class); thrown.expectCause(isA(NoSuchElementException.class)); @@ -174,12 +174,12 @@ public class ViewTest implements Serializable { final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton()); oneTwoThree.apply( - "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + "OutputSideInputs", ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view)); } - })); + }).withSideInputs(view)); thrown.expect(PipelineExecutionException.class); 
thrown.expectCause(isA(IllegalArgumentException.class)); @@ -200,7 +200,7 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29, 31)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { checkArgument(c.sideInput(view).size() == 4); @@ -209,7 +209,7 @@ public class ViewTest implements Serializable { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23); @@ -240,7 +240,7 @@ public class ViewTest implements Serializable { .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { checkArgument(c.sideInput(view).size() == 4); @@ -249,7 +249,7 @@ public class ViewTest implements Serializable { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43); @@ -267,14 +267,14 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); assertFalse(c.sideInput(view).iterator().hasNext()); c.output(1); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. 
PAssert.that(results).containsInAnyOrder(1); @@ -292,7 +292,7 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -319,7 +319,7 @@ public class ViewTest implements Serializable { c.output(i); } } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(11); @@ -338,14 +338,14 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29, 31)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { for (Integer i : c.sideInput(view)) { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23); @@ -377,14 +377,14 @@ public class ViewTest implements Serializable { TimestampedValue.of(35, new Instant(11)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { for (Integer i : c.sideInput(view)) { c.output(i); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43); @@ -402,13 +402,13 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void 
processElement(ProcessContext c) { assertFalse(c.sideInput(view).iterator().hasNext()); c.output(1); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -426,7 +426,7 @@ public class ViewTest implements Serializable { PCollection<Integer> output = pipeline.apply("CreateMainInput", Create.of(29)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { Iterator<Integer> iterator = c.sideInput(view).iterator(); @@ -439,7 +439,7 @@ public class ViewTest implements Serializable { c.output(iterator.next()); } } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(output).containsInAnyOrder(11); @@ -459,14 +459,14 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -486,7 +486,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() { + ParDo.of(new DoFn<Integer, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -497,7 +497,7 @@ public class ViewTest implements 
Serializable { } } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)); @@ -539,14 +539,14 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -574,7 +574,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("banana", new Instant(13)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -584,7 +584,7 @@ public class ViewTest implements Serializable { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -611,7 +611,7 @@ public class ViewTest implements Serializable { TimestampedValue.of(1 /* size */, new Instant(5)), TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn<Integer, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -626,7 +626,7 @@ public class ViewTest 
implements Serializable { } } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("a", 2), KV.of("b", 3)); @@ -655,7 +655,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("banana", new Instant(13)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -665,7 +665,7 @@ public class ViewTest implements Serializable { c.output(of(c.element(), v)); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("apple", 2), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -685,7 +685,7 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -693,7 +693,7 @@ public class ViewTest implements Serializable { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. 
PAssert.that(results).containsInAnyOrder(1); @@ -715,7 +715,7 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -723,7 +723,7 @@ public class ViewTest implements Serializable { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -743,7 +743,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -770,7 +770,7 @@ public class ViewTest implements Serializable { c.output(KV.of(c.element(), v)); } } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. 
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); @@ -790,13 +790,13 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { c.output( of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -816,7 +816,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of(2 /* size */)) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() { + ParDo.of(new DoFn<Integer, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { assertEquals((int) c.element(), c.sideInput(view).size()); @@ -825,7 +825,7 @@ public class ViewTest implements Serializable { c.output(KV.of(entry.getKey(), entry.getValue())); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("b", 3)); @@ -847,13 +847,13 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { c.output( of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -881,7 +881,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("banana", new Instant(4)), 
TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -890,7 +890,7 @@ public class ViewTest implements Serializable { c.sideInput(view).get( c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 2), KV.of("blackberry", 3)); @@ -917,7 +917,7 @@ public class ViewTest implements Serializable { TimestampedValue.of(2 /* size */, new Instant(5)), TimestampedValue.of(1 /* size */, new Instant(16)))) .apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn<Integer, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -930,7 +930,7 @@ public class ViewTest implements Serializable { c.output(KV.of(entry.getKey(), entry.getValue())); } } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("a", 1), KV.of("b", 2), KV.of("b", 3)); @@ -961,7 +961,7 @@ public class ViewTest implements Serializable { TimestampedValue.of("banana", new Instant(4)), TimestampedValue.of("blackberry", new Instant(16)))) .apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputSideInputs", ParDo.of( new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -970,7 +970,7 @@ public class ViewTest implements Serializable { c.sideInput(view).get( c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), 
KV.of("banana", 2), KV.of("blackberry", 3)); @@ -991,7 +991,7 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -999,7 +999,7 @@ public class ViewTest implements Serializable { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. PAssert.that(results).containsInAnyOrder(1); @@ -1019,7 +1019,7 @@ public class ViewTest implements Serializable { PCollection<Integer> results = pipeline.apply("Create1", Create.of(1)) .apply("OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() { + ParDo.of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { assertTrue(c.sideInput(view).isEmpty()); @@ -1027,7 +1027,7 @@ public class ViewTest implements Serializable { assertFalse(c.sideInput(view).entrySet().iterator().hasNext()); c.output(c.element()); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. 
PAssert.that(results).containsInAnyOrder(1); @@ -1052,13 +1052,13 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { c.output( KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 1), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -1082,7 +1082,7 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("apple")) .apply( "OutputSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { try { @@ -1108,7 +1108,7 @@ public class ViewTest implements Serializable { c.output( KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); // Pass at least one value through to guarantee that DoFn executes. 
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1)); @@ -1128,13 +1128,13 @@ public class ViewTest implements Serializable { PCollection<KV<String, Integer>> output = pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry")) .apply("Output", - ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() { + ParDo.of(new DoFn<String, KV<String, Integer>>() { @ProcessElement public void processElement(ProcessContext c) { c.output(KV .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1)))); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder( KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3)); @@ -1161,13 +1161,13 @@ public class ViewTest implements Serializable { TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputMainAndSideInputs", ParDo.of( new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("A1", "B5", "C1"); @@ -1193,13 +1193,13 @@ public class ViewTest implements Serializable { TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputMainAndSideInputs", ParDo.of( new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("A6", "B6", "C6"); @@ -1223,13 +1223,13 @@ public class ViewTest implements Serializable { TimestampedValue.of("B", new Instant(15)), TimestampedValue.of("C", 
new Instant(7)))) .apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10)))) - .apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of( + .apply("OutputMainAndSideInputs", ParDo.of( new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("A0", "B5", "C0"); @@ -1253,12 +1253,12 @@ public class ViewTest implements Serializable { pipeline.apply("CreateMainInput", Create.of("")) .apply( "OutputMainAndSideInputs", - ParDo.withSideInputs(view).of(new DoFn<String, String>() { + ParDo.of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.element() + c.sideInput(view)); } - })); + }).withSideInputs(view)); PAssert.that(output).containsInAnyOrder("null"); @@ -1282,18 +1282,18 @@ public class ViewTest implements Serializable { pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of())) .apply( "OutputSideInput", - ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() { + ParDo.of(new DoFn<Void, Iterable<Integer>>() { @ProcessElement public void processElement(ProcessContext c) { c.output(c.sideInput(view1)); } - })) + }).withSideInputs(view1)) .apply("View2", View.<Iterable<Integer>>asIterable()); PCollection<Integer> output = pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of())) .apply("ReadIterableSideInput", - ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() { + ParDo.of(new DoFn<Void, Integer>() { @ProcessElement public void processElement(ProcessContext c) { for (Iterable<Integer> input : c.sideInput(view2)) { @@ -1302,7 +1302,7 @@ public class ViewTest implements Serializable { } } } - })); + }).withSideInputs(view2)); PAssert.that(output).containsInAnyOrder(17); 
http://git-wip-us.apache.org/repos/asf/beam/blob/d0349eef/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index 5e7cc7d..18d550c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -59,8 +59,8 @@ public class TypedPValueTest { PCollection<Integer> input = p.apply(Create.of(1, 2, 3)); PCollectionTuple tuple = input.apply( ParDo - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) - .of(new IdentityDoFn())); + .of(new IdentityDoFn()) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); return tuple; }
