Revert "Roll-forward Include Additional PTransform inputs in Transform Nodes"
This reverts commit 2e2ae9cfa581a73864695d15102acadc2750a57a. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/247f9bc1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/247f9bc1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/247f9bc1 Branch: refs/heads/master Commit: 247f9bc1581984d026764b3d433cb594e700bc21 Parents: c687887 Author: Thomas Groh <[email protected]> Authored: Fri May 26 11:00:40 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Fri May 26 13:18:55 2017 -0700 ---------------------------------------------------------------------- .../apex/translation/TranslationContext.java | 4 +- .../core/construction/TransformInputs.java | 50 ------ .../core/construction/TransformInputsTest.java | 166 ------------------- .../beam/runners/direct/DirectGraphVisitor.java | 15 +- .../runners/direct/ParDoEvaluatorFactory.java | 9 +- ...littableProcessElementsEvaluatorFactory.java | 2 - .../direct/StatefulParDoEvaluatorFactory.java | 1 - .../beam/runners/direct/WatermarkManager.java | 17 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 6 +- .../flink/FlinkBatchTranslationContext.java | 3 +- .../flink/FlinkStreamingTranslationContext.java | 3 +- .../dataflow/DataflowPipelineTranslator.java | 5 +- .../spark/translation/EvaluationContext.java | 4 +- .../beam/sdk/runners/TransformHierarchy.java | 28 +--- 14 files changed, 29 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 94d13e1..aff3863 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -34,7 +34,6 @@ import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -94,8 +93,7 @@ class TranslationContext { } public <InputT extends PValue> InputT getInput() { - return (InputT) - Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform())); + return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values()); } public Map<TupleTag<?>, PValue> getOutputs() { http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java deleted file mode 100644 index 2baf93a..0000000 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.ImmutableList; -import java.util.Collection; -import java.util.Map; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; - -/** Utilities for extracting subsets of inputs from an {@link AppliedPTransform}. */ -public class TransformInputs { - /** - * Gets all inputs of the {@link AppliedPTransform} that are not returned by {@link - * PTransform#getAdditionalInputs()}. - */ - public static Collection<PValue> nonAdditionalInputs(AppliedPTransform<?, ?, ?> application) { - ImmutableList.Builder<PValue> mainInputs = ImmutableList.builder(); - PTransform<?, ?> transform = application.getTransform(); - for (Map.Entry<TupleTag<?>, PValue> input : application.getInputs().entrySet()) { - if (!transform.getAdditionalInputs().containsKey(input.getKey())) { - mainInputs.add(input.getValue()); - } - } - checkArgument( - !mainInputs.build().isEmpty() || application.getInputs().isEmpty(), - "Expected at least one main input if any inputs exist"); - return mainInputs.build(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java deleted file mode 100644 index f5b2c11..0000000 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.core.construction; - -import static org.junit.Assert.assertThat; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.GenerateSequence; -import org.apache.beam.sdk.runners.AppliedPTransform; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.POutput; -import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TupleTag; -import org.hamcrest.Matchers; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link TransformInputs}. */ -@RunWith(JUnit4.class) -public class TransformInputsTest { - @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false); - @Rule public ExpectedException thrown = ExpectedException.none(); - - @Test - public void nonAdditionalInputsWithNoInputSucceeds() { - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "input-free", - Collections.<TupleTag<?>, PValue>emptyMap(), - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(), - pipeline); - - assertThat(TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>empty()); - } - - @Test - public void nonAdditionalInputsWithOneMainInputSucceeds() { - PCollection<Long> input = pipeline.apply(GenerateSequence.from(1L)); - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "input-single", - Collections.<TupleTag<?>, PValue>singletonMap(new TupleTag<Long>() {}, input), - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(), - pipeline); - - assertThat( - TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>containsInAnyOrder(input)); - } - - @Test - public void nonAdditionalInputsWithMultipleNonAdditionalInputsSucceeds() { - Map<TupleTag<?>, PValue> allInputs = new HashMap<>(); - PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3)); - allInputs.put(new TupleTag<Integer>() {}, mainInts); - PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of())); - allInputs.put(new TupleTag<Void>() {}, voids); - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "additional-free", - allInputs, - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(), - pipeline); - - assertThat( - TransformInputs.nonAdditionalInputs(transform), - Matchers.<PValue>containsInAnyOrder(voids, mainInts)); - } - - @Test - public void nonAdditionalInputsWithAdditionalInputsSucceeds() { - Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>(); - additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2", "3"))); - additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L))); - - Map<TupleTag<?>, PValue> allInputs = new HashMap<>(); - PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3)); - allInputs.put(new TupleTag<Integer>() {}, mainInts); - PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of())); - allInputs.put( - new TupleTag<Void>() {}, voids); - allInputs.putAll(additionalInputs); - - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "additional", - allInputs, - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(additionalInputs), - pipeline); - - assertThat( - TransformInputs.nonAdditionalInputs(transform), - Matchers.<PValue>containsInAnyOrder(mainInts, voids)); - } - - @Test - public void nonAdditionalInputsWithOnlyAdditionalInputsThrows() { - Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>(); - additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2", "3"))); - additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L))); - - AppliedPTransform<PInput, POutput, TestTransform> transform = - AppliedPTransform.of( - "additional-only", - additionalInputs, - Collections.<TupleTag<?>, PValue>emptyMap(), - new TestTransform(additionalInputs), - pipeline); - - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("at least one"); - TransformInputs.nonAdditionalInputs(transform); - } - - private static class TestTransform extends PTransform<PInput, POutput> { - private final Map<TupleTag<?>, PValue> additionalInputs; - - private TestTransform() { - this(Collections.<TupleTag<?>, PValue>emptyMap()); - } - - private TestTransform(Map<TupleTag<?>, PValue> additionalInputs) { - this.additionalInputs = additionalInputs; - } - - @Override - public POutput expand(PInput input) { - return PDone.in(input.getPipeline()); - } - - @Override - public Map<TupleTag<?>, PValue> getAdditionalInputs() { - return additionalInputs; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index ed4282b..01204e3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -21,12 +21,10 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -36,8 +34,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the @@ -45,7 +41,6 @@ import org.slf4j.LoggerFactory; * input after the upstream transform has produced and committed output. */ class DirectGraphVisitor extends PipelineVisitor.Defaults { - private static final Logger LOG = LoggerFactory.getLogger(DirectGraphVisitor.class); private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>(); @@ -88,15 +83,7 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { if (node.getInputs().isEmpty()) { rootTransforms.add(appliedTransform); } else { - Collection<PValue> mainInputs = - TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline())); - if (!mainInputs.containsAll(node.getInputs().values())) { - LOG.debug( - "Inputs reduced to {} from {} by removing additional inputs", - mainInputs, - node.getInputs().values()); - } - for (PValue value : mainInputs) { + for (PValue value : node.getInputs().values()) { primitiveConsumers.put(value, appliedTransform); } } http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 516f798..8aa75cf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -20,6 +20,7 @@ package org.apache.beam.runners.direct; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.Iterables; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,7 +79,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator (TransformEvaluator<T>) createEvaluator( (AppliedPTransform) application, - (PCollection<InputT>) inputBundle.getPCollection(), inputBundle.getKey(), doFn, transform.getSideInputs(), @@ -102,7 +102,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator @SuppressWarnings({"unchecked", "rawtypes"}) DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator( AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, - PCollection<InputT> mainInput, StructuralKey<?> inputBundleKey, DoFn<InputT, OutputT> doFn, List<PCollectionView<?>> sideInputs, @@ -121,7 +120,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator createParDoEvaluator( application, inputBundleKey, - mainInput, sideInputs, mainOutputTag, additionalOutputTags, @@ -134,7 +132,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator ParDoEvaluator<InputT> createParDoEvaluator( AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application, StructuralKey<?> key, - PCollection<InputT> mainInput, List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, @@ -147,7 +144,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator evaluationContext, stepContext, application, - mainInput.getWindowingStrategy(), + ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values())) + .getWindowingStrategy(), fn, key, sideInputs, @@ -175,4 +173,5 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator } return pcs; } + } http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index eccc83a..b85f481c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -116,8 +116,6 @@ class SplittableProcessElementsEvaluatorFactory< delegateFactory.createParDoEvaluator( application, inputBundle.getKey(), - (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>) - inputBundle.getPCollection(), transform.getSideInputs(), transform.getMainOutputTag(), transform.getAdditionalOutputTags().getAll(), http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 3619d05..506c84c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -117,7 +117,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator = delegateFactory.createEvaluator( (AppliedPTransform) application, - (PCollection) inputBundle.getPCollection(), inputBundle.getKey(), doFn, application.getTransform().getUnderlyingParDo().getSideInputs(), http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index b15b52e..4f1b831 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -823,11 +823,10 @@ class WatermarkManager { inputWmsBuilder.add(THE_END_OF_TIME); } for (PValue pvalue : inputs.values()) { - if (graph.getPrimitiveConsumers(pvalue).contains(transform)) { - Watermark producerOutputWatermark = - getTransformWatermark(graph.getProducer(pvalue)).synchronizedProcessingOutputWatermark; - inputWmsBuilder.add(producerOutputWatermark); - } + Watermark producerOutputWatermark = + getTransformWatermark(graph.getProducer(pvalue)) + .synchronizedProcessingOutputWatermark; + inputWmsBuilder.add(producerOutputWatermark); } return inputWmsBuilder.build(); } @@ -839,11 +838,9 @@ class WatermarkManager { inputWatermarksBuilder.add(THE_END_OF_TIME); } for (PValue pvalue : inputs.values()) { - if (graph.getPrimitiveConsumers(pvalue).contains(transform)) { - Watermark producerOutputWatermark = - getTransformWatermark(graph.getProducer(pvalue)).outputWatermark; - inputWatermarksBuilder.add(producerOutputWatermark); - } + Watermark producerOutputWatermark = + getTransformWatermark(graph.getProducer(pvalue)).outputWatermark; + inputWatermarksBuilder.add(producerOutputWatermark); } List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build(); return inputCollectionWatermarks; http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 8b86bbe..09a21ac 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -98,7 +98,7 @@ public class ParDoEvaluatorTest { when(evaluationContext.createBundle(output)).thenReturn(outputBundle); ParDoEvaluator<Integer> evaluator = - createEvaluator(singletonView, fn, inputPc, output); + createEvaluator(singletonView, fn, output); IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L)); WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3); @@ -132,7 +132,6 @@ public class ParDoEvaluatorTest { private ParDoEvaluator<Integer> createEvaluator( PCollectionView<Integer> singletonView, RecorderFn fn, - PCollection<Integer> input, PCollection<Integer> output) { when( evaluationContext.createSideInputReader( @@ -157,7 +156,8 @@ public class ParDoEvaluatorTest { evaluationContext, stepContext, transform, - input.getWindowingStrategy(), + ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values())) + .getWindowingStrategy(), fn, null /* key */, ImmutableList.<PCollectionView<?>>of(singletonView), http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java index 6e70198..0439119 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.flink; import com.google.common.collect.Iterables; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -144,7 +143,7 @@ class FlinkBatchTranslationContext { @SuppressWarnings("unchecked") <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); } Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 74a5fb9..ea5f6b3 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Iterables; import java.util.HashMap; import java.util.Map; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; @@ -114,7 +113,7 @@ class FlinkStreamingTranslationContext { @SuppressWarnings("unchecked") public <T extends PValue> T getInput(PTransform<T, ?> transform) { - return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform)); + return (T) Iterables.getOnlyElement(currentTransform.getInputs().values()); } public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) { http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java index fccd018..af93ef5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.core.construction.WindowingStrategyTranslation; import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly; import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues; @@ -396,9 +395,7 @@ public class DataflowPipelineTranslator { @Override public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) { - return (InputT) - Iterables.getOnlyElement( - TransformInputs.nonAdditionalInputs(getCurrentTransform(transform))); + return (InputT) Iterables.getOnlyElement(getInputs(transform).values()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java index 0c6c4d1..8102926 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java @@ -26,7 +26,6 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; -import org.apache.beam.runners.core.construction.TransformInputs; import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.coders.CoderHelpers; import org.apache.beam.sdk.Pipeline; @@ -104,8 +103,7 @@ public class EvaluationContext { public <T extends PValue> T getInput(PTransform<T, ?> transform) { @SuppressWarnings("unchecked") - T input = - (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform())); + T input = (T) Iterables.getOnlyElement(getInputs(transform).values()); return input; } http://git-wip-us.apache.org/repos/asf/beam/blob/247f9bc1/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java index bebc306..630d24c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import javax.annotation.Nullable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior; @@ -67,7 +68,7 @@ public class TransformHierarchy { producers = new HashMap<>(); producerInput = new HashMap<>(); unexpandedInputs = new HashMap<>(); - root = new Node(); + root = new Node(null, null, "", null); current = root; } @@ -254,36 +255,25 @@ public class TransformHierarchy { boolean finishedSpecifying = false; /** - * Creates the root-level node. The root level node has a null enclosing node, a null transform, - * an empty map of inputs, and a name equal to the empty string. - */ - private Node() { - this.enclosingNode = null; - this.transform = null; - this.fullName = ""; - this.inputs = Collections.emptyMap(); - } - - /** * Creates a new Node with the given parent and transform. * + * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other + * nodes. + * * @param enclosingNode the composite node containing this node * @param transform the PTransform tracked by this node * @param fullName the fully qualified name of the transform * @param input the unexpanded input to the transform */ private Node( - Node enclosingNode, - PTransform<?, ?> transform, + @Nullable Node enclosingNode, + @Nullable PTransform<?, ?> transform, String fullName, - PInput input) { + @Nullable PInput input) { this.enclosingNode = enclosingNode; this.transform = transform; this.fullName = fullName; - ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder(); - inputs.putAll(input.expand()); - inputs.putAll(transform.getAdditionalInputs()); - this.inputs = inputs.build(); + this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap() : input.expand(); } /**
