[ https://issues.apache.org/jira/browse/BEAM-4056?focusedWorklogId=91430&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91430 ]
ASF GitHub Bot logged work on BEAM-4056: ---------------------------------------- Author: ASF GitHub Bot Created on: 16/Apr/18 18:36 Start Date: 16/Apr/18 18:36 Worklog Time Spent: 10m Work Description: tgroh closed pull request #5118: [BEAM-4056] Identify side inputs by transform id and local name URL: https://github.com/apache/beam/pull/5118 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index 83bb8a29f45..2bc2e34951c 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -217,12 +217,12 @@ message ExecutableStagePayload { // PTransform the ExecutableStagePayload is the payload of. string input = 2; - // Side Input PCollection ids. Each must be present as a value in the inputs of - // any PTransform the ExecutableStagePayload is the payload of. - repeated string side_inputs = 3; + // The side inputs required for this executable stage. Each Side Input of each PTransform within + // this ExecutableStagePayload must be represented within this field. + repeated SideInputId side_inputs = 3; // PTransform ids contained within this executable stage. This must contain at least one - // PTransform ID. + // PTransform id. repeated string transforms = 4; // Output PCollection ids. This must be equal to the values of the outputs of any @@ -232,6 +232,16 @@ message ExecutableStagePayload { // (Required) The components for the Executable Stage. This must contain all of the Transforms // in transforms, and the closure of all of the components they recognize. Components components = 6; + + // A reference to a side input. Side inputs are uniquely identified by PTransform id and + // local name. + message SideInputId { + // (Required) The id of the PTransform that references this side input. + string transform_id = 1; + + // (Required) The local name of this side input from the PTransform that references it. + string local_name = 2; + } } // The payload for the primitive ParDo transform. diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java index c41d0b8b587..50a1c9e1539 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java @@ -25,6 +25,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; @@ -77,7 +78,7 @@ * Returns the set of {@link PCollectionNode PCollections} that will be accessed by this {@link * ExecutableStage} as side inputs. */ - Collection<PCollectionNode> getSideInputPCollections(); + Collection<SideInputReference> getSideInputs(); /** * Returns the leaf {@link PCollectionNode PCollections} of this {@link ExecutableStage}. @@ -122,11 +123,16 @@ default PTransform toPTransform() { pt.putInputs("input", getInputPCollection().getId()); payload.setInput(input.getId()); - int sideInputIndex = 0; - for (PCollectionNode sideInputNode : getSideInputPCollections()) { - pt.putInputs(String.format("side_input_%s", sideInputIndex), sideInputNode.getId()); - payload.addSideInputs(sideInputNode.getId()); - sideInputIndex++; + for (SideInputReference sideInput : getSideInputs()) { + // Side inputs of the ExecutableStage itself can be uniquely identified by inner PTransform + // name and local name. + String outerLocalName = String.format("%s:%s", sideInput.transform(), sideInput.localName()); + pt.putInputs(outerLocalName, sideInput.collection().getId()); + payload.addSideInputs( + SideInputId.newBuilder() + .setTransformId(sideInput.transform().getId()) + .setLocalName(sideInput.localName()) + .build()); } int outputIndex = 0; @@ -174,11 +180,11 @@ static ExecutableStage fromPayload(ExecutableStagePayload payload) { PCollectionNode input = PipelineNode.pCollection( payload.getInput(), components.getPcollectionsOrThrow(payload.getInput())); - List<PCollectionNode> sideInputs = + List<SideInputReference> sideInputs = payload .getSideInputsList() .stream() - .map(id -> PipelineNode.pCollection(id, components.getPcollectionsOrThrow(id))) + .map(sideInputId -> SideInputReference.fromSideInputId(sideInputId, components)) .collect(Collectors.toList()); List<PTransformNode> transforms = payload diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java index fcc48526db9..02cfc1b3437 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java @@ -79,7 +79,7 @@ public static ExecutableStage forGrpcPortRead( ImmutableSet.Builder<PTransformNode> fusedTransforms = ImmutableSet.builder(); fusedTransforms.addAll(initialNodes); - Set<PCollectionNode> sideInputs = new LinkedHashSet<>(); + Set<SideInputReference> sideInputs = new LinkedHashSet<>(); Set<PCollectionNode> fusedCollections = new LinkedHashSet<>(); Set<PCollectionNode> materializedPCollections = new LinkedHashSet<>(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java index c0642e1253f..339ddcebac5 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java @@ -34,7 +34,7 @@ static ImmutableExecutableStage ofFullComponents( Components components, Environment environment, PCollectionNode input, - Collection<PCollectionNode> sideInputs, + Collection<SideInputReference> sideInputs, Collection<PTransformNode> transforms, Collection<PCollectionNode> outputs) { Components prunedComponents = @@ -53,7 +53,7 @@ static ImmutableExecutableStage of( Components components, Environment environment, PCollectionNode input, - Collection<PCollectionNode> sideInputs, + Collection<SideInputReference> sideInputs, Collection<PTransformNode> transforms, Collection<PCollectionNode> outputs) { return new AutoValue_ImmutableExecutableStage( @@ -76,7 +76,7 @@ static ImmutableExecutableStage of( public abstract PCollectionNode getInputPCollection(); @Override - public abstract Collection<PCollectionNode> getSideInputPCollections(); + public abstract Collection<SideInputReference> getSideInputs(); @Override public abstract Collection<PTransformNode> getTransforms(); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java index 27c46fdabe3..00ace2bce18 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java @@ -237,17 +237,22 @@ public Components getComponents() { } /** - * Returns the {@link PCollectionNode PCollectionNodes} that the provided transform consumes as - * side inputs. + * Returns the {@link SideInputReference SideInputReferences} that the provided transform consumes + * as side inputs. */ - public Collection<PCollectionNode> getSideInputs(PTransformNode transform) { + public Collection<SideInputReference> getSideInputs(PTransformNode transform) { return getLocalSideInputNames(transform.getTransform()) .stream() .map( localName -> { - String pcollectionId = transform.getTransform().getInputsOrThrow(localName); - return PipelineNode.pCollection( - pcollectionId, components.getPcollectionsOrThrow(pcollectionId)); + String transformId = transform.getId(); + PTransform transformProto = components.getTransformsOrThrow(transformId); + String collectionId = transform.getTransform().getInputsOrThrow(localName); + PCollection collection = components.getPcollectionsOrThrow(collectionId); + return SideInputReference.of( + PipelineNode.pTransform(transformId, transformProto), + localName, + PipelineNode.pCollection(collectionId, collection)); }) .collect(Collectors.toSet()); } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java new file mode 100644 index 00000000000..b5ca4c7951d --- /dev/null +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.construction.graph; + +import com.google.auto.value.AutoValue; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; +import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; +import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; + +/** + * A reference to a side input. This includes the PTransform that references the side input as well + * as the PCollection referenced. Both are necessary in order to fully resolve a view. + */ +@AutoValue +public abstract class SideInputReference { + + /** Create a side input reference. */ + public static SideInputReference of( + PTransformNode transform, String localName, PCollectionNode collection) { + return new AutoValue_SideInputReference(transform, localName, collection); + } + + /** Create a side input reference from a SideInputId proto and components. */ + public static SideInputReference fromSideInputId( + SideInputId sideInputId, RunnerApi.Components components) { + String transformId = sideInputId.getTransformId(); + String localName = sideInputId.getLocalName(); + String collectionId = components.getTransformsOrThrow(transformId).getInputsOrThrow(localName); + PTransform transform = components.getTransformsOrThrow(transformId); + PCollection collection = components.getPcollectionsOrThrow(collectionId); + return SideInputReference.of( + PipelineNode.pTransform(transformId, transform), + localName, + PipelineNode.pCollection(collectionId, collection)); + } + + /** The id of the PTransform that uses this side input. */ + public abstract PTransformNode transform(); + /** The local name the referencing PTransform uses to refer to this side input. */ + public abstract String localName(); + /** The PCollection that backs this side input. */ + public abstract PCollectionNode collection(); +} diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java index ca89cdb148e..b072c9c83cc 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import java.util.Collection; import java.util.stream.Collectors; +import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId; import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection; import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; @@ -37,39 +38,66 @@ */ public class ExecutableStageMatcher extends TypeSafeMatcher<ExecutableStage> { private final String inputPCollectionId; + private final Collection<SideInputId> sideInputIds; private final Collection<String> materializedPCollection; private final Collection<String> fusedTransforms; private ExecutableStageMatcher( String inputPCollectionId, + Collection<SideInputId> sideInputIds, Collection<String> materializedPCollection, Collection<String> fusedTransforms) { this.inputPCollectionId = inputPCollectionId; + this.sideInputIds = sideInputIds; this.materializedPCollection = materializedPCollection; this.fusedTransforms = fusedTransforms; } public static ExecutableStageMatcher withInput(String inputId) { - return new ExecutableStageMatcher(inputId, ImmutableList.of(), ImmutableList.of()); + return new ExecutableStageMatcher( + inputId, ImmutableList.of(), ImmutableList.of(), ImmutableList.of()); + } + + public ExecutableStageMatcher withSideInputs(SideInputId... sideInputs) { + return new ExecutableStageMatcher( + inputPCollectionId, + ImmutableList.copyOf(sideInputs), + materializedPCollection, + fusedTransforms); } public ExecutableStageMatcher withNoOutputs() { - return new ExecutableStageMatcher(inputPCollectionId, ImmutableList.of(), fusedTransforms); + return new ExecutableStageMatcher( + inputPCollectionId, sideInputIds, ImmutableList.of(), fusedTransforms); } public ExecutableStageMatcher withOutputs(String... pCollections) { return new ExecutableStageMatcher( - inputPCollectionId, ImmutableList.copyOf(pCollections), fusedTransforms); + inputPCollectionId, sideInputIds, ImmutableList.copyOf(pCollections), fusedTransforms); } public ExecutableStageMatcher withTransforms(String... transforms) { return new ExecutableStageMatcher( - inputPCollectionId, materializedPCollection, ImmutableList.copyOf(transforms)); + inputPCollectionId, + sideInputIds, + materializedPCollection, + ImmutableList.copyOf(transforms)); } @Override protected boolean matchesSafely(ExecutableStage item) { return item.getInputPCollection().getId().equals(inputPCollectionId) + && containsInAnyOrder(sideInputIds.toArray()) + .matches( + item.getSideInputs() + .stream() + .map( + ref -> + SideInputId.newBuilder() + .setTransformId(ref.transform().getId()) + .setLocalName(ref.localName()) + .build()) + .collect(Collectors.toSet())) && containsInAnyOrder(materializedPCollection.toArray(new String[0])) .matches( item.getOutputPCollections() diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java index 6d07e5fc909..a396c93ad2f 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java @@ -39,6 +39,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -79,12 +80,16 @@ public void testRoundTripToFromTransform() throws Exception { .putEnvironments("foo", env) .build(); + PTransformNode transformNode = PipelineNode.pTransform("pt", pt); + SideInputReference sideInputRef = + SideInputReference.of( + transformNode, "side_input", PipelineNode.pCollection("sideInput.in", sideInput)); ImmutableExecutableStage stage = ImmutableExecutableStage.of( components, env, PipelineNode.pCollection("input.out", input), - Collections.singleton(PipelineNode.pCollection("sideInput.in", sideInput)), + Collections.singleton(sideInputRef), Collections.singleton(PipelineNode.pTransform("pt", pt)), Collections.singleton(PipelineNode.pCollection("output.out", output))); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java index fb9051f7de4..cb494cbfcbb 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java @@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertThat; +import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.Components; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; @@ -803,6 +804,11 @@ public void sideInputRootsNewStage() { .withNoOutputs() .withTransforms("leftParDo", "rightParDo"), ExecutableStageMatcher.withInput("read.out") + .withSideInputs( + RunnerApi.ExecutableStagePayload.SideInputId.newBuilder() + .setTransformId("sideParDo") + .setLocalName("side") + .build()) .withNoOutputs() .withTransforms("sideParDo"), ExecutableStageMatcher.withInput("sideImpulse.out") diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java index ee64eca03e3..2dc98efeb44 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java @@ -947,9 +947,13 @@ public void sideInputIncludedInStage() { ExecutableStage subgraph = GreedyStageFuser.forGrpcPortRead( p, readOutput, ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform))); - assertThat( - subgraph.getSideInputPCollections(), - contains(PipelineNode.pCollection("side_read.out", sideInputPCollection))); + PTransformNode parDoNode = PipelineNode.pTransform("parDo", parDoTransform); + SideInputReference sideInputRef = + SideInputReference.of( + parDoNode, + "side_input", + PipelineNode.pCollection("side_read.out", sideInputPCollection)); + assertThat(subgraph.getSideInputs(), contains(sideInputRef)); assertThat(subgraph.getOutputPCollections(), emptyIterable()); } diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java index 729f82ec548..6c156a62015 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java @@ -34,6 +34,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.runners.core.construction.PTransformTranslation; +import org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -75,12 +76,16 @@ public void ofFullComponentsOnlyHasStagePTransforms() throws Exception { .putEnvironments("foo", env) .build(); + PTransformNode transformNode = PipelineNode.pTransform("pt", pt); + SideInputReference sideInputRef = + SideInputReference.of( + transformNode, "side_input", PipelineNode.pCollection("sideInput.in", sideInput)); ImmutableExecutableStage stage = ImmutableExecutableStage.ofFullComponents( components, env, PipelineNode.pCollection("input.out", input), - Collections.singleton(PipelineNode.pCollection("sideInput.in", sideInput)), + Collections.singleton(sideInputRef), Collections.singleton(PipelineNode.pTransform("pt", pt)), Collections.singleton(PipelineNode.pCollection("output.out", output))); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java index dd0bfb599c9..691e768c7ce 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java @@ -30,6 +30,7 @@ import com.google.common.collect.ImmutableSet; import java.util.Collection; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import org.apache.beam.model.pipeline.v1.RunnerApi; @@ -200,21 +201,26 @@ public void transformWithSideAndMainInputs() { .values()); PCollectionNode mainInput = PipelineNode.pCollection(mainInputName, components.getPcollectionsOrThrow(mainInputName)); - String sideInputName = + PTransform parDoTransform = components.getTransformsOrThrow("par_do"); + String sideInputLocalName = getOnlyElement( - components - .getTransformsOrThrow("par_do") + parDoTransform .getInputsMap() - .values() + .entrySet() .stream() - .filter(pcollectionName -> !pcollectionName.equals(mainInputName)) + .filter(entry -> !entry.getValue().equals(mainInputName)) + .map(Map.Entry::getKey) .collect(Collectors.toSet())); + String sideInputCollectionId = parDoTransform.getInputsOrThrow(sideInputLocalName); PCollectionNode sideInput = - PipelineNode.pCollection(sideInputName, components.getPcollectionsOrThrow(sideInputName)); + PipelineNode.pCollection( + sideInputCollectionId, components.getPcollectionsOrThrow(sideInputCollectionId)); PTransformNode parDoNode = PipelineNode.pTransform("par_do", components.getTransformsOrThrow("par_do")); + SideInputReference sideInputRef = + SideInputReference.of(parDoNode, sideInputLocalName, sideInput); - assertThat(qp.getSideInputs(parDoNode), contains(sideInput)); + assertThat(qp.getSideInputs(parDoNode), contains(sideInputRef)); assertThat(qp.getPerElementConsumers(mainInput), contains(parDoNode)); assertThat(qp.getPerElementConsumers(sideInput), not(contains(parDoNode))); } @@ -254,8 +260,10 @@ public void transformWithSameSideAndMainInput() { PipelineNode.pCollection("read_pc", components.getPcollectionsOrThrow("read_pc")); PTransformNode multiConsumerPT = PipelineNode.pTransform("multiConsumer", components.getTransformsOrThrow("multiConsumer")); + SideInputReference sideInputRef = + SideInputReference.of(multiConsumerPT, "side_in", multiInputPc); assertThat(qp.getPerElementConsumers(multiInputPc), contains(multiConsumerPT)); - assertThat(qp.getSideInputs(multiConsumerPT), contains(multiInputPc)); + assertThat(qp.getSideInputs(multiConsumerPT), contains(sideInputRef)); } /** ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 91430) Time Spent: 2h 20m (was: 2h 10m) > Identify Side Inputs by PTransform ID and local name > ---------------------------------------------------- > > Key: BEAM-4056 > URL: https://issues.apache.org/jira/browse/BEAM-4056 > Project: Beam > Issue Type: New Feature > Components: runner-core > Reporter: Ben Sidhom > Assignee: Ben Sidhom > Priority: Minor > Time Spent: 2h 20m > Remaining Estimate: 0h > > This is necessary in order to correctly identify side inputs during all > phases of portable pipeline execution (fusion, translation, and SDK > execution). -- This message was sent by Atlassian JIRA (v7.6.3#76005)