[ 
https://issues.apache.org/jira/browse/BEAM-4056?focusedWorklogId=91430&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-91430
 ]

ASF GitHub Bot logged work on BEAM-4056:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Apr/18 18:36
            Start Date: 16/Apr/18 18:36
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5118: [BEAM-4056] Identify 
side inputs by transform id and local name
URL: https://github.com/apache/beam/pull/5118
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index 83bb8a29f45..2bc2e34951c 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -217,12 +217,12 @@ message ExecutableStagePayload {
   // PTransform the ExecutableStagePayload is the payload of.
   string input = 2;
 
-  // Side Input PCollection ids. Each must be present as a value in the inputs 
of
-  // any PTransform the ExecutableStagePayload is the payload of.
-  repeated string side_inputs = 3;
+  // The side inputs required for this executable stage. Each Side Input of 
each PTransform within
+  // this ExecutableStagePayload must be represented within this field.
+  repeated SideInputId side_inputs = 3;
 
   // PTransform ids contained within this executable stage. This must contain 
at least one
-  // PTransform ID.
+  // PTransform id.
   repeated string transforms = 4;
 
   // Output PCollection ids. This must be equal to the values of the outputs 
of any
@@ -232,6 +232,16 @@ message ExecutableStagePayload {
   // (Required) The components for the Executable Stage. This must contain all 
of the Transforms
   // in transforms, and the closure of all of the components they recognize.
   Components components = 6;
+
+  // A reference to a side input. Side inputs are uniquely identified by 
PTransform id and
+  // local name.
+  message SideInputId {
+    // (Required) The id of the PTransform that references this side input.
+    string transform_id = 1;
+
+    // (Required) The local name of this side input from the PTransform that 
references it.
+    string local_name = 2;
+  }
 }
 
 // The payload for the primitive ParDo transform.
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index c41d0b8b587..50a1c9e1539 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -25,6 +25,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
@@ -77,7 +78,7 @@
    * Returns the set of {@link PCollectionNode PCollections} that will be 
accessed by this {@link
    * ExecutableStage} as side inputs.
    */
-  Collection<PCollectionNode> getSideInputPCollections();
+  Collection<SideInputReference> getSideInputs();
 
   /**
    * Returns the leaf {@link PCollectionNode PCollections} of this {@link 
ExecutableStage}.
@@ -122,11 +123,16 @@ default PTransform toPTransform() {
     pt.putInputs("input", getInputPCollection().getId());
     payload.setInput(input.getId());
 
-    int sideInputIndex = 0;
-    for (PCollectionNode sideInputNode : getSideInputPCollections()) {
-      pt.putInputs(String.format("side_input_%s", sideInputIndex), 
sideInputNode.getId());
-      payload.addSideInputs(sideInputNode.getId());
-      sideInputIndex++;
+    for (SideInputReference sideInput : getSideInputs()) {
+      // Side inputs of the ExecutableStage itself can be uniquely identified 
by inner PTransform
+      // name and local name.
+      String outerLocalName = String.format("%s:%s", sideInput.transform(), 
sideInput.localName());
+      pt.putInputs(outerLocalName, sideInput.collection().getId());
+      payload.addSideInputs(
+          SideInputId.newBuilder()
+              .setTransformId(sideInput.transform().getId())
+              .setLocalName(sideInput.localName())
+              .build());
     }
 
     int outputIndex = 0;
@@ -174,11 +180,11 @@ static ExecutableStage fromPayload(ExecutableStagePayload 
payload) {
     PCollectionNode input =
         PipelineNode.pCollection(
             payload.getInput(), 
components.getPcollectionsOrThrow(payload.getInput()));
-    List<PCollectionNode> sideInputs =
+    List<SideInputReference> sideInputs =
         payload
             .getSideInputsList()
             .stream()
-            .map(id -> PipelineNode.pCollection(id, 
components.getPcollectionsOrThrow(id)))
+            .map(sideInputId -> 
SideInputReference.fromSideInputId(sideInputId, components))
             .collect(Collectors.toList());
     List<PTransformNode> transforms =
         payload
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
index fcc48526db9..02cfc1b3437 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
@@ -79,7 +79,7 @@ public static ExecutableStage forGrpcPortRead(
     ImmutableSet.Builder<PTransformNode> fusedTransforms = 
ImmutableSet.builder();
     fusedTransforms.addAll(initialNodes);
 
-    Set<PCollectionNode> sideInputs = new LinkedHashSet<>();
+    Set<SideInputReference> sideInputs = new LinkedHashSet<>();
     Set<PCollectionNode> fusedCollections = new LinkedHashSet<>();
     Set<PCollectionNode> materializedPCollections = new LinkedHashSet<>();
 
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
index c0642e1253f..339ddcebac5 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
@@ -34,7 +34,7 @@ static ImmutableExecutableStage ofFullComponents(
       Components components,
       Environment environment,
       PCollectionNode input,
-      Collection<PCollectionNode> sideInputs,
+      Collection<SideInputReference> sideInputs,
       Collection<PTransformNode> transforms,
       Collection<PCollectionNode> outputs) {
     Components prunedComponents =
@@ -53,7 +53,7 @@ static ImmutableExecutableStage of(
       Components components,
       Environment environment,
       PCollectionNode input,
-      Collection<PCollectionNode> sideInputs,
+      Collection<SideInputReference> sideInputs,
       Collection<PTransformNode> transforms,
       Collection<PCollectionNode> outputs) {
     return new AutoValue_ImmutableExecutableStage(
@@ -76,7 +76,7 @@ static ImmutableExecutableStage of(
   public abstract PCollectionNode getInputPCollection();
 
   @Override
-  public abstract Collection<PCollectionNode> getSideInputPCollections();
+  public abstract Collection<SideInputReference> getSideInputs();
 
   @Override
   public abstract Collection<PTransformNode> getTransforms();
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 27c46fdabe3..00ace2bce18 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -237,17 +237,22 @@ public Components getComponents() {
   }
 
   /**
-   * Returns the {@link PCollectionNode PCollectionNodes} that the provided 
transform consumes as
-   * side inputs.
+   * Returns the {@link SideInputReference SideInputReferences} that the 
provided transform consumes
+   * as side inputs.
    */
-  public Collection<PCollectionNode> getSideInputs(PTransformNode transform) {
+  public Collection<SideInputReference> getSideInputs(PTransformNode 
transform) {
     return getLocalSideInputNames(transform.getTransform())
         .stream()
         .map(
             localName -> {
-              String pcollectionId = 
transform.getTransform().getInputsOrThrow(localName);
-              return PipelineNode.pCollection(
-                  pcollectionId, 
components.getPcollectionsOrThrow(pcollectionId));
+              String transformId = transform.getId();
+              PTransform transformProto = 
components.getTransformsOrThrow(transformId);
+              String collectionId = 
transform.getTransform().getInputsOrThrow(localName);
+              PCollection collection = 
components.getPcollectionsOrThrow(collectionId);
+              return SideInputReference.of(
+                  PipelineNode.pTransform(transformId, transformProto),
+                  localName,
+                  PipelineNode.pCollection(collectionId, collection));
             })
         .collect(Collectors.toSet());
   }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
new file mode 100644
index 00000000000..b5ca4c7951d
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SideInputReference.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction.graph;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+
+/**
+ * A reference to a side input. This includes the PTransform that references 
the side input as well
+ * as the PCollection referenced. Both are necessary in order to fully resolve 
a view.
+ */
+@AutoValue
+public abstract class SideInputReference {
+
+  /** Create a side input reference. */
+  public static SideInputReference of(
+      PTransformNode transform, String localName, PCollectionNode collection) {
+    return new AutoValue_SideInputReference(transform, localName, collection);
+  }
+
+  /** Create a side input reference from a SideInputId proto and components. */
+  public static SideInputReference fromSideInputId(
+      SideInputId sideInputId, RunnerApi.Components components) {
+    String transformId = sideInputId.getTransformId();
+    String localName = sideInputId.getLocalName();
+    String collectionId = 
components.getTransformsOrThrow(transformId).getInputsOrThrow(localName);
+    PTransform transform = components.getTransformsOrThrow(transformId);
+    PCollection collection = components.getPcollectionsOrThrow(collectionId);
+    return SideInputReference.of(
+        PipelineNode.pTransform(transformId, transform),
+        localName,
+        PipelineNode.pCollection(collectionId, collection));
+  }
+
+  /** The id of the PTransform that uses this side input. */
+  public abstract PTransformNode transform();
+  /** The local name the referencing PTransform uses to refer to this side 
input. */
+  public abstract String localName();
+  /** The PCollection that backs this side input. */
+  public abstract PCollectionNode collection();
+}
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
index ca89cdb148e..b072c9c83cc 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageMatcher.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableList;
 import java.util.Collection;
 import java.util.stream.Collectors;
+import 
org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload.SideInputId;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
@@ -37,39 +38,66 @@
  */
 public class ExecutableStageMatcher extends TypeSafeMatcher<ExecutableStage> {
   private final String inputPCollectionId;
+  private final Collection<SideInputId> sideInputIds;
   private final Collection<String> materializedPCollection;
   private final Collection<String> fusedTransforms;
 
   private ExecutableStageMatcher(
       String inputPCollectionId,
+      Collection<SideInputId> sideInputIds,
       Collection<String> materializedPCollection,
       Collection<String> fusedTransforms) {
     this.inputPCollectionId = inputPCollectionId;
+    this.sideInputIds = sideInputIds;
     this.materializedPCollection = materializedPCollection;
     this.fusedTransforms = fusedTransforms;
   }
 
   public static ExecutableStageMatcher withInput(String inputId) {
-    return new ExecutableStageMatcher(inputId, ImmutableList.of(), 
ImmutableList.of());
+    return new ExecutableStageMatcher(
+        inputId, ImmutableList.of(), ImmutableList.of(), ImmutableList.of());
+  }
+
+  public ExecutableStageMatcher withSideInputs(SideInputId... sideInputs) {
+    return new ExecutableStageMatcher(
+        inputPCollectionId,
+        ImmutableList.copyOf(sideInputs),
+        materializedPCollection,
+        fusedTransforms);
   }
 
   public ExecutableStageMatcher withNoOutputs() {
-    return new ExecutableStageMatcher(inputPCollectionId, ImmutableList.of(), 
fusedTransforms);
+    return new ExecutableStageMatcher(
+        inputPCollectionId, sideInputIds, ImmutableList.of(), fusedTransforms);
   }
 
   public ExecutableStageMatcher withOutputs(String... pCollections) {
     return new ExecutableStageMatcher(
-        inputPCollectionId, ImmutableList.copyOf(pCollections), 
fusedTransforms);
+        inputPCollectionId, sideInputIds, ImmutableList.copyOf(pCollections), 
fusedTransforms);
   }
 
   public ExecutableStageMatcher withTransforms(String... transforms) {
     return new ExecutableStageMatcher(
-        inputPCollectionId, materializedPCollection, 
ImmutableList.copyOf(transforms));
+        inputPCollectionId,
+        sideInputIds,
+        materializedPCollection,
+        ImmutableList.copyOf(transforms));
   }
 
   @Override
   protected boolean matchesSafely(ExecutableStage item) {
     return item.getInputPCollection().getId().equals(inputPCollectionId)
+        && containsInAnyOrder(sideInputIds.toArray())
+            .matches(
+                item.getSideInputs()
+                    .stream()
+                    .map(
+                        ref ->
+                            SideInputId.newBuilder()
+                                .setTransformId(ref.transform().getId())
+                                .setLocalName(ref.localName())
+                                .build())
+                    .collect(Collectors.toSet()))
         && containsInAnyOrder(materializedPCollection.toArray(new String[0]))
             .matches(
                 item.getOutputPCollections()
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
index 6d07e5fc909..a396c93ad2f 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
@@ -39,6 +39,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -79,12 +80,16 @@ public void testRoundTripToFromTransform() throws Exception 
{
             .putEnvironments("foo", env)
             .build();
 
+    PTransformNode transformNode = PipelineNode.pTransform("pt", pt);
+    SideInputReference sideInputRef =
+        SideInputReference.of(
+            transformNode, "side_input", 
PipelineNode.pCollection("sideInput.in", sideInput));
     ImmutableExecutableStage stage =
         ImmutableExecutableStage.of(
             components,
             env,
             PipelineNode.pCollection("input.out", input),
-            Collections.singleton(PipelineNode.pCollection("sideInput.in", 
sideInput)),
+            Collections.singleton(sideInputRef),
             Collections.singleton(PipelineNode.pTransform("pt", pt)),
             Collections.singleton(PipelineNode.pCollection("output.out", 
output)));
 
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index fb9051f7de4..cb494cbfcbb 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -24,6 +24,7 @@
 import static org.hamcrest.Matchers.hasSize;
 import static org.junit.Assert.assertThat;
 
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
@@ -803,6 +804,11 @@ public void sideInputRootsNewStage() {
                 .withNoOutputs()
                 .withTransforms("leftParDo", "rightParDo"),
             ExecutableStageMatcher.withInput("read.out")
+                .withSideInputs(
+                    RunnerApi.ExecutableStagePayload.SideInputId.newBuilder()
+                        .setTransformId("sideParDo")
+                        .setLocalName("side")
+                        .build())
                 .withNoOutputs()
                 .withTransforms("sideParDo"),
             ExecutableStageMatcher.withInput("sideImpulse.out")
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index ee64eca03e3..2dc98efeb44 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -947,9 +947,13 @@ public void sideInputIncludedInStage() {
     ExecutableStage subgraph =
         GreedyStageFuser.forGrpcPortRead(
             p, readOutput, ImmutableSet.of(PipelineNode.pTransform("parDo", 
parDoTransform)));
-    assertThat(
-        subgraph.getSideInputPCollections(),
-        contains(PipelineNode.pCollection("side_read.out", 
sideInputPCollection)));
+    PTransformNode parDoNode = PipelineNode.pTransform("parDo", 
parDoTransform);
+    SideInputReference sideInputRef =
+        SideInputReference.of(
+            parDoNode,
+            "side_input",
+            PipelineNode.pCollection("side_read.out", sideInputPCollection));
+    assertThat(subgraph.getSideInputs(), contains(sideInputRef));
     assertThat(subgraph.getOutputPCollections(), emptyIterable());
   }
 
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
index 729f82ec548..6c156a62015 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStageTest.java
@@ -34,6 +34,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -75,12 +76,16 @@ public void ofFullComponentsOnlyHasStagePTransforms() 
throws Exception {
             .putEnvironments("foo", env)
             .build();
 
+    PTransformNode transformNode = PipelineNode.pTransform("pt", pt);
+    SideInputReference sideInputRef =
+        SideInputReference.of(
+            transformNode, "side_input", 
PipelineNode.pCollection("sideInput.in", sideInput));
     ImmutableExecutableStage stage =
         ImmutableExecutableStage.ofFullComponents(
             components,
             env,
             PipelineNode.pCollection("input.out", input),
-            Collections.singleton(PipelineNode.pCollection("sideInput.in", 
sideInput)),
+            Collections.singleton(sideInputRef),
             Collections.singleton(PipelineNode.pTransform("pt", pt)),
             Collections.singleton(PipelineNode.pCollection("output.out", 
output)));
 
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
index dd0bfb599c9..691e768c7ce 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/QueryablePipelineTest.java
@@ -30,6 +30,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -200,21 +201,26 @@ public void transformWithSideAndMainInputs() {
                 .values());
     PCollectionNode mainInput =
         PipelineNode.pCollection(mainInputName, 
components.getPcollectionsOrThrow(mainInputName));
-    String sideInputName =
+    PTransform parDoTransform = components.getTransformsOrThrow("par_do");
+    String sideInputLocalName =
         getOnlyElement(
-            components
-                .getTransformsOrThrow("par_do")
+            parDoTransform
                 .getInputsMap()
-                .values()
+                .entrySet()
                 .stream()
-                .filter(pcollectionName -> 
!pcollectionName.equals(mainInputName))
+                .filter(entry -> !entry.getValue().equals(mainInputName))
+                .map(Map.Entry::getKey)
                 .collect(Collectors.toSet()));
+    String sideInputCollectionId = 
parDoTransform.getInputsOrThrow(sideInputLocalName);
     PCollectionNode sideInput =
-        PipelineNode.pCollection(sideInputName, 
components.getPcollectionsOrThrow(sideInputName));
+        PipelineNode.pCollection(
+            sideInputCollectionId, 
components.getPcollectionsOrThrow(sideInputCollectionId));
     PTransformNode parDoNode =
         PipelineNode.pTransform("par_do", 
components.getTransformsOrThrow("par_do"));
+    SideInputReference sideInputRef =
+        SideInputReference.of(parDoNode, sideInputLocalName, sideInput);
 
-    assertThat(qp.getSideInputs(parDoNode), contains(sideInput));
+    assertThat(qp.getSideInputs(parDoNode), contains(sideInputRef));
     assertThat(qp.getPerElementConsumers(mainInput), contains(parDoNode));
     assertThat(qp.getPerElementConsumers(sideInput), not(contains(parDoNode)));
   }
@@ -254,8 +260,10 @@ public void transformWithSameSideAndMainInput() {
         PipelineNode.pCollection("read_pc", 
components.getPcollectionsOrThrow("read_pc"));
     PTransformNode multiConsumerPT =
         PipelineNode.pTransform("multiConsumer", 
components.getTransformsOrThrow("multiConsumer"));
+    SideInputReference sideInputRef =
+        SideInputReference.of(multiConsumerPT, "side_in", multiInputPc);
     assertThat(qp.getPerElementConsumers(multiInputPc), 
contains(multiConsumerPT));
-    assertThat(qp.getSideInputs(multiConsumerPT), contains(multiInputPc));
+    assertThat(qp.getSideInputs(multiConsumerPT), contains(sideInputRef));
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 91430)
    Time Spent: 2h 20m  (was: 2h 10m)

> Identify Side Inputs by PTransform ID and local name
> ----------------------------------------------------
>
>                 Key: BEAM-4056
>                 URL: https://issues.apache.org/jira/browse/BEAM-4056
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Ben Sidhom
>            Assignee: Ben Sidhom
>            Priority: Minor
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> This is necessary in order to correctly identify side inputs during all 
> phases of portable pipeline execution (fusion, translation, and SDK 
> execution).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to