http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 9a66a2f..5496f71 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -97,7 +97,7 @@ import org.joda.time.Instant; * @param <InputT> the input type of the {@link DoFn} * @param <FnOutputT> the output type of the {@link DoFn} * @param <OutputT> the output type of the operator, this can be different from the fn output - * type when we have side outputs + * type when we have additional tagged outputs */ public class DoFnOperator<InputT, FnOutputT, OutputT> extends AbstractStreamOperator<OutputT> @@ -110,7 +110,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> protected final SerializedPipelineOptions serializedOptions; protected final TupleTag<FnOutputT> mainOutputTag; - protected final List<TupleTag<?>> sideOutputTags; + protected final List<TupleTag<?>> additionalOutputTags; protected final Collection<PCollectionView<?>> sideInputs; protected final Map<Integer, PCollectionView<?>> sideInputTagMapping; @@ -155,7 +155,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> DoFn<InputT, FnOutputT> doFn, Coder<WindowedValue<InputT>> inputCoder, TupleTag<FnOutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, + List<TupleTag<?>> additionalOutputTags, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> sideInputTagMapping, @@ -165,7 +165,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> this.doFn = doFn; this.inputCoder = inputCoder; this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; + this.additionalOutputTags = additionalOutputTags; this.sideInputTagMapping = sideInputTagMapping; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(options); @@ -275,7 +275,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> sideInputReader, outputManager, mainOutputTag, - sideOutputTags, + additionalOutputTags, stepContext, aggregatorFactory, windowingStrategy); @@ -619,7 +619,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> return new DoFnRunners.OutputManager() { @Override public <T> void output(TupleTag<T> tag, WindowedValue<T> value) { - // with side outputs we can't get around this because we don't + // with tagged outputs we can't get around this because we don't // know our own output type... @SuppressWarnings("unchecked") OutputT castValue = (OutputT) value; @@ -675,7 +675,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> public void noteOutput(WindowedValue<?> output) {} @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) {} + public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {} @Override public <T, W extends BoundedWindow> void writePCollectionViewData(
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 0724ac2..1a636c9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -63,7 +63,7 @@ public class SplittableDoFnOperator< WindowedValue< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder, TupleTag<FnOutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, + List<TupleTag<?>> additionalOutputTags, OutputManagerFactory<OutputT> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> sideInputTagMapping, @@ -74,7 +74,7 @@ public class SplittableDoFnOperator< doFn, inputCoder, mainOutputTag, - sideOutputTags, + additionalOutputTags, outputManagerFactory, windowingStrategy, sideInputTagMapping, @@ -125,9 +125,9 @@ public class SplittableDoFnOperator< } @Override - public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, + public <AdditionalOutputT> void outputWindowedValue( + TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index b015f66..8bbc6ef 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -57,7 +57,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn, Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder, TupleTag<KV<K, OutputT>> mainOutputTag, - List<TupleTag<?>> sideOutputTags, + List<TupleTag<?>> additionalOutputTags, OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory, WindowingStrategy<?, ?> windowingStrategy, Map<Integer, PCollectionView<?>> sideInputTagMapping, @@ -68,7 +68,7 @@ public class WindowDoFnOperator<K, InputT, OutputT> null, inputCoder, mainOutputTag, - sideOutputTags, + additionalOutputTags, outputManagerFactory, windowingStrategy, sideInputTagMapping, http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index c1fdea3..4c826d1 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -144,19 +144,19 @@ public class DoFnOperatorTest { WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()); TupleTag<String> mainOutput = new TupleTag<>("main-output"); - TupleTag<String> sideOutput1 = new TupleTag<>("side-output-1"); - TupleTag<String> sideOutput2 = new TupleTag<>("side-output-2"); + TupleTag<String> additionalOutput1 = new TupleTag<>("output-1"); + TupleTag<String> additionalOutput2 = new TupleTag<>("output-2"); ImmutableMap<TupleTag<?>, Integer> outputMapping = ImmutableMap.<TupleTag<?>, Integer>builder() .put(mainOutput, 1) - .put(sideOutput1, 2) - .put(sideOutput2, 3) + .put(additionalOutput1, 2) + .put(additionalOutput2, 3) .build(); DoFnOperator<String, String, RawUnionValue> doFnOperator = new DoFnOperator<>( - new MultiOutputDoFn(sideOutput1, sideOutput2), + new MultiOutputDoFn(additionalOutput1, additionalOutput2), windowedValueCoder, mainOutput, - ImmutableList.<TupleTag<?>>of(sideOutput1, sideOutput2), + ImmutableList.<TupleTag<?>>of(additionalOutput1, additionalOutput2), new DoFnOperator.MultiOutputOutputManagerFactory(outputMapping), WindowingStrategy.globalDefault(), new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ @@ -176,8 +176,8 @@ public class DoFnOperatorTest { assertThat( this.stripStreamRecordFromRawUnion(testHarness.getOutput()), contains( - new RawUnionValue(2, WindowedValue.valueInGlobalWindow("side: one")), - new RawUnionValue(3, WindowedValue.valueInGlobalWindow("side: two")), + new RawUnionValue(2, WindowedValue.valueInGlobalWindow("extra: one")), + new RawUnionValue(3, WindowedValue.valueInGlobalWindow("extra: two")), new RawUnionValue(1, WindowedValue.valueInGlobalWindow("got: hello")), new RawUnionValue(2, WindowedValue.valueInGlobalWindow("got: hello")), new RawUnionValue(3, WindowedValue.valueInGlobalWindow("got: hello")))); @@ -542,24 +542,24 @@ public class DoFnOperatorTest { } private static class MultiOutputDoFn extends DoFn<String, String> { - private TupleTag<String> sideOutput1; - private TupleTag<String> sideOutput2; + private TupleTag<String> additionalOutput1; + private TupleTag<String> additionalOutput2; - public MultiOutputDoFn(TupleTag<String> sideOutput1, TupleTag<String> sideOutput2) { - this.sideOutput1 = sideOutput1; - this.sideOutput2 = sideOutput2; + public MultiOutputDoFn(TupleTag<String> additionalOutput1, TupleTag<String> additionalOutput2) { + this.additionalOutput1 = additionalOutput1; + this.additionalOutput2 = additionalOutput2; } @ProcessElement public void processElement(ProcessContext c) throws Exception { if (c.element().equals("one")) { - c.sideOutput(sideOutput1, "side: one"); + c.output(additionalOutput1, "extra: one"); } else if (c.element().equals("two")) { - c.sideOutput(sideOutput2, "side: two"); + c.output(additionalOutput2, "extra: two"); } else { c.output("got: " + c.element()); - c.sideOutput(sideOutput1, "got: " + c.element()); - c.sideOutput(sideOutput2, "got: " + c.element()); + c.output(additionalOutput1, "got: " + c.element()); + c.output(additionalOutput2, "got: " + c.element()); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java index 3ded079..73f3728 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java @@ -175,7 +175,7 @@ public class BatchStatefulParDoOverrides { ParDo.of(new BatchStatefulDoFn<K, InputT, OutputT>(fn)) .withSideInputs(originalParDo.getSideInputs()) .withOutputTags( - originalParDo.getMainOutputTag(), originalParDo.getSideOutputTags()); + originalParDo.getMainOutputTag(), originalParDo.getAdditionalOutputTags()); return input.apply(new GbkBeforeStatefulParDo<K, InputT>()).apply(statefulParDo); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java index 86bfeb6..ead2712 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java @@ -494,7 +494,7 @@ class BatchViewOverrides { */ private void outputMetadataRecordForSize( ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value, long uniqueKeyCount) { - c.sideOutput(outputForSize, + c.output(outputForSize, KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), uniqueKeyCount))); @@ -503,7 +503,7 @@ class BatchViewOverrides { /** This outputs records which will be used to construct the entry set. */ private void outputMetadataRecordForEntrySet( ProcessContext c, KV<KV<K, W>, WindowedValue<V>> value) { - c.sideOutput(outputForEntrySet, + c.output(outputForEntrySet, KV.of(ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), value.getKey().getValue())), KV.of(value.getKey().getValue(), value.getKey().getKey()))); @@ -773,7 +773,7 @@ class BatchViewOverrides { coderForMapLike(windowCoder, inputCoder.getKeyCoder(), inputCoder.getValueCoder()); // Create the various output tags representing the main output containing the data stream - // and the side outputs containing the metadata about the size and entry set. + // and the additional outputs containing the metadata about the size and entry set. TupleTag<IsmRecord<WindowedValue<V>>> mainOutputTag = new TupleTag<>(); TupleTag<KV<Integer, KV<W, Long>>> outputForSizeTag = new TupleTag<>(); TupleTag<KV<Integer, KV<W, K>>> outputForEntrySetTag = new TupleTag<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java index cd12c92..87395e6 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchViewOverridesTest.java @@ -280,7 +280,7 @@ public class BatchViewOverridesTest { // Verify the number of unique keys per window. assertThat( - doFnTester.takeSideOutputElements(outputForSizeTag), + doFnTester.takeOutputElements(outputForSizeTag), contains( KV.of( ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), @@ -294,7 +294,7 @@ public class BatchViewOverridesTest { // Verify the output for the unique keys. assertThat( - doFnTester.takeSideOutputElements(outputForEntrySetTag), + doFnTester.takeOutputElements(outputForEntrySetTag), contains( KV.of( ismCoder.hash(ImmutableList.of(IsmFormat.getMetadataKey(), windowA)), http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 0e74fa2..029c28a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -412,12 +412,14 @@ public class SparkGroupAlsoByWindowViaWindowSet { } @Override - public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, Instant timestamp, + public <AdditionalOutputT> void outputWindowedValue( + TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, + Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - throw new UnsupportedOperationException("Side outputs are not allowed in GroupAlsoByWindow."); + throw new UnsupportedOperationException( + "Tagged outputs are not allowed in GroupAlsoByWindow."); } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index a761954..4cd1683 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -43,8 +43,8 @@ import scala.Tuple2; /** - * DoFunctions ignore side outputs. MultiDoFunctions deal with side outputs by enriching the - * underlying data with multiple TupleTags. + * DoFunctions ignore outputs that are not the main output. MultiDoFunctions deal with additional + * outputs by enriching the underlying data with multiple TupleTags. * * @param <InputT> Input type for DoFunction. * @param <OutputT> Output type for DoFunction. http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index d19c4a9..ccc0fa3 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -164,12 +164,12 @@ public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends Bounde } @Override - public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, + public <AdditionalOutputT> void outputWindowedValue( + TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs."); + throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs."); } Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 4f8a1a5..3e8dde5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -113,7 +113,7 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> { public void noteOutput(WindowedValue<?> output) { } @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { } + public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) { } @Override public <T, W extends BoundedWindow> void writePCollectionViewData( http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 65892d2..000eada 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -385,7 +385,7 @@ public final class StreamingTransformTranslator { JavaDStream<WindowedValue<InputT>> dStream = unboundedDataset.getDStream(); final String stepName = context.getCurrentTransform().getFullName(); - if (transform.getSideOutputTags().size() == 0) { + if (transform.getAdditionalOutputTags().size() == 0) { JavaPairDStream<TupleTag<?>, WindowedValue<?>> all = dStream.transformToPair( new Function< http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 8fe4831..58d65d0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -2191,7 +2191,7 @@ public class Combine { c.output(kv); } else { int nonce = counter++ % spread; - c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue())); + c.output(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue())); } } }) http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 74a1348..d3da251 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -153,14 +153,14 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD public abstract void outputWithTimestamp(OutputT output, Instant timestamp); /** - * Adds the given element to the side output {@code PCollection} with the + * Adds the given element to the output {@code PCollection} with the * given tag. * - * <p>Once passed to {@code sideOutput} the element should not be modified + * <p>Once passed to {@code output} the element should not be modified * in any way. * * <p>The caller of {@code ParDo} uses {@link ParDo.SingleOutput#withOutputTags} to - * specify the tags of side outputs that it consumes. Non-consumed side + * specify the tags of outputs that it consumes. Non-consumed * outputs, e.g., outputs for monitoring purposes only, don't necessarily * need to be specified. * @@ -180,13 +180,13 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * @see ParDo.SingleOutput#withOutputTags */ - public abstract <T> void sideOutput(TupleTag<T> tag, T output); + public abstract <T> void output(TupleTag<T> tag, T output); /** - * Adds the given element to the specified side output {@code PCollection}, + * Adds the given element to the specified output {@code PCollection}, * with the given timestamp. * - * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be + * <p>Once passed to {@code outputWithTimestamp} the element should not be * modified in any way. * * <p>If invoked from {@link ProcessElement}), the timestamp @@ -207,7 +207,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * @see ParDo.SingleOutput#withOutputTags */ - public abstract <T> void sideOutputWithTimestamp( + public abstract <T> void outputWithTimestamp( TupleTag<T> tag, T output, Instant timestamp); /** http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 88f4035..5446431 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -65,7 +65,7 @@ import org.joda.time.Instant; * * // Set arguments shared across all bundles: * fnTester.setSideInputs(...); // If fn takes side inputs. - * fnTester.setSideOutputTags(...); // If fn writes to side outputs. + * fnTester.setOutputTags(...); // If fn writes to more than one output. * * // Process a bundle containing a single input element: * Input testInput = ...; @@ -464,14 +464,14 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } /** - * Returns the elements output so far to the side output with the + * Returns the elements output so far to the output with the * given tag. Does not clear them, so subsequent calls will * continue to include these elements. * - * @see #takeSideOutputElements - * @see #clearSideOutputElements + * @see #takeOutputElements + * @see #clearOutputElements */ - public <T> List<T> peekSideOutputElements(TupleTag<T> tag) { + public <T> List<T> peekOutputElements(TupleTag<T> tag) { // TODO: Should we return an unmodifiable list? return Lists.transform(getImmutableOutput(tag), new Function<ValueInSingleWindow<T>, T>() { @@ -483,24 +483,23 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } /** - * Clears the record of the elements output so far to the side - * output with the given tag. + * Clears the record of the elements output so far to the output with the given tag. * - * @see #peekSideOutputElements + * @see #peekOutputElements */ - public <T> void clearSideOutputElements(TupleTag<T> tag) { + public <T> void clearOutputElements(TupleTag<T> tag) { getMutableOutput(tag).clear(); } /** - * Returns the elements output so far to the side output with the given tag. + * Returns the elements output so far to the output with the given tag. * Clears the list so these elements don't appear in future calls. * - * @see #peekSideOutputElements + * @see #peekOutputElements */ - public <T> List<T> takeSideOutputElements(TupleTag<T> tag) { - List<T> resultElems = new ArrayList<>(peekSideOutputElements(tag)); - clearSideOutputElements(tag); + public <T> List<T> takeOutputElements(TupleTag<T> tag) { + List<T> resultElems = new ArrayList<>(peekOutputElements(tag)); + clearOutputElements(tag); return resultElems; } @@ -563,12 +562,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { throwUnsupportedOutputFromBundleMethods(); } @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { + public <T> void output(TupleTag<T> tag, T output) { throwUnsupportedOutputFromBundleMethods(); } @@ -683,21 +682,21 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public void output(OutputT output) { - sideOutput(mainOutputTag, output); + output(mainOutputTag, output); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - sideOutputWithTimestamp(mainOutputTag, output, timestamp); + outputWithTimestamp(mainOutputTag, output, timestamp); } @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - sideOutputWithTimestamp(tag, output, element.getTimestamp()); + public <T> void output(TupleTag<T> tag, T output) { + outputWithTimestamp(tag, output, element.getTimestamp()); } @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { getMutableOutput(tag) .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 3de845b..e3777ac 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -35,6 +35,7 @@ import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; +import org.apache.beam.sdk.transforms.display.DisplayData.ItemSpec; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.MethodWithExtraParameters; @@ -103,7 +104,7 @@ import org.apache.beam.sdk.values.TypedPValue; * <p>Each of the calls to any of the {@link DoFn DoFn's} processing * methods can produce zero or more output elements. All of the * of output elements from all of the {@link DoFn} instances - * are included in the output {@link PCollection}. + * are included in an output {@link PCollection}. * * <p>For example: * @@ -180,20 +181,20 @@ import org.apache.beam.sdk.values.TypedPValue; * }})); * }</pre> * - * <h2>Side Outputs</h2> + * <h2>Additional Outputs</h2> * * <p>Optionally, a {@link ParDo} transform can produce multiple * output {@link PCollection PCollections}, both a "main output" - * {@code PCollection<OutputT>} plus any number of "side output" + * {@code PCollection<OutputT>} plus any number of additional output * {@link PCollection PCollections}, each keyed by a distinct {@link TupleTag}, * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags} * to be used for the output {@link PCollectionTuple} are specified by - * invoking {@link SingleOutput#withOutputTags}. Unconsumed side outputs do not + * invoking {@link SingleOutput#withOutputTags}. Unconsumed outputs do not * necessarily need to be explicitly specified, even if the {@link DoFn} * generates them. Within the {@link DoFn}, an element is added to the * main output {@link PCollection} as normal, using - * {@link DoFn.Context#output}, while an element is added to a side output - * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example: + * {@link DoFn.Context#output(Object)}, while an element is added to any additional output + * {@link PCollection} using {@link DoFn.Context#output(TupleTag, Object)}. For example: * * <pre>{@code * PCollection<String> words = ...; @@ -201,7 +202,7 @@ import org.apache.beam.sdk.values.TypedPValue; * // plus the lengths of words that are above the cut off. * // Also select words starting with "MARKER". * final int wordLengthCutOff = 10; - * // Create tags to use for the main and side outputs. + * // Create tags to use for the main and additional outputs. * final TupleTag<String> wordsBelowCutOffTag = * new TupleTag<String>(){}; * final TupleTag<Integer> wordLengthsAboveCutOffTag = @@ -212,7 +213,7 @@ import org.apache.beam.sdk.values.TypedPValue; * words.apply( * ParDo * .of(new DoFn<String, String>() { - * // Create a tag for the unconsumed side output. + * // Create a tag for the unconsumed output. * final TupleTag<String> specialWordsTag = * new TupleTag<String>(){}; * {@literal @}ProcessElement @@ -222,19 +223,19 @@ import org.apache.beam.sdk.values.TypedPValue; * // Emit this short word to the main output. * c.output(word); * } else { - * // Emit this long word's length to a side output. - * c.sideOutput(wordLengthsAboveCutOffTag, word.length()); + * // Emit this long word's length to a specified output. + * c.output(wordLengthsAboveCutOffTag, word.length()); * } * if (word.startsWith("MARKER")) { - * // Emit this word to a different side output. - * c.sideOutput(markedWordsTag, word); + * // Emit this word to a different specified output. + * c.output(markedWordsTag, word); * } * if (word.startsWith("SPECIAL")) { - * // Emit this word to the unconsumed side output. - * c.sideOutput(specialWordsTag, word); + * // Emit this word to the unconsumed output. + * c.output(specialWordsTag, word); * } * }}) - * // Specify the main and consumed side output tags of the + * // Specify the main and consumed output tags of the * // PCollectionTuple result: * .withOutputTags(wordsBelowCutOffTag, * TupleTagList.of(wordLengthsAboveCutOffTag) @@ -254,9 +255,9 @@ import org.apache.beam.sdk.values.TypedPValue; * elements of the main output {@link PCollection PCollection<OutputT>} is * inferred from the concrete type of the {@link DoFn DoFn<InputT, OutputT>}. * - * <p>By default, the {@link Coder Coder<SideOutputT>} for the elements of - * a side output {@link PCollection PCollection<SideOutputT>} is inferred - * from the concrete type of the corresponding {@link TupleTag TupleTag<SideOutputT>}. + * <p>By default, the {@link Coder Coder<AdditionalOutputT>} for the elements of + * an output {@link PCollection PCollection<AdditionalOutputT>} is inferred + * from the concrete type of the corresponding {@link TupleTag TupleTag<AdditionalOutputT>}. * To be successful, the {@link TupleTag} should be created as an instance * of a trivial anonymous subclass, with {@code {}} suffixed to the * constructor call. Such uses block Java's generic type parameter @@ -265,12 +266,12 @@ import org.apache.beam.sdk.values.TypedPValue; * <pre> {@code * // A TupleTag to use for a side input can be written concisely: * final TupleTag<Integer> sideInputag = new TupleTag<>(); - * // A TupleTag to use for a side output should be written with "{}", + * // A TupleTag to use for an output should be written with "{}", * // and explicit generic parameter type: - * final TupleTag<String> sideOutputTag = new TupleTag<String>(){}; + * final TupleTag<String> additionalOutputTag = new TupleTag<String>(){}; * } </pre> * This style of {@code TupleTag} instantiation is used in the example of - * multiple side outputs, above. + * {@link ParDo ParDos} that produce multiple outputs, above. * * <h2>Serializability of {@link DoFn DoFns}</h2> * @@ -358,7 +359,7 @@ import org.apache.beam.sdk.values.TypedPValue; * that state across Java processes. All information should be * communicated to {@link DoFn} instances via main and side inputs and * serialized state, and all output should be communicated from a - * {@link DoFn} instance via main and side outputs, in the absence of + * {@link DoFn} instance via output {@link PCollection PCollections}, in the absence of * external communication mechanisms written by user code. * * <h2>Fault Tolerance</h2> @@ -602,14 +603,14 @@ public class ParDo { /** * Returns a new multi-output {@link ParDo} {@link PTransform} that's like this {@link - * PTransform} but with the specified main and side output tags. Does not modify this {@link + * PTransform} but with the specified output tags. Does not modify this {@link * PTransform}. * - * <p>See the discussion of Side Outputs above for more explanation. + * <p>See the discussion of Additional Outputs above for more explanation. */ public MultiOutput<InputT, OutputT> withOutputTags( - TupleTag<OutputT> mainOutputTag, TupleTagList sideOutputTags) { - return new MultiOutput<>(fn, sideInputs, mainOutputTag, sideOutputTags, fnDisplayData); + TupleTag<OutputT> mainOutputTag, TupleTagList additionalOutputTags) { + return new MultiOutput<>(fn, sideInputs, mainOutputTag, additionalOutputTags, fnDisplayData); } @Override @@ -671,11 +672,9 @@ public class ParDo { } /** - * A {@link PTransform} that, when applied to a - * {@code PCollection<InputT>}, invokes a user-specified - * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements - * to any of the {@link PTransform}'s main and side output - * {@code PCollection}s, which are bundled into a result + * A {@link PTransform} that, when applied to a {@code PCollection<InputT>}, invokes a + * user-specified {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements to + * any of the {@link PTransform}'s output {@code PCollection}s, which are bundled into a result * {@code PCollectionTuple}. * * @param <InputT> the type of the (main) input {@code PCollection} elements @@ -685,7 +684,7 @@ public class ParDo { extends PTransform<PCollection<? extends InputT>, PCollectionTuple> { private final List<PCollectionView<?>> sideInputs; private final TupleTag<OutputT> mainOutputTag; - private final TupleTagList sideOutputTags; + private final TupleTagList additionalOutputTags; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final DoFn<InputT, OutputT> fn; @@ -693,11 +692,11 @@ public class ParDo { DoFn<InputT, OutputT> fn, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, - TupleTagList sideOutputTags, - DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { + TupleTagList additionalOutputTags, + ItemSpec<? extends Class<?>> fnDisplayData) { this.sideInputs = sideInputs; this.mainOutputTag = mainOutputTag; - this.sideOutputTags = sideOutputTags; + this.additionalOutputTags = additionalOutputTags; this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; } @@ -730,7 +729,7 @@ public class ParDo { .addAll(sideInputs) .build(), mainOutputTag, - sideOutputTags, + additionalOutputTags, fnDisplayData); } @@ -745,7 +744,7 @@ public class ParDo { PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal( input.getPipeline(), - TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()), + TupleTagList.of(mainOutputTag).and(additionalOutputTags.getAll()), input.getWindowingStrategy(), input.isBounded()); @@ -794,8 +793,8 @@ public class ParDo { return mainOutputTag; } - public TupleTagList getSideOutputTags() { - return sideOutputTags; + public TupleTagList getAdditionalOutputTags() { + return additionalOutputTags; } public List<PCollectionView<?>> getSideInputs() { http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java index 2031bc9..595d18c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java @@ -169,7 +169,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>> if (0 <= partition && partition < numPartitions) { @SuppressWarnings("unchecked") TupleTag<X> typedTag = (TupleTag<X>) outputTags.get(partition); - c.sideOutput(typedTag, input); + c.output(typedTag, input); } else { throw new IndexOutOfBoundsException( "Partition function returned out of bounds index: " http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java index 0ab26ca..ce67e94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java @@ -37,8 +37,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * {@link PTransform} taking * or producing multiple PCollection inputs or outputs that can be of * different types, for instance a - * {@link ParDo} with side - * outputs. + * {@link ParDo} with multiple outputs. * * <p>A {@link PCollectionTuple} can be created and accessed like follows: * <pre> {@code http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java index a6b63ab..37d41f7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTag.java @@ -31,25 +31,23 @@ import org.apache.beam.sdk.util.CloudObject; import org.apache.beam.sdk.util.PropertyNames; /** - * A {@link TupleTag} is a typed tag to use as the key of a - * heterogeneously typed tuple, like {@link PCollectionTuple}. - * Its generic type parameter allows tracking - * the static type of things stored in tuples. + * A {@link TupleTag} is a typed tag to use as the key of a heterogeneously typed tuple, like {@link + * PCollectionTuple}. Its generic type parameter allows tracking the static type of things stored in + * tuples. * - * <p>To aid in assigning default {@link org.apache.beam.sdk.coders.Coder Coders} for results of - * side outputs of {@link ParDo}, an output - * {@link TupleTag} should be instantiated with an extra {@code {}} so - * it is an instance of an anonymous subclass without generic type - * parameters. Input {@link TupleTag TupleTags} require no such extra - * instantiation (although it doesn't hurt). For example: + * <p>To aid in assigning default {@link org.apache.beam.sdk.coders.Coder Coders} for results of a + * {@link ParDo}, an output {@link TupleTag} should be instantiated with an extra {@code {}} so it + * is an instance of an anonymous subclass without generic type parameters. Input {@link TupleTag + * TupleTags} require no such extra instantiation (although it doesn't hurt). For example: * - * <pre> {@code + * <pre>{@code * TupleTag<SomeType> inputTag = new TupleTag<>(); * TupleTag<SomeOtherType> outputTag = new TupleTag<SomeOtherType>(){}; - * } </pre> + * } + * </pre> * - * @param <V> the type of the elements or values of the tagged thing, - * e.g., a {@code PCollection<V>}. + * @param <V> the type of the elements or values of the tagged thing, e.g., a {@code + * PCollection<V>}. */ public class TupleTag<V> implements Serializable { /** http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java index b4ce941..5aeff5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TupleTagList.java @@ -28,7 +28,7 @@ import org.apache.beam.sdk.transforms.ParDo; /** * A {@link TupleTagList} is an immutable list of heterogeneously * typed {@link TupleTag TupleTags}. A {@link TupleTagList} is used, for instance, to - * specify the tags of the side outputs of a + * specify the tags of the additional outputs of a * {@link ParDo}. * * <p>A {@link TupleTagList} can be created and accessed like follows: http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java index d353835..54af747 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java @@ -148,14 +148,14 @@ public abstract class TypedPValue<T> extends PValueBase implements PValue { return new CoderOrFailure<>(registry.getDefaultCoder(token), null); } catch (CannotProvideCoderException exc) { inferFromTokenException = exc; - // Attempt to detect when the token came from a TupleTag used for a ParDo side output, + // Attempt to detect when the token came from a TupleTag used for a ParDo output, // and provide a better error message if so. Unfortunately, this information is not // directly available from the TypeDescriptor, so infer based on the type of the PTransform // and the error message itself. if (transform instanceof ParDo.MultiOutput && exc.getReason() == ReasonCode.TYPE_ERASURE) { inferFromTokenException = new CannotProvideCoderException(exc.getMessage() - + " If this error occurs for a side output of the producing ParDo, verify that the " + + " If this error occurs for an output of the producing ParDo, verify that the " + "TupleTag for this output is constructed with proper type information (see " + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible."); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java index 3555db3..afe384d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/metrics/MetricsTest.java @@ -221,7 +221,7 @@ public class MetricsTest implements Serializable { values.update(element); gauge.set(12L); c.output(element); - c.sideOutput(output2, element); + c.output(output2, element); } }) .withOutputTags(output1, TupleTagList.of(output2))); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index b429eab..589c744 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -153,15 +153,15 @@ public class ParDoTest implements Serializable { State state = State.NOT_SET_UP; final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); - final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>(); + final List<TupleTag<String>> additionalOutputTupleTags = new ArrayList<>(); public TestDoFn() { } public TestDoFn(List<PCollectionView<Integer>> sideInputViews, - List<TupleTag<String>> sideOutputTupleTags) { + List<TupleTag<String>> additionalOutputTupleTags) { this.sideInputViews.addAll(sideInputViews); - this.sideOutputTupleTags.addAll(sideOutputTupleTags); + this.additionalOutputTupleTags.addAll(additionalOutputTupleTags); } @Setup @@ -197,9 +197,9 @@ public class ParDoTest implements Serializable { private void outputToAll(Context c, String value) { c.output(value); - for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) { - c.sideOutput(sideOutputTupleTag, - sideOutputTupleTag.getId() + ": " + value); + for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { + c.output(additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + value); } } @@ -212,9 +212,9 @@ public class ParDoTest implements Serializable { value += ": " + sideInputValues; } c.output(value); - for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) { - c.sideOutput(sideOutputTupleTag, - sideOutputTupleTag.getId() + ": " + value); + for (TupleTag<String> additionalOutputTupleTag : additionalOutputTupleTags) { + c.output(additionalOutputTupleTag, + additionalOutputTupleTag.getId() + ": " + value); } } } @@ -389,90 +389,90 @@ public class ParDoTest implements Serializable { @Test @Category(ValidatesRunner.class) - public void testParDoWithSideOutputs() { + public void testParDoWithTaggedOutput() { List<Integer> inputs = Arrays.asList(3, -42, 666); TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){}; - TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){}; - TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){}; - TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){}; + TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){}; + TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){}; + TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){}; + TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){}; PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) .apply(ParDo .of(new TestDoFn( Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) + Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag3) - .and(sideOutputTag1) - .and(sideOutputTagUnwritten) - .and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag3) + .and(additionalOutputTag1) + .and(additionalOutputTagUnwritten) + .and(additionalOutputTag2))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - PAssert.that(outputs.get(sideOutputTag1)) + PAssert.that(outputs.get(additionalOutputTag1)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag1)); - PAssert.that(outputs.get(sideOutputTag2)) + .fromOutput(additionalOutputTag1)); + PAssert.that(outputs.get(additionalOutputTag2)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag2)); - PAssert.that(outputs.get(sideOutputTag3)) + .fromOutput(additionalOutputTag2)); + PAssert.that(outputs.get(additionalOutputTag3)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag3)); - PAssert.that(outputs.get(sideOutputTagUnwritten)).empty(); + .fromOutput(additionalOutputTag3)); + PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); pipeline.run(); } @Test @Category(ValidatesRunner.class) - public void testParDoEmptyWithSideOutputs() { + public void testParDoEmptyWithTaggedOutput() { TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){}; - TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){}; - TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){}; - TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){}; + TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){}; + TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){}; + TupleTag<String> additionalOutputTag3 = new TupleTag<String>("additional3"){}; + TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){}; PCollectionTuple outputs = pipeline .apply(Create.empty(VarIntCoder.of())) .apply(ParDo .of(new TestDoFn( Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) + Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag3).and(sideOutputTag1) - .and(sideOutputTagUnwritten).and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag3).and(additionalOutputTag1) + .and(additionalOutputTagUnwritten).and(additionalOutputTag2))); List<Integer> inputs = Collections.emptyList(); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); - PAssert.that(outputs.get(sideOutputTag1)) + PAssert.that(outputs.get(additionalOutputTag1)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag1)); - PAssert.that(outputs.get(sideOutputTag2)) + .fromOutput(additionalOutputTag1)); + PAssert.that(outputs.get(additionalOutputTag2)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag2)); - PAssert.that(outputs.get(sideOutputTag3)) + .fromOutput(additionalOutputTag2)); + PAssert.that(outputs.get(additionalOutputTag3)) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs) - .fromSideOutput(sideOutputTag3)); - PAssert.that(outputs.get(sideOutputTagUnwritten)).empty(); + .fromOutput(additionalOutputTag3)); + PAssert.that(outputs.get(additionalOutputTagUnwritten)).empty(); pipeline.run(); } @Test @Category(ValidatesRunner.class) - public void testParDoWithEmptySideOutputs() { + public void testParDoWithEmptyTaggedOutput() { TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){}; - TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){}; + TupleTag<String> additionalOutputTag1 = new TupleTag<String>("additional1"){}; + TupleTag<String> additionalOutputTag2 = new TupleTag<String>("additional2"){}; PCollectionTuple outputs = pipeline .apply(Create.empty(VarIntCoder.of())) @@ -480,12 +480,12 @@ public class ParDoTest implements Serializable { .of(new TestNoOutputDoFn()) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag1).and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag1).and(additionalOutputTag2))); PAssert.that(outputs.get(mainOutputTag)).empty(); - PAssert.that(outputs.get(sideOutputTag1)).empty(); - PAssert.that(outputs.get(sideOutputTag2)).empty(); + PAssert.that(outputs.get(additionalOutputTag1)).empty(); + PAssert.that(outputs.get(additionalOutputTag2)).empty(); pipeline.run(); } @@ -493,12 +493,12 @@ public class ParDoTest implements Serializable { @Test @Category(ValidatesRunner.class) - public void testParDoWithOnlySideOutputs() { + public void testParDoWithOnlyTaggedOutput() { List<Integer> inputs = Arrays.asList(3, -42, 666); final TupleTag<Void> mainOutputTag = new TupleTag<Void>("main"){}; - final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side"){}; + final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional"){}; PCollectionTuple outputs = pipeline .apply(Create.of(inputs)) @@ -506,29 +506,29 @@ public class ParDoTest implements Serializable { .of(new DoFn<Integer, Void>(){ @ProcessElement public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element()); + c.output(additionalOutputTag, c.element()); }}) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); PAssert.that(outputs.get(mainOutputTag)).empty(); - PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs); + PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs); pipeline.run(); } @Test @Category(NeedsRunner.class) - public void testParDoWritingToUndeclaredSideOutput() { + public void testParDoWritingToUndeclaredTag() { List<Integer> inputs = Arrays.asList(3, -42, 666); - TupleTag<String> sideTag = new TupleTag<String>("side"){}; + TupleTag<String> notOutputTag = new TupleTag<String>("additional"){}; PCollection<String> output = pipeline .apply(Create.of(inputs)) .apply(ParDo.of(new TestDoFn( Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(sideTag)))); + Arrays.asList(notOutputTag)))); PAssert.that(output) .satisfies(ParDoTest.HasExpectedOutput.forInput(inputs)); @@ -539,7 +539,7 @@ public class ParDoTest implements Serializable { @Test // TODO: The exception thrown is runner-specific, even if the behavior is general @Category(NeedsRunner.class) - public void testParDoUndeclaredSideOutputLimit() { + public void testParDoUndeclaredTagLimit() { PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3))); @@ -548,13 +548,13 @@ public class ParDoTest implements Serializable { .apply("Success1000", ParDo.of(new DoFn<Integer, String>() { @ProcessElement public void processElement(ProcessContext c) { - TupleTag<String> specialSideTag = new TupleTag<String>(){}; - c.sideOutput(specialSideTag, "side"); - c.sideOutput(specialSideTag, "side"); - c.sideOutput(specialSideTag, "side"); + TupleTag<String> specialOutputTag = new TupleTag<String>(){}; + c.output(specialOutputTag, "special"); + c.output(specialOutputTag, "special"); + c.output(specialOutputTag, "special"); for (int i = 0; i < 998; i++) { - c.sideOutput(new TupleTag<String>(){}, "side"); + c.output(new TupleTag<String>(){}, "tag" + i); } }})); pipeline.run(); @@ -565,12 +565,12 @@ public class ParDoTest implements Serializable { @ProcessElement public void processElement(ProcessContext c) { for (int i = 0; i < 1000; i++) { - c.sideOutput(new TupleTag<String>(){}, "side"); + c.output(new TupleTag<String>(){}, "output" + i); } }})); thrown.expect(RuntimeException.class); - thrown.expectMessage("the number of side outputs has exceeded a limit"); + thrown.expectMessage("the number of outputs has exceeded a limit"); pipeline.run(); } @@ -647,7 +647,7 @@ public class ParDoTest implements Serializable { List<Integer> inputs = Arrays.asList(3, -42, 666); final TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"){}; + final TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output"){}; PCollectionView<Integer> sideInput1 = pipeline .apply("CreateSideInput1", Create.of(11)) @@ -668,7 +668,7 @@ public class ParDoTest implements Serializable { .withSideInputs(sideInput1) .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -685,7 +685,7 @@ public class ParDoTest implements Serializable { List<Integer> inputs = Arrays.asList(3, -42, 666); final TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"){}; + final TupleTag<Void> additionalOutputTag = new TupleTag<Void>("output"){}; PCollectionView<Integer> sideInput1 = pipeline .apply("CreateSideInput1", Create.of(11)) @@ -706,7 +706,7 @@ public class ParDoTest implements Serializable { .withSideInputs(sideInput1) .withSideInputs(sideInputUnread) .withSideInputs(sideInput2) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); PAssert.that(outputs.get(mainOutputTag)) .satisfies(ParDoTest.HasExpectedOutput @@ -853,37 +853,37 @@ public class ParDoTest implements Serializable { @Test public void testParDoMultiNameBasedDoFnWithTrimmerSuffix() { assertThat( - ParDo.of(new SideOutputDummyFn(null)).withOutputTags(null, null).getName(), - containsString("ParMultiDo(SideOutputDummy)")); + ParDo.of(new TaggedOutputDummyFn(null)).withOutputTags(null, null).getName(), + containsString("ParMultiDo(TaggedOutputDummy)")); } @Test - public void testParDoWithSideOutputsName() { + public void testParDoWithTaggedOutputName() { pipeline.enableAbandonedNodeEnforcement(false); TupleTag<String> mainOutputTag = new TupleTag<String>("main"){}; - TupleTag<String> sideOutputTag1 = new TupleTag<String>("side1"){}; - TupleTag<String> sideOutputTag2 = new TupleTag<String>("side2"){}; - TupleTag<String> sideOutputTag3 = new TupleTag<String>("side3"){}; - TupleTag<String> sideOutputTagUnwritten = new TupleTag<String>("sideUnwritten"){}; + TupleTag<String> additionalOutputTag1 = new TupleTag<String>("output1"){}; + TupleTag<String> additionalOutputTag2 = new TupleTag<String>("output2"){}; + TupleTag<String> additionalOutputTag3 = new TupleTag<String>("output3"){}; + TupleTag<String> additionalOutputTagUnwritten = new TupleTag<String>("unwrittenOutput"){}; PCollectionTuple outputs = pipeline .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput") .apply("MyParDo", ParDo .of(new TestDoFn( Arrays.<PCollectionView<Integer>>asList(), - Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) + Arrays.asList(additionalOutputTag1, additionalOutputTag2, additionalOutputTag3))) .withOutputTags( mainOutputTag, - TupleTagList.of(sideOutputTag3).and(sideOutputTag1) - .and(sideOutputTagUnwritten).and(sideOutputTag2))); + TupleTagList.of(additionalOutputTag3).and(additionalOutputTag1) + .and(additionalOutputTagUnwritten).and(additionalOutputTag2))); assertEquals("MyParDo.main", outputs.get(mainOutputTag).getName()); - assertEquals("MyParDo.side1", outputs.get(sideOutputTag1).getName()); - assertEquals("MyParDo.side2", outputs.get(sideOutputTag2).getName()); - assertEquals("MyParDo.side3", outputs.get(sideOutputTag3).getName()); - assertEquals("MyParDo.sideUnwritten", - outputs.get(sideOutputTagUnwritten).getName()); + assertEquals("MyParDo.output1", outputs.get(additionalOutputTag1).getName()); + assertEquals("MyParDo.output2", outputs.get(additionalOutputTag2).getName()); + assertEquals("MyParDo.output3", outputs.get(additionalOutputTag3).getName()); + assertEquals("MyParDo.unwrittenOutput", + outputs.get(additionalOutputTagUnwritten).getName()); } @Test @@ -892,29 +892,29 @@ public class ParDoTest implements Serializable { PCollection<Long> longs = pipeline.apply(CountingInput.unbounded()); TupleTag<Long> mainOut = new TupleTag<>(); - final TupleTag<String> sideOutOne = new TupleTag<>(); - final TupleTag<Integer> sideOutTwo = new TupleTag<>(); + final TupleTag<String> valueAsString = new TupleTag<>(); + final TupleTag<Integer> valueAsInt = new TupleTag<>(); DoFn<Long, Long> fn = new DoFn<Long, Long>() { @ProcessElement public void processElement(ProcessContext cxt) { cxt.output(cxt.element()); - cxt.sideOutput(sideOutOne, Long.toString(cxt.element())); - cxt.sideOutput(sideOutTwo, Long.valueOf(cxt.element()).intValue()); + cxt.output(valueAsString, Long.toString(cxt.element())); + cxt.output(valueAsInt, Long.valueOf(cxt.element()).intValue()); } }; ParDo.MultiOutput<Long, Long> parDo = - ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(sideOutOne).and(sideOutTwo)); + ParDo.of(fn).withOutputTags(mainOut, TupleTagList.of(valueAsString).and(valueAsInt)); PCollectionTuple firstApplication = longs.apply("first", parDo); PCollectionTuple secondApplication = longs.apply("second", parDo); assertThat(firstApplication, not(equalTo(secondApplication))); assertThat( firstApplication.getAll().keySet(), - Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo)); + Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, valueAsString, valueAsInt)); assertThat( secondApplication.getAll().keySet(), - Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, sideOutOne, sideOutTwo)); + Matchers.<TupleTag<?>>containsInAnyOrder(mainOut, valueAsString, valueAsInt)); } @Test @@ -1017,28 +1017,28 @@ public class ParDoTest implements Serializable { } } - private static class SideOutputDummyFn extends DoFn<Integer, Integer> { - private TupleTag<TestDummy> sideTag; - public SideOutputDummyFn(TupleTag<TestDummy> sideTag) { - this.sideTag = sideTag; + private static class TaggedOutputDummyFn extends DoFn<Integer, Integer> { + private TupleTag<TestDummy> dummyOutputTag; + public TaggedOutputDummyFn(TupleTag<TestDummy> dummyOutputTag) { + this.dummyOutputTag = dummyOutputTag; } @ProcessElement public void processElement(ProcessContext c) { c.output(1); - c.sideOutput(sideTag, new TestDummy()); + c.output(dummyOutputTag, new TestDummy()); } } private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> { - private TupleTag<Integer> sideTag; - public MainOutputDummyFn(TupleTag<Integer> sideTag) { - this.sideTag = sideTag; + private TupleTag<Integer> intOutputTag; + public MainOutputDummyFn(TupleTag<Integer> intOutputTag) { + this.intOutputTag = intOutputTag; } @ProcessElement public void processElement(ProcessContext c) { c.output(new TestDummy()); - c.sideOutput(sideTag, 1); + c.output(intOutputTag, 1); } } @@ -1112,7 +1112,7 @@ public class ParDoTest implements Serializable { implements SerializableFunction<Iterable<String>, Void>, Serializable { private final List<Integer> inputs; private final List<Integer> sideInputs; - private final String sideOutput; + private final String additionalOutput; private final boolean ordered; public static HasExpectedOutput forInput(List<Integer> inputs) { @@ -1125,11 +1125,11 @@ public class ParDoTest implements Serializable { private HasExpectedOutput(List<Integer> inputs, List<Integer> sideInputs, - String sideOutput, + String additionalOutput, boolean ordered) { this.inputs = inputs; this.sideInputs = sideInputs; - this.sideOutput = sideOutput; + this.additionalOutput = additionalOutput; this.ordered = ordered; } @@ -1138,18 +1138,18 @@ public class ParDoTest implements Serializable { for (Integer sideInputValue : sideInputValues) { sideInputs.add(sideInputValue); } - return new HasExpectedOutput(inputs, sideInputs, sideOutput, ordered); + return new HasExpectedOutput(inputs, sideInputs, additionalOutput, ordered); } - public HasExpectedOutput fromSideOutput(TupleTag<String> sideOutputTag) { - return fromSideOutput(sideOutputTag.getId()); + public HasExpectedOutput fromOutput(TupleTag<String> outputTag) { + return fromOutput(outputTag.getId()); } - public HasExpectedOutput fromSideOutput(String sideOutput) { - return new HasExpectedOutput(inputs, sideInputs, sideOutput, ordered); + public HasExpectedOutput fromOutput(String outputId) { + return new HasExpectedOutput(inputs, sideInputs, outputId, ordered); } public HasExpectedOutput inOrder() { - return new HasExpectedOutput(inputs, sideInputs, sideOutput, true); + return new HasExpectedOutput(inputs, sideInputs, additionalOutput, true); } @Override @@ -1174,17 +1174,17 @@ public class ParDoTest implements Serializable { sideInputsSuffix = ": " + sideInputs; } - String sideOutputPrefix; - if (sideOutput.isEmpty()) { - sideOutputPrefix = ""; + String additionalOutputPrefix; + if (additionalOutput.isEmpty()) { + additionalOutputPrefix = ""; } else { - sideOutputPrefix = sideOutput + ": "; + additionalOutputPrefix = additionalOutput + ": "; } List<String> expectedProcesseds = new ArrayList<>(); for (Integer input : inputs) { expectedProcesseds.add( - sideOutputPrefix + "processing: " + input + sideInputsSuffix); + additionalOutputPrefix + "processing: " + input + sideInputsSuffix); } String[] expectedProcessedsArray = expectedProcesseds.toArray(new String[expectedProcesseds.size()]); @@ -1196,10 +1196,10 @@ public class ParDoTest implements Serializable { assertEquals(starteds.size(), finisheds.size()); for (String started : starteds) { - assertEquals(sideOutputPrefix + "started", started); + assertEquals(additionalOutputPrefix + "started", started); } for (String finished : finisheds) { - assertEquals(sideOutputPrefix + "finished", finished); + assertEquals(additionalOutputPrefix + "finished", finished); } return null; @@ -1208,15 +1208,15 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) - public void testSideOutputUnknownCoder() throws Exception { + public void testTaggedOutputUnknownCoder() throws Exception { PCollection<Integer> input = pipeline .apply(Create.of(Arrays.asList(1, 2, 3))); final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"); - final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unknownSide"); - input.apply(ParDo.of(new SideOutputDummyFn(sideOutputTag)) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + final TupleTag<TestDummy> additionalOutputTag = new TupleTag<TestDummy>("unknownSide"); + input.apply(ParDo.of(new TaggedOutputDummyFn(additionalOutputTag)) + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); thrown.expect(IllegalStateException.class); thrown.expectMessage("Unable to return a default Coder"); @@ -1224,26 +1224,27 @@ public class ParDoTest implements Serializable { } @Test - public void testSideOutputUnregisteredExplicitCoder() throws Exception { + public void testTaggedOutputUnregisteredExplicitCoder() throws Exception { pipeline.enableAbandonedNodeEnforcement(false); PCollection<Integer> input = pipeline .apply(Create.of(Arrays.asList(1, 2, 3))); final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"); - final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("unregisteredSide"); - ParDo.MultiOutput<Integer, Integer> pardo = ParDo.of(new SideOutputDummyFn(sideOutputTag)) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)); + final TupleTag<TestDummy> additionalOutputTag = new TupleTag<TestDummy>("unregisteredSide"); + ParDo.MultiOutput<Integer, Integer> pardo = + ParDo.of(new TaggedOutputDummyFn(additionalOutputTag)) + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)); PCollectionTuple outputTuple = input.apply(pardo); - outputTuple.get(sideOutputTag).setCoder(new TestDummyCoder()); + outputTuple.get(additionalOutputTag).setCoder(new TestDummyCoder()); - outputTuple.get(sideOutputTag).apply(View.<TestDummy>asSingleton()); + outputTuple.get(additionalOutputTag).apply(View.<TestDummy>asSingleton()); - assertEquals(new TestDummyCoder(), outputTuple.get(sideOutputTag).getCoder()); - outputTuple.get(sideOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes + assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder()); + outputTuple.get(additionalOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes assertEquals(new TestDummyCoder(), - outputTuple.get(sideOutputTag).getCoder()); // Check for corruption + outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption } @Test @@ -1254,9 +1255,11 @@ public class ParDoTest implements Serializable { .apply(Create.of(Arrays.asList(1, 2, 3))); final TupleTag<TestDummy> mainOutputTag = new TupleTag<TestDummy>("unregisteredMain"); - final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side") {}; - PCollectionTuple outputTuple = input.apply(ParDo.of(new MainOutputDummyFn(sideOutputTag)) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additionalOutput") {}; + PCollectionTuple outputTuple = + input.apply( + ParDo.of(new MainOutputDummyFn(additionalOutputTag)) + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); outputTuple.get(mainOutputTag).setCoder(new TestDummyCoder()); @@ -1265,13 +1268,13 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) - public void testMainOutputApplySideOutputNoCoder() { + public void testMainOutputApplyTaggedOutputNoCoder() { // Regression test: applying a transform to the main output // should not cause a crash based on lack of a coder for the - // side output. + // additional output. final TupleTag<TestDummy> mainOutputTag = new TupleTag<TestDummy>("main"); - final TupleTag<TestDummy> sideOutputTag = new TupleTag<TestDummy>("side"); + final TupleTag<TestDummy> additionalOutputTag = new TupleTag<TestDummy>("additionalOutput"); PCollectionTuple tuple = pipeline .apply(Create.of(new TestDummy()) .withCoder(TestDummyCoder.of())) @@ -1282,14 +1285,14 @@ public class ParDoTest implements Serializable { public void processElement(ProcessContext context) { TestDummy element = context.element(); context.output(element); - context.sideOutput(sideOutputTag, element); + context.output(additionalOutputTag, element); } }) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag)) ); // Before fix, tuple.get(mainOutputTag).apply(...) would indirectly trigger - // tuple.get(sideOutputTag).finishSpecifyingOutput(), which would crash + // tuple.get(additionalOutputTag).finishSpecifyingOutput(), which would crash // on a missing coder. tuple.get(mainOutputTag) .setCoder(TestDummyCoder.of()) @@ -1300,7 +1303,7 @@ public class ParDoTest implements Serializable { } })); - tuple.get(sideOutputTag).setCoder(TestDummyCoder.of()); + tuple.get(additionalOutputTag).setCoder(TestDummyCoder.of()); pipeline.run(); } @@ -1328,13 +1331,13 @@ public class ParDoTest implements Serializable { @Test @Category(NeedsRunner.class) - public void testParDoSideOutputWithTimestamp() { + public void testParDoTaggedOutputWithTimestamp() { PCollection<Integer> input = pipeline.apply(Create.of(Arrays.asList(3, 42, 6))); final TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main"){}; - final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side"){}; + final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("additional"){}; PCollection<String> output = input @@ -1342,11 +1345,11 @@ public class ParDoTest implements Serializable { new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { - c.sideOutputWithTimestamp( - sideOutputTag, c.element(), new Instant(c.element().longValue())); + c.outputWithTimestamp( + additionalOutputTag, c.element(), new Instant(c.element().longValue())); } - }).withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))) - .get(sideOutputTag) + }).withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))) + .get(additionalOutputTag) .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO))) .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>())); @@ -1914,7 +1917,7 @@ public class ParDoTest implements Serializable { @Test @Category({ValidatesRunner.class, UsesStatefulParDo.class}) - public void testValueStateSideOutput() { + public void testValueStateTaggedOutput() { final String stateId = "foo"; final TupleTag<Integer> evenTag = new TupleTag<Integer>() {}; @@ -1934,7 +1937,7 @@ public class ParDoTest implements Serializable { if (currentValue % 2 == 0) { c.output(currentValue); } else { - c.sideOutput(oddTag, currentValue); + c.output(oddTag, currentValue); } state.write(currentValue + 1); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index a122f67..9e8c12e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -218,12 +218,12 @@ public class SplittableDoFnTest { private static class SDFWithSideInputsAndOutputs extends DoFn<Integer, String> { private final PCollectionView<String> sideInput; - private final TupleTag<String> sideOutput; + private final TupleTag<String> additionalOutput; private SDFWithSideInputsAndOutputs( - PCollectionView<String> sideInput, TupleTag<String> sideOutput) { + PCollectionView<String> sideInput, TupleTag<String> additionalOutput) { this.sideInput = sideInput; - this.sideOutput = sideOutput; + this.additionalOutput = additionalOutput; } @ProcessElement @@ -231,7 +231,7 @@ public class SplittableDoFnTest { checkState(tracker.tryClaim(tracker.currentRestriction().getFrom())); String side = c.sideInput(sideInput); c.output("main:" + side + ":" + c.element()); - c.sideOutput(sideOutput, "side:" + side + ":" + c.element()); + c.output(additionalOutput, "additional:" + side + ":" + c.element()); } @GetInitialRestriction @@ -247,21 +247,22 @@ public class SplittableDoFnTest { PCollectionView<String> sideInput = p.apply("side input", Create.of("foo")).apply(View.<String>asSingleton()); TupleTag<String> mainOutputTag = new TupleTag<>("main"); - TupleTag<String> sideOutputTag = new TupleTag<>("side"); + TupleTag<String> additionalOutputTag = new TupleTag<>("additional"); PCollectionTuple res = p.apply("input", Create.of(0, 1, 2)) .apply( - ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, sideOutputTag)) + ParDo.of(new SDFWithSideInputsAndOutputs(sideInput, additionalOutputTag)) .withSideInputs(sideInput) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); res.get(mainOutputTag).setCoder(StringUtf8Coder.of()); - res.get(sideOutputTag).setCoder(StringUtf8Coder.of()); + res.get(additionalOutputTag).setCoder(StringUtf8Coder.of()); PAssert.that(res.get(mainOutputTag)) .containsInAnyOrder(Arrays.asList("main:foo:0", "main:foo:1", "main:foo:2")); - PAssert.that(res.get(sideOutputTag)) - .containsInAnyOrder(Arrays.asList("side:foo:0", "side:foo:1", "side:foo:2")); + PAssert.that(res.get(additionalOutputTag)) + .containsInAnyOrder( + Arrays.asList("additional:foo:0", "additional:foo:1", "additional:foo:2")); p.run(); } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java index 0a0abd6..9df0512 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java @@ -82,7 +82,7 @@ public final class PCollectionTupleTest implements Serializable { TupleTag<Integer> mainOutputTag = new TupleTag<Integer>("main") {}; TupleTag<Integer> emptyOutputTag = new TupleTag<Integer>("empty") {}; - final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>("side") {}; + final TupleTag<Integer> additionalOutputTag = new TupleTag<Integer>("extra") {}; PCollection<Integer> mainInput = pipeline .apply(Create.of(inputs)); @@ -91,14 +91,14 @@ public final class PCollectionTupleTest implements Serializable { .of(new DoFn<Integer, Integer>() { @ProcessElement public void processElement(ProcessContext c) { - c.sideOutput(sideOutputTag, c.element()); + c.output(additionalOutputTag, c.element()); }}) - .withOutputTags(emptyOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(emptyOutputTag, TupleTagList.of(additionalOutputTag))); assertNotNull("outputs.getPipeline()", outputs.getPipeline()); outputs = outputs.and(mainOutputTag, mainInput); PAssert.that(outputs.get(mainOutputTag)).containsInAnyOrder(inputs); - PAssert.that(outputs.get(sideOutputTag)).containsInAnyOrder(inputs); + PAssert.that(outputs.get(additionalOutputTag)).containsInAnyOrder(inputs); PAssert.that(outputs.get(emptyOutputTag)).empty(); pipeline.run();
