http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java index 211dfd9..f752b1c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java @@ -55,12 +55,12 @@ public class TypedPValueTest { } private PCollectionTuple buildPCollectionTupleWithTags( - TupleTag<Integer> mainOutputTag, TupleTag<Integer> sideOutputTag) { + TupleTag<Integer> mainOutputTag, TupleTag<Integer> additionalOutputTag) { PCollection<Integer> input = p.apply(Create.of(1, 2, 3)); PCollectionTuple tuple = input.apply( ParDo .of(new IdentityDoFn()) - .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))); + .withOutputTags(mainOutputTag, TupleTagList.of(additionalOutputTag))); return tuple; } @@ -69,11 +69,11 @@ public class TypedPValueTest { } @Test - public void testUntypedSideOutputTupleTagGivesActionableMessage() { + public void testUntypedOutputTupleTagGivesActionableMessage() { TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {}; - // untypedSideOutputTag did not use anonymous subclass. - TupleTag<Integer> untypedSideOutputTag = new TupleTag<Integer>(); - PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedSideOutputTag); + // untypedOutputTag did not use anonymous subclass. + TupleTag<Integer> untypedOutputTag = new TupleTag<Integer>(); + PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedOutputTag); thrown.expect(IllegalStateException.class); thrown.expectMessage("No Coder has been manually specified"); @@ -84,15 +84,15 @@ public class TypedPValueTest { thrown.expectMessage( containsString("Building a Coder from the fallback CoderProvider failed")); - tuple.get(untypedSideOutputTag).getCoder(); + tuple.get(untypedOutputTag).getCoder(); } @Test - public void testStaticFactorySideOutputTupleTagGivesActionableMessage() { + public void testStaticFactoryOutputTupleTagGivesActionableMessage() { TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {}; - // untypedSideOutputTag constructed from a static factory method. - TupleTag<Integer> untypedSideOutputTag = makeTagStatically(); - PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedSideOutputTag); + // untypedOutputTag constructed from a static factory method. + TupleTag<Integer> untypedOutputTag = makeTagStatically(); + PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, untypedOutputTag); thrown.expect(IllegalStateException.class); thrown.expectMessage("No Coder has been manually specified"); @@ -103,27 +103,27 @@ public class TypedPValueTest { thrown.expectMessage( containsString("Building a Coder from the fallback CoderProvider failed")); - tuple.get(untypedSideOutputTag).getCoder(); + tuple.get(untypedOutputTag).getCoder(); } @Test - public void testTypedSideOutputTupleTag() { + public void testTypedOutputTupleTag() { TupleTag<Integer> mainOutputTag = new TupleTag<Integer>() {}; - // typedSideOutputTag was constructed with compile-time type information. - TupleTag<Integer> typedSideOutputTag = new TupleTag<Integer>() {}; - PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedSideOutputTag); + // typedOutputTag was constructed with compile-time type information. + TupleTag<Integer> typedOutputTag = new TupleTag<Integer>() {}; + PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedOutputTag); - assertThat(tuple.get(typedSideOutputTag).getCoder(), instanceOf(VarIntCoder.class)); + assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class)); } @Test - public void testUntypedMainOutputTagTypedSideOutputTupleTag() { + public void testUntypedMainOutputTagTypedOutputTupleTag() { // mainOutputTag is allowed to be untyped because Coder can be inferred other ways. TupleTag<Integer> mainOutputTag = new TupleTag<>(); - TupleTag<Integer> typedSideOutputTag = new TupleTag<Integer>() {}; - PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedSideOutputTag); + TupleTag<Integer> typedOutputTag = new TupleTag<Integer>() {}; + PCollectionTuple tuple = buildPCollectionTupleWithTags(mainOutputTag, typedOutputTag); - assertThat(tuple.get(typedSideOutputTag).getCoder(), instanceOf(VarIntCoder.class)); + assertThat(tuple.get(typedOutputTag).getCoder(), instanceOf(VarIntCoder.class)); } // A simple class for which there should be no obvious Coder. @@ -139,13 +139,13 @@ public class TypedPValueTest { } @Test - public void testParDoWithNoSideOutputsErrorDoesNotMentionTupleTag() { + public void testParDoWithNoOutputsErrorDoesNotMentionTupleTag() { PCollection<EmptyClass> input = p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new EmptyClassDoFn())); thrown.expect(IllegalStateException.class); - // Output specific to ParDo TupleTag side outputs should not be present. + // Output specific to ParDo additional TupleTag outputs should not be present. thrown.expectMessage(not(containsString("erasure"))); thrown.expectMessage(not(containsString("see TupleTag Javadoc"))); // Instead, expect output suggesting other possible fixes.
http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index 6403e96..9714d72 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -46,7 +46,7 @@ public class FakeStepContext implements StepContext { } @Override - public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { + public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) { } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index de105d7..bd2fba9 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -300,7 +300,7 @@ public class ProcessBundleHandlerTest { private static class TestDoFn extends DoFn<String, String> { private static final TupleTag<String> mainOutput = new TupleTag<>("mainOutput"); - private static final TupleTag<String> sideOutput = new TupleTag<>("sideOutput"); + private static final TupleTag<String> additionalOutput = new TupleTag<>("output"); @StartBundle public void startBundle(Context context) { @@ -310,7 +310,7 @@ public class ProcessBundleHandlerTest { @ProcessElement public void processElement(ProcessContext context) { context.output("MainOutput" + context.element()); - context.sideOutput(sideOutput, "SideOutput" + context.element()); + context.output(additionalOutput, "AdditionalOutput" + context.element()); } @FinishBundle @@ -321,7 +321,7 @@ public class ProcessBundleHandlerTest { /** * Create a DoFn that has 3 inputs (inputATarget1, inputATarget2, inputBTarget) and 2 outputs - * (mainOutput, sideOutput). Validate that inputs are fed to the {@link DoFn} and that outputs + * (mainOutput, output). Validate that inputs are fed to the {@link DoFn} and that outputs * are directed to the correct consumers. */ @Test @@ -329,7 +329,7 @@ public class ProcessBundleHandlerTest { Map<String, Message> fnApiRegistry = ImmutableMap.of(STRING_CODER_SPEC_ID, STRING_CODER_SPEC); String primitiveTransformId = "100L"; long mainOutputId = 101L; - long sideOutputId = 102L; + long additionalOutputId = 102L; DoFnInfo<?, ?> doFnInfo = DoFnInfo.forFn( new TestDoFn(), @@ -339,7 +339,7 @@ public class ProcessBundleHandlerTest { mainOutputId, ImmutableMap.of( mainOutputId, TestDoFn.mainOutput, - sideOutputId, TestDoFn.sideOutput)); + additionalOutputId, TestDoFn.additionalOutput)); BeamFnApi.FunctionSpec functionSpec = BeamFnApi.FunctionSpec.newBuilder() .setId("1L") .setUrn(JAVA_DO_FN_URN) @@ -372,25 +372,25 @@ public class ProcessBundleHandlerTest { .putOutputs(Long.toString(mainOutputId), BeamFnApi.PCollection.newBuilder() .setCoderReference(STRING_CODER_SPEC_ID) .build()) - .putOutputs(Long.toString(sideOutputId), BeamFnApi.PCollection.newBuilder() + .putOutputs(Long.toString(additionalOutputId), BeamFnApi.PCollection.newBuilder() .setCoderReference(STRING_CODER_SPEC_ID) .build()) .build(); List<WindowedValue<String>> mainOutputValues = new ArrayList<>(); - List<WindowedValue<String>> sideOutputValues = new ArrayList<>(); + List<WindowedValue<String>> additionalOutputValues = new ArrayList<>(); BeamFnApi.Target mainOutputTarget = BeamFnApi.Target.newBuilder() .setPrimitiveTransformReference(primitiveTransformId) .setName(Long.toString(mainOutputId)) .build(); - BeamFnApi.Target sideOutputTarget = BeamFnApi.Target.newBuilder() + BeamFnApi.Target additionalOutputTarget = BeamFnApi.Target.newBuilder() .setPrimitiveTransformReference(primitiveTransformId) - .setName(Long.toString(sideOutputId)) + .setName(Long.toString(additionalOutputId)) .build(); Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> existingConsumers = ImmutableMultimap.of( mainOutputTarget, mainOutputValues::add, - sideOutputTarget, sideOutputValues::add); + additionalOutputTarget, additionalOutputValues::add); Multimap<BeamFnApi.Target, ThrowingConsumer<WindowedValue<String>>> newConsumers = HashMultimap.create(); List<ThrowingRunnable> startFunctions = new ArrayList<>(); @@ -422,12 +422,12 @@ public class ProcessBundleHandlerTest { valueInGlobalWindow("MainOutputA1"), valueInGlobalWindow("MainOutputA2"), valueInGlobalWindow("MainOutputB"))); - assertThat(sideOutputValues, contains( - valueInGlobalWindow("SideOutputA1"), - valueInGlobalWindow("SideOutputA2"), - valueInGlobalWindow("SideOutputB"))); + assertThat(additionalOutputValues, contains( + valueInGlobalWindow("AdditionalOutputA1"), + valueInGlobalWindow("AdditionalOutputA2"), + valueInGlobalWindow("AdditionalOutputB"))); mainOutputValues.clear(); - sideOutputValues.clear(); + additionalOutputValues.clear(); Iterables.getOnlyElement(finishFunctions).run(); assertThat(mainOutputValues, contains(valueInGlobalWindow("FinishBundle"))); http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java index f4bf198..1b6492e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java @@ -61,7 +61,7 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> { KV<String, Long> fileResult = results.get(i); if (currNumFiles + 1 > Write.MAX_NUM_FILES || currSizeBytes + fileResult.getValue() > Write.MAX_SIZE_BYTES) { - c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); + c.output(multiPartitionsTag, KV.of(++partitionId, currResults)); currResults = Lists.newArrayList(); currNumFiles = 0; currSizeBytes = 0; @@ -71,9 +71,9 @@ class WritePartition extends DoFn<String, KV<Long, List<String>>> { currResults.add(fileResult.getKey()); } if (partitionId == 0) { - c.sideOutput(singlePartitionTag, KV.of(++partitionId, currResults)); + c.output(singlePartitionTag, KV.of(++partitionId, currResults)); } else { - c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults)); + c.output(multiPartitionsTag, KV.of(++partitionId, currResults)); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/113471d6/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 92ab204..2a2bf91 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -2107,9 +2107,9 @@ public class BigQueryIOTest implements Serializable { List<KV<Long, List<String>>> partitions; if (expectedNumPartitions > 1) { - partitions = tester.takeSideOutputElements(multiPartitionsTag); + partitions = tester.takeOutputElements(multiPartitionsTag); } else { - partitions = tester.takeSideOutputElements(singlePartitionTag); + partitions = tester.takeOutputElements(singlePartitionTag); } List<Long> partitionIds = Lists.newArrayList(); List<String> partitionFileNames = Lists.newArrayList();
