Rename PTransforms to PTransformTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9b6728e2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9b6728e2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9b6728e2 Branch: refs/heads/master Commit: 9b6728e24748791b7181b20183df3ada31f45682 Parents: 940819e Author: Kenneth Knowles <[email protected]> Authored: Tue May 23 15:28:08 2017 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Tue May 23 15:53:41 2017 -0700 ---------------------------------------------------------------------- .../construction/PTransformTranslation.java | 119 ++++++++++++ .../runners/core/construction/PTransforms.java | 119 ------------ .../beam/runners/core/construction/ParDos.java | 4 +- .../core/construction/SdkComponents.java | 3 +- .../TransformPayloadTranslatorRegistrar.java | 2 +- .../core/construction/WindowIntoTranslator.java | 2 +- .../construction/PTransformTranslationTest.java | 189 +++++++++++++++++++ .../core/construction/PTransformsTest.java | 188 ------------------ 8 files changed, 314 insertions(+), 312 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java new file mode 100644 index 0000000..86638de --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.ImmutableMap; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.ServiceLoader; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API + * protocol buffers}. + */ +public class PTransformTranslation { + private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> + KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); + + private static Map<Class<? extends PTransform>, TransformPayloadTranslator> + loadTransformPayloadTranslators() { + ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder = + ImmutableMap.builder(); + for (TransformPayloadTranslatorRegistrar registrar : + ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) { + builder.putAll(registrar.getTransformPayloadTranslators()); + } + return builder.build(); + } + + private PTransformTranslation() {} + + /** + * Translates an {@link AppliedPTransform} into a runner API proto. + * + * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}. + */ + static RunnerApi.PTransform toProto( + AppliedPTransform<?, ?, ?> appliedPTransform, + List<AppliedPTransform<?, ?, ?>> subtransforms, + SdkComponents components) + throws IOException { + RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder(); + for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) { + checkArgument( + taggedInput.getValue() instanceof PCollection, + "Unexpected input type %s", + taggedInput.getValue().getClass()); + transformBuilder.putInputs( + toProto(taggedInput.getKey()), + components.registerPCollection((PCollection<?>) taggedInput.getValue())); + } + for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) { + // TODO: Remove gating + if (taggedOutput.getValue() instanceof PCollection) { + checkArgument( + taggedOutput.getValue() instanceof PCollection, + "Unexpected output type %s", + taggedOutput.getValue().getClass()); + transformBuilder.putOutputs( + toProto(taggedOutput.getKey()), + components.registerPCollection((PCollection<?>) taggedOutput.getValue())); + } + } + for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) { + transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform)); + } + + transformBuilder.setUniqueName(appliedPTransform.getFullName()); + // TODO: Display Data + + PTransform<?, ?> transform = appliedPTransform.getTransform(); + if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { + FunctionSpec payload = + KNOWN_PAYLOAD_TRANSLATORS + .get(transform.getClass()) + .translate(appliedPTransform, components); + transformBuilder.setSpec(payload); + } + + return transformBuilder.build(); + } + + private static String toProto(TupleTag<?> tag) { + return tag.getId(); + } + + /** + * A translator consumes a {@link PTransform} application and produces the appropriate + * FunctionSpec for a distinguished or primitive transform within the Beam runner API. + */ + public interface TransformPayloadTranslator<T extends PTransform<?, ?>> { + FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java deleted file mode 100644 index 9826b77..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransforms.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.ImmutableMap; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.ServiceLoader; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Utilities for converting {@link PTransform PTransforms} to and from {@link RunnerApi Runner API - * protocol buffers}. - */ -public class PTransforms { - private static final Map<Class<? extends PTransform>, TransformPayloadTranslator> - KNOWN_PAYLOAD_TRANSLATORS = loadTransformPayloadTranslators(); - - private static Map<Class<? extends PTransform>, TransformPayloadTranslator> - loadTransformPayloadTranslators() { - ImmutableMap.Builder<Class<? extends PTransform>, TransformPayloadTranslator> builder = - ImmutableMap.builder(); - for (TransformPayloadTranslatorRegistrar registrar : - ServiceLoader.load(TransformPayloadTranslatorRegistrar.class)) { - builder.putAll(registrar.getTransformPayloadTranslators()); - } - return builder.build(); - } - - private PTransforms() {} - - /** - * Translates an {@link AppliedPTransform} into a runner API proto. - * - * <p>Does not register the {@code appliedPTransform} within the provided {@link SdkComponents}. - */ - static RunnerApi.PTransform toProto( - AppliedPTransform<?, ?, ?> appliedPTransform, - List<AppliedPTransform<?, ?, ?>> subtransforms, - SdkComponents components) - throws IOException { - RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder(); - for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) { - checkArgument( - taggedInput.getValue() instanceof PCollection, - "Unexpected input type %s", - taggedInput.getValue().getClass()); - transformBuilder.putInputs( - toProto(taggedInput.getKey()), - components.registerPCollection((PCollection<?>) taggedInput.getValue())); - } - for (Map.Entry<TupleTag<?>, PValue> taggedOutput : appliedPTransform.getOutputs().entrySet()) { - // TODO: Remove gating - if (taggedOutput.getValue() instanceof PCollection) { - checkArgument( - taggedOutput.getValue() instanceof PCollection, - "Unexpected output type %s", - taggedOutput.getValue().getClass()); - transformBuilder.putOutputs( - toProto(taggedOutput.getKey()), - components.registerPCollection((PCollection<?>) taggedOutput.getValue())); - } - } - for (AppliedPTransform<?, ?, ?> subtransform : subtransforms) { - transformBuilder.addSubtransforms(components.getExistingPTransformId(subtransform)); - } - - transformBuilder.setUniqueName(appliedPTransform.getFullName()); - // TODO: Display Data - - PTransform<?, ?> transform = appliedPTransform.getTransform(); - if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) { - FunctionSpec payload = - KNOWN_PAYLOAD_TRANSLATORS - .get(transform.getClass()) - .translate(appliedPTransform, components); - transformBuilder.setSpec(payload); - } - - return transformBuilder.build(); - } - - private static String toProto(TupleTag<?> tag) { - return tag.getId(); - } - - /** - * A translator consumes a {@link PTransform} application and produces the appropriate - * FunctionSpec for a distinguished or primitive transform within the Beam runner API. - */ - public interface TransformPayloadTranslator<T extends PTransform<?, ?>> { - FunctionSpec translate(AppliedPTransform<?, ?, T> transform, SdkComponents components); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java index 2ecc041..12f2969 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDos.java @@ -34,7 +34,7 @@ import java.io.Serializable; import java.util.Collections; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.common.runner.v1.RunnerApi; @@ -96,7 +96,7 @@ public class ParDos { * A {@link TransformPayloadTranslator} for {@link ParDo}. */ public static class ParDoPayloadTranslator - implements PTransforms.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> { + implements PTransformTranslation.TransformPayloadTranslator<ParDo.MultiOutput<?, ?>> { public static TransformPayloadTranslator create() { return new ParDoPayloadTranslator(); } http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java index 3d8d4cd..da22982 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java @@ -131,7 +131,8 @@ class SdkComponents { return name; } checkNotNull(children, "child nodes may not be null"); - componentsBuilder.putTransforms(name, PTransforms.toProto(appliedPTransform, children, this)); + componentsBuilder.putTransforms(name, PTransformTranslation + .toProto(appliedPTransform, children, this)); return name; } http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java index bc568a6..3b3ffa1 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformPayloadTranslatorRegistrar.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.core.construction; import java.util.Map; -import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.transforms.PTransform; /** A registrar of TransformPayloadTranslator. */ http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java index ea4c996..7ed2a49 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslator.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.core.construction; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.beam.runners.core.construction.PTransforms.TransformPayloadTranslator; +import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.common.runner.v1.RunnerApi; import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec; import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec; http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java new file mode 100644 index 0000000..0e6ef97 --- /dev/null +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformTranslationTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.core.construction; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.common.runner.v1.RunnerApi; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; +import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform; +import org.apache.beam.sdk.io.CountingSource; +import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.runners.AppliedPTransform; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Tests for {@link PTransformTranslation}. + */ +@RunWith(Parameterized.class) +public class PTransformTranslationTest { + + @Parameters(name = "{index}: {0}") + public static Iterable<ToAndFromProtoSpec> data() { + // This pipeline exists for construction, not to run any test. + // TODO: Leaf node with understood payload - i.e. validate payloads + ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create())); + ToAndFromProtoSpec readMultipleInAndOut = + ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create())); + TestPipeline compositeReadPipeline = TestPipeline.create(); + ToAndFromProtoSpec compositeRead = + ToAndFromProtoSpec.composite( + generateSequence(compositeReadPipeline), + ToAndFromProtoSpec.leaf(read(compositeReadPipeline))); + return ImmutableList.<ToAndFromProtoSpec>builder() + .add(readLeaf) + .add(readMultipleInAndOut) + .add(compositeRead) + // TODO: Composite with multiple children + // TODO: Composite with a composite child + .build(); + } + + @AutoValue + abstract static class ToAndFromProtoSpec { + public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) { + return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec( + transform, Collections.<ToAndFromProtoSpec>emptyList()); + } + + public static ToAndFromProtoSpec composite( + AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) { + List<ToAndFromProtoSpec> childSpecs = new ArrayList<>(); + childSpecs.add(spec); + childSpecs.addAll(Arrays.asList(specs)); + return new AutoValue_PTransformTranslationTest_ToAndFromProtoSpec(topLevel, childSpecs); + } + + abstract AppliedPTransform<?, ?, ?> getTransform(); + abstract Collection<ToAndFromProtoSpec> getChildren(); + } + + @Parameter(0) + public ToAndFromProtoSpec spec; + + @Test + public void toAndFromProto() throws IOException { + SdkComponents components = SdkComponents.create(); + RunnerApi.PTransform converted = convert(spec, components); + Components protoComponents = components.toComponents(); + + // Sanity checks + assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size())); + assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size())); + assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size())); + + assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName())); + for (PValue inputValue : spec.getTransform().getInputs().values()) { + PCollection<?> inputPc = (PCollection<?>) inputValue; + protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc)); + } + for (PValue outputValue : spec.getTransform().getOutputs().values()) { + PCollection<?> outputPc = (PCollection<?>) outputValue; + protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc)); + } + } + + private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components) + throws IOException { + List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>(); + for (ToAndFromProtoSpec child : spec.getChildren()) { + childTransforms.add(child.getTransform()); + System.out.println("Converting child " + child); + convert(child, components); + // Sanity call + components.getExistingPTransformId(child.getTransform()); + } + PTransform convert = PTransformTranslation + .toProto(spec.getTransform(), childTransforms, components); + // Make sure the converted transform is registered. Convert it independently, but if this is a + // child spec, the child must be in the components. + components.registerPTransform(spec.getTransform(), childTransforms); + return convert; + } + + private static class TestDoFn extends DoFn<Long, KV<Long, String>> { + // Exists to stop the ParDo application from throwing + @ProcessElement public void process(ProcessContext context) {} + } + + private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) { + GenerateSequence sequence = GenerateSequence.from(0); + PCollection<Long> pcollection = pipeline.apply(sequence); + return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of( + "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline); + } + + private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) { + Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded()); + PCollection<Long> pcollection = pipeline.apply(transform); + return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of( + "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline); + } + + private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) { + PCollectionView<String> view = + pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton()); + PCollection<Long> input = pipeline.apply(GenerateSequence.from(0)); + ParDo.MultiOutput<Long, KV<Long, String>> parDo = + ParDo.of(new TestDoFn()) + .withSideInputs(view) + .withOutputTags( + new TupleTag<KV<Long, String>>() {}, + TupleTagList.of(new TupleTag<KV<String, Long>>() {})); + PCollectionTuple output = input.apply(parDo); + + Map<TupleTag<?>, PValue> inputs = new HashMap<>(); + inputs.putAll(parDo.getAdditionalInputs()); + inputs.putAll(input.expand()); + + return AppliedPTransform + .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of( + "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/9b6728e2/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java deleted file mode 100644 index 4125544..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformsTest.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static org.hamcrest.Matchers.equalTo; -import static org.junit.Assert.assertThat; - -import com.google.auto.value.AutoValue; -import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.common.runner.v1.RunnerApi; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components; -import org.apache.beam.sdk.common.runner.v1.RunnerApi.PTransform; -import org.apache.beam.sdk.io.CountingSource; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -/** - * Tests for {@link PTransforms}. - */ -@RunWith(Parameterized.class) -public class PTransformsTest { - - @Parameters(name = "{index}: {0}") - public static Iterable<ToAndFromProtoSpec> data() { - // This pipeline exists for construction, not to run any test. - // TODO: Leaf node with understood payload - i.e. validate payloads - ToAndFromProtoSpec readLeaf = ToAndFromProtoSpec.leaf(read(TestPipeline.create())); - ToAndFromProtoSpec readMultipleInAndOut = - ToAndFromProtoSpec.leaf(multiMultiParDo(TestPipeline.create())); - TestPipeline compositeReadPipeline = TestPipeline.create(); - ToAndFromProtoSpec compositeRead = - ToAndFromProtoSpec.composite( - generateSequence(compositeReadPipeline), - ToAndFromProtoSpec.leaf(read(compositeReadPipeline))); - return ImmutableList.<ToAndFromProtoSpec>builder() - .add(readLeaf) - .add(readMultipleInAndOut) - .add(compositeRead) - // TODO: Composite with multiple children - // TODO: Composite with a composite child - .build(); - } - - @AutoValue - abstract static class ToAndFromProtoSpec { - public static ToAndFromProtoSpec leaf(AppliedPTransform<?, ?, ?> transform) { - return new AutoValue_PTransformsTest_ToAndFromProtoSpec( - transform, Collections.<ToAndFromProtoSpec>emptyList()); - } - - public static ToAndFromProtoSpec composite( - AppliedPTransform<?, ?, ?> topLevel, ToAndFromProtoSpec spec, ToAndFromProtoSpec... specs) { - List<ToAndFromProtoSpec> childSpecs = new ArrayList<>(); - childSpecs.add(spec); - childSpecs.addAll(Arrays.asList(specs)); - return new AutoValue_PTransformsTest_ToAndFromProtoSpec(topLevel, childSpecs); - } - - abstract AppliedPTransform<?, ?, ?> getTransform(); - abstract Collection<ToAndFromProtoSpec> getChildren(); - } - - @Parameter(0) - public ToAndFromProtoSpec spec; - - @Test - public void toAndFromProto() throws IOException { - SdkComponents components = SdkComponents.create(); - RunnerApi.PTransform converted = convert(spec, components); - Components protoComponents = components.toComponents(); - - // Sanity checks - assertThat(converted.getInputsCount(), equalTo(spec.getTransform().getInputs().size())); - assertThat(converted.getOutputsCount(), equalTo(spec.getTransform().getOutputs().size())); - assertThat(converted.getSubtransformsCount(), equalTo(spec.getChildren().size())); - - assertThat(converted.getUniqueName(), equalTo(spec.getTransform().getFullName())); - for (PValue inputValue : spec.getTransform().getInputs().values()) { - PCollection<?> inputPc = (PCollection<?>) inputValue; - protoComponents.getPcollectionsOrThrow(components.registerPCollection(inputPc)); - } - for (PValue outputValue : spec.getTransform().getOutputs().values()) { - PCollection<?> outputPc = (PCollection<?>) outputValue; - protoComponents.getPcollectionsOrThrow(components.registerPCollection(outputPc)); - } - } - - private RunnerApi.PTransform convert(ToAndFromProtoSpec spec, SdkComponents components) - throws IOException { - List<AppliedPTransform<?, ?, ?>> childTransforms = new ArrayList<>(); - for (ToAndFromProtoSpec child : spec.getChildren()) { - childTransforms.add(child.getTransform()); - System.out.println("Converting child " + child); - convert(child, components); - // Sanity call - components.getExistingPTransformId(child.getTransform()); - } - PTransform convert = PTransforms.toProto(spec.getTransform(), childTransforms, components); - // Make sure the converted transform is registered. Convert it independently, but if this is a - // child spec, the child must be in the components. - components.registerPTransform(spec.getTransform(), childTransforms); - return convert; - } - - private static class TestDoFn extends DoFn<Long, KV<Long, String>> { - // Exists to stop the ParDo application from throwing - @ProcessElement public void process(ProcessContext context) {} - } - - private static AppliedPTransform<?, ?, ?> generateSequence(Pipeline pipeline) { - GenerateSequence sequence = GenerateSequence.from(0); - PCollection<Long> pcollection = pipeline.apply(sequence); - return AppliedPTransform.<PBegin, PCollection<Long>, GenerateSequence>of( - "Count", pipeline.begin().expand(), pcollection.expand(), sequence, pipeline); - } - - private static AppliedPTransform<?, ?, ?> read(Pipeline pipeline) { - Read.Unbounded<Long> transform = Read.from(CountingSource.unbounded()); - PCollection<Long> pcollection = pipeline.apply(transform); - return AppliedPTransform.<PBegin, PCollection<Long>, Read.Unbounded<Long>>of( - "ReadTheCount", pipeline.begin().expand(), pcollection.expand(), transform, pipeline); - } - - private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) { - PCollectionView<String> view = - pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton()); - PCollection<Long> input = pipeline.apply(GenerateSequence.from(0)); - ParDo.MultiOutput<Long, KV<Long, String>> parDo = - ParDo.of(new TestDoFn()) - .withSideInputs(view) - .withOutputTags( - new TupleTag<KV<Long, String>>() {}, - TupleTagList.of(new TupleTag<KV<String, Long>>() {})); - PCollectionTuple output = input.apply(parDo); - - Map<TupleTag<?>, PValue> inputs = new HashMap<>(); - inputs.putAll(parDo.getAdditionalInputs()); - inputs.putAll(input.expand()); - - return AppliedPTransform - .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of( - "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline); - } -}
