[ 
https://issues.apache.org/jira/browse/BEAM-6195?focusedWorklogId=173119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173119
 ]

ASF GitHub Bot logged work on BEAM-6195:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Dec/18 00:14
            Start Date: 08/Dec/18 00:14
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #7223: [BEAM-6195] Make 
ProcessRemoteBundleOperation map PCollectionId into correct Outp…
URL: https://github.com/apache/beam/pull/7223
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would not otherwise be visible, since GitHub hides the original diff once the PR is merged):

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index f0e8ddb73456..00504d21e394 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -97,7 +97,7 @@
    */
   private static final Function<MapTask, MutableNetwork<Node, Edge>> 
mapTaskToBaseNetwork =
       new FixMultiOutputInfosOnParDoInstructions(idGenerator)
-          .andThen(new MapTaskToNetworkFunction());
+          .andThen(new MapTaskToNetworkFunction(idGenerator));
 
   /** Registry of known {@link ReaderFactory ReaderFactories}. */
   private final ReaderRegistry readerRegistry = 
ReaderRegistry.defaultRegistry();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 7db53391fc57..53bc1156560b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -37,11 +37,11 @@
 import com.google.common.graph.Network;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.core.ElementByteSizeObservable;
@@ -358,19 +358,20 @@ public Node typedApply(ExecutableStageNode input) {
         Iterable<OutputReceiverNode> outputReceiverNodes =
             Iterables.filter(network.successors(input), 
OutputReceiverNode.class);
 
-        OutputReceiver[] outputReceivers = new 
OutputReceiver[Iterables.size(outputReceiverNodes)];
+        Map<String, OutputReceiver> outputReceiverMap = new HashMap<>();
         Lists.newArrayList(outputReceiverNodes)
             .stream()
-            .map(outputReceiverNode -> outputReceiverNode.getOutputReceiver())
-            .collect(Collectors.toList())
-            .toArray(outputReceivers);
-
+            .forEach(
+                outputReceiverNode ->
+                    outputReceiverMap.put(
+                        outputReceiverNode.getPcollectionId(),
+                        outputReceiverNode.getOutputReceiver()));
         return OperationNode.create(
             new ProcessRemoteBundleOperation(
                 executionContext.createOperationContext(
                     NameContext.create(stageName, stageName, stageName, 
stageName)),
                 stageBundleFactory,
-                outputReceivers));
+                outputReceiverMap));
       }
     };
   }
@@ -687,7 +688,7 @@ public Node typedApply(InstructionOutputNode input) {
                     cloudOutput.getName()));
         outputReceiver.addOutputCounter(outputCounter);
 
-        return OutputReceiverNode.create(outputReceiver, coder);
+        return OutputReceiverNode.create(outputReceiver, coder, 
input.getPcollectionId());
       }
     };
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 4ba66ba56ceb..6285dfd198a9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -357,7 +357,7 @@ public Node typedApply(InstructionOutputNode input) {
                     cloudOutput.getName()));
         outputReceiver.addOutputCounter(outputCounter);
 
-        return OutputReceiverNode.create(outputReceiver, coder);
+        return OutputReceiverNode.create(outputReceiver, coder, 
input.getPcollectionId());
       }
     };
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index d6de9071d743..f32fa74614b7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -162,7 +162,7 @@
    * </ul>
    */
   private static final Function<MapTask, MutableNetwork<Node, Edge>> 
mapTaskToBaseNetwork =
-      new MapTaskToNetworkFunction();
+      new MapTaskToNetworkFunction(idGenerator);
 
   // Maximum number of threads for processing.  Currently each thread 
processes one key at a time.
   static final int MAX_PROCESSING_THREADS = 300;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
index 3d44331dd5e5..943954976706 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/control/ProcessRemoteBundleOperation.java
@@ -19,6 +19,7 @@
 
 import com.google.common.collect.Iterables;
 import java.io.Closeable;
+import java.util.Map;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OperationContext;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ReceivingOperation;
@@ -42,15 +43,15 @@
 public class ProcessRemoteBundleOperation<InputT> extends ReceivingOperation {
   private static final Logger LOG = 
LoggerFactory.getLogger(ProcessRemoteBundleOperation.class);
   private final StageBundleFactory stageBundleFactory;
+  private static final OutputReceiver[] EMPTY_RECEIVER_ARRAY = new 
OutputReceiver[0];
+  private final Map<String, OutputReceiver> outputReceiverMap;
   private final OutputReceiverFactory receiverFactory =
       new OutputReceiverFactory() {
         @Override
         public FnDataReceiver<?> create(String pCollectionId) {
           return receivedElement -> {
-            for (OutputReceiver receiver : receivers) {
-              LOG.debug("Consume element {}", receivedElement);
-              receiver.process((WindowedValue<?>) receivedElement);
-            }
+            LOG.debug("Consume element {}", receivedElement);
+            outputReceiverMap.get(pCollectionId).process((WindowedValue<?>) 
receivedElement);
           };
         }
       };
@@ -59,11 +60,14 @@
   private RemoteBundle remoteBundle;
 
   public ProcessRemoteBundleOperation(
-      OperationContext context, StageBundleFactory stageBundleFactory, 
OutputReceiver[] receivers) {
-    super(receivers, context);
+      OperationContext context,
+      StageBundleFactory stageBundleFactory,
+      Map<String, OutputReceiver> outputReceiverMap) {
+    super(EMPTY_RECEIVER_ARRAY, context);
     this.stageBundleFactory = stageBundleFactory;
-    stateRequestHandler = StateRequestHandler.unsupported();
-    progressHandler = BundleProgressHandler.ignored();
+    this.outputReceiverMap = outputReceiverMap;
+    this.stateRequestHandler = StateRequestHandler.unsupported();
+    this.progressHandler = BundleProgressHandler.ignored();
   }
 
   @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
index 7b356e877ce5..04edc4e8acf4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunction.java
@@ -89,21 +89,24 @@
    */
   private void cloneFlatten(Node flatten, MutableNetwork<Node, Edge> network) {
     // Start by creating the clones of the flatten and its PCollection.
-    Node flattenOut = Iterables.getOnlyElement(network.successors(flatten));
+    InstructionOutputNode flattenOut =
+        (InstructionOutputNode) 
Iterables.getOnlyElement(network.successors(flatten));
     ParallelInstruction flattenInstruction =
         ((ParallelInstructionNode) flatten).getParallelInstruction();
 
     Node runnerFlatten =
         ParallelInstructionNode.create(flattenInstruction, 
ExecutionLocation.RUNNER_HARNESS);
     Node runnerFlattenOut =
-        InstructionOutputNode.create(((InstructionOutputNode) 
flattenOut).getInstructionOutput());
+        InstructionOutputNode.create(
+            flattenOut.getInstructionOutput(), flattenOut.getPcollectionId());
     network.addNode(runnerFlatten);
     network.addNode(runnerFlattenOut);
 
     Node sdkFlatten =
         ParallelInstructionNode.create(flattenInstruction, 
ExecutionLocation.SDK_HARNESS);
     Node sdkFlattenOut =
-        InstructionOutputNode.create(((InstructionOutputNode) 
flattenOut).getInstructionOutput());
+        InstructionOutputNode.create(
+            flattenOut.getInstructionOutput(), flattenOut.getPcollectionId());
     network.addNode(sdkFlatten);
     network.addNode(sdkFlattenOut);
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index ab8cd0b8596a..bc70914a45a6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -226,7 +226,7 @@ public Node apply(MutableNetwork<Node, Edge> input) {
             e);
       }
 
-      String pcollectionId = "generatedPcollection" + idGenerator.getId();
+      String pcollectionId = node.getPcollectionId();
       RunnerApi.PCollection pCollection =
           RunnerApi.PCollection.newBuilder()
               .setCoderId(coderId)
@@ -350,6 +350,10 @@ public Node apply(MutableNetwork<Node, Edge> input) {
       executableStageTransforms.add(PipelineNode.pTransform(ptransformId, 
pTransform.build()));
     }
 
+    if (executableStageInputs.size() != 1) {
+      throw new UnsupportedOperationException("ExecutableStage only support 
one input PCollection");
+    }
+
     PCollectionNode executableInput = executableStageInputs.iterator().next();
     RunnerApi.Components executableStageComponents = componentsBuilder.build();
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
index 5a1dffbdc6a9..996a3e2a3d37 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunction.java
@@ -261,9 +261,11 @@ private Node rewireAcrossSdkRunnerPortNode(
       Set<Node> successors) {
 
     InstructionOutputNode newPredecessorOutputNode =
-        InstructionOutputNode.create(outputNode.getInstructionOutput());
+        InstructionOutputNode.create(
+            outputNode.getInstructionOutput(), outputNode.getPcollectionId());
     InstructionOutputNode portOutputNode =
-        InstructionOutputNode.create(outputNode.getInstructionOutput());
+        InstructionOutputNode.create(
+            outputNode.getInstructionOutput(), outputNode.getPcollectionId());
     String predecessorPortEdgeId = idGenerator.getId();
     String successorPortEdgeId = idGenerator.getId();
     Node portNode = portSupplier.apply(predecessorPortEdgeId, 
successorPortEdgeId);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
index fab79af364ec..280362140e45 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodes.java
@@ -158,7 +158,8 @@ private 
InsertFetchAndFilterStreamingSideInputNodes(RunnerApi.Pipeline pipeline)
       InstructionOutputNode predecessor =
           (InstructionOutputNode) network.incidentNodes(mainInput).source();
       InstructionOutputNode predecessorCopy =
-          InstructionOutputNode.create(predecessor.getInstructionOutput());
+          InstructionOutputNode.create(
+              predecessor.getInstructionOutput(), 
predecessor.getPcollectionId());
       network.removeEdge(mainInput);
       network.addNode(streamingSideInputWindowHandlerNode);
       network.addNode(predecessorCopy);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
index 398037e3373c..04874388a1bf 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCoders.java
@@ -153,7 +153,7 @@ public InstructionOutputNode 
typedApply(InstructionOutputNode input) {
                 e);
           }
         }
-        return InstructionOutputNode.create(cloudOutput);
+        return InstructionOutputNode.create(cloudOutput, 
input.getPcollectionId());
       }
     };
   }
@@ -179,7 +179,7 @@ public Node typedApply(InstructionOutputNode input) {
                   input.getInstructionOutput()),
               e);
         }
-        return InstructionOutputNode.create(instructionOutput);
+        return InstructionOutputNode.create(instructionOutput, 
input.getPcollectionId());
       }
     };
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
index 0856cc833392..b9212bc23547 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunction.java
@@ -36,6 +36,7 @@
 import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.util.Transport;
 
 /**
@@ -63,6 +64,12 @@ private static ParallelInstruction clone(JsonFactory 
factory, ParallelInstructio
     }
   }
 
+  private final IdGenerator idGenerator;
+
+  public MapTaskToNetworkFunction(IdGenerator idGenerator) {
+    this.idGenerator = idGenerator;
+  }
+
   @Override
   public MutableNetwork<Node, Edge> apply(MapTask mapTask) {
     List<ParallelInstruction> parallelInstructions = 
Apiary.listOrEmpty(mapTask.getInstructions());
@@ -98,7 +105,9 @@ private static ParallelInstruction clone(JsonFactory 
factory, ParallelInstructio
       // Connect the instruction node output to the output PCollection node
       for (int j = 0; j < outputs.size(); ++j) {
         InstructionOutput instructionOutput = outputs.get(j);
-        InstructionOutputNode outputNode = 
InstructionOutputNode.create(instructionOutput);
+        InstructionOutputNode outputNode =
+            InstructionOutputNode.create(
+                instructionOutput, "generatedPcollection" + 
this.idGenerator.getId());
         network.addNode(outputNode);
         if (parallelInstruction.getParDo() != null) {
           network.addEdge(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
index 5d058a97628d..a328e2286b09 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/Nodes.java
@@ -206,20 +206,25 @@ public String toString() {
     public abstract ExecutionLocation getExecutionLocation();
   }
 
-  /** A node that stores {@link InstructionOutput}s. */
 /** A node that stores {@link InstructionOutput}s with the corresponding PCollection id. */
   @AutoValue
   public abstract static class InstructionOutputNode extends Node {
-    public static InstructionOutputNode create(InstructionOutput 
instructionOutput) {
+    public static InstructionOutputNode create(
+        InstructionOutput instructionOutput, String pcollectionId) {
       checkNotNull(instructionOutput);
-      return new AutoValue_Nodes_InstructionOutputNode(instructionOutput);
+      checkNotNull(pcollectionId);
+      return new AutoValue_Nodes_InstructionOutputNode(instructionOutput, 
pcollectionId);
     }
 
     public abstract InstructionOutput getInstructionOutput();
 
+    public abstract String getPcollectionId();
+
     @Override
     public String toString() {
       return MoreObjects.toStringHelper(this)
           .add("instructionOutput", 
toStringWithTrimmedLiterals(getInstructionOutput()))
+          .add("pcollectionId", getPcollectionId())
           .toString();
     }
   }
@@ -227,14 +232,18 @@ public String toString() {
   /** A node that stores {@link OutputReceiver}s. */
   @AutoValue
   public abstract static class OutputReceiverNode extends Node {
-    public static OutputReceiverNode create(OutputReceiver outputReceiver, 
Coder<?> coder) {
+    public static OutputReceiverNode create(
+        OutputReceiver outputReceiver, Coder<?> coder, String pcollectionId) {
       checkNotNull(outputReceiver);
-      return new AutoValue_Nodes_OutputReceiverNode(outputReceiver, coder);
+      checkNotNull(pcollectionId);
+      return new AutoValue_Nodes_OutputReceiverNode(outputReceiver, coder, 
pcollectionId);
     }
 
     public abstract OutputReceiver getOutputReceiver();
 
     public abstract Coder<?> getCoder();
+
+    public abstract String getPcollectionId();
   }
 
   /** A node that stores {@link Operation}s. */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
index 6b12b219bd0e..6b062f1dcab7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactoryTest.java
@@ -120,7 +120,7 @@
 
   private static final Function<MapTask, MutableNetwork<Node, Edge>> 
mapTaskToNetwork =
       new FixMultiOutputInfosOnParDoInstructions(idGenerator)
-          .andThen(new MapTaskToNetworkFunction());
+          .andThen(new MapTaskToNetworkFunction(idGenerator));
 
   private static final CloudObject windowedStringCoder =
       CloudObjects.asCloudObject(
@@ -130,6 +130,8 @@
   private PipelineOptions options;
   private ReaderRegistry readerRegistry;
   private SinkRegistry sinkRegistry;
+  private static final String PCOLLECTION_ID = "fakeId";
+
   @Mock private Network<Node, Edge> network;
   @Mock private CounterUpdateExtractor<?> updateExtractor;
 
@@ -326,7 +328,8 @@ public void testCreateReadOperation() throws Exception {
                 
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, 
counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            
instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            
instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
@@ -524,7 +527,7 @@ public void testCreateParDoOperation() throws Exception {
         IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, 
counterSet)
             .apply(
                 InstructionOutputNode.create(
-                    
instructionNode.getParallelInstruction().getOutputs().get(0)));
+                    
instructionNode.getParallelInstruction().getOutputs().get(0), PCOLLECTION_ID));
 
     
when(network.successors(instructionNode)).thenReturn(ImmutableSet.of(outputReceiverNode));
     when(network.outDegree(instructionNode)).thenReturn(1);
@@ -601,7 +604,8 @@ public void testCreatePartialGroupByKeyOperation() throws 
Exception {
                 
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, 
counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            
instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            
instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
@@ -652,7 +656,8 @@ public void 
testCreatePartialGroupByKeyOperationWithCombine() throws Exception {
                 
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, 
counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            
instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            
instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
@@ -729,7 +734,8 @@ public void testCreateFlattenOperation() throws Exception {
                 
IntrinsicMapTaskExecutorFactory.createOutputReceiversTransform(STAGE, 
counterSet)
                     .apply(
                         InstructionOutputNode.create(
-                            
instructionNode.getParallelInstruction().getOutputs().get(0)))));
+                            
instructionNode.getParallelInstruction().getOutputs().get(0),
+                            PCOLLECTION_ID))));
     when(network.outDegree(instructionNode)).thenReturn(1);
 
     Node operationNode =
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
index b74a287161fd..04f1214456e5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CloneAmbiguousFlattensFunctionTest.java
@@ -382,7 +382,7 @@ private static ParallelInstructionNode createFlatten(String 
name, ExecutionLocat
 
   /** Creates an {@link InstructionOutputNode} to act as a PCollection. */
   private static InstructionOutputNode createPCollection(String name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), 
"fakeId");
   }
 
   /** Creates a {@link NoLocationNode} to use for testing nodes that have no 
ExecutionLocation */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
index 87d4035b9ab2..ec24fd7f87ec 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/CreateRegisterFnOperationFunctionTest.java
@@ -535,7 +535,7 @@ private static ParallelInstructionNode createParDoNode(
   }
 
   private static InstructionOutputNode createInstructionOutputNode(String 
name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), 
"fakeId");
   }
 
   /** A named node to easily differentiate graph construction problems during 
testing. */
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
index 44fd072f7870..c1c5df040f2b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceFlattenLocationsFunctionTest.java
@@ -381,7 +381,7 @@ private static ParallelInstructionNode createFlatten(String 
name) {
 
   /** Creates an {@link InstructionOutputNode} to act as a PCollection. */
   private static InstructionOutputNode createPCollection(String name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), 
"fakeID");
   }
 
   private static ExecutionLocation getExecutionLocationOf(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
index 4717cca84a34..7d8177de49c8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/DeduceNodeLocationsFunctionTest.java
@@ -198,9 +198,9 @@ public void testGraphWithNonDeducibleNodes() throws 
Exception {
     //               --> Flatten --> D
     // B --> out2 --/-->C
     Node a = createReadNode("A", CUSTOM_SOURCE);
-    Node out1 = InstructionOutputNode.create(new InstructionOutput());
+    Node out1 = InstructionOutputNode.create(new InstructionOutput(), 
"fakeId");
     Node b = createReadNode("B", RUNNER_SOURCE);
-    Node out2 = InstructionOutputNode.create(new InstructionOutput());
+    Node out2 = InstructionOutputNode.create(new InstructionOutput(), 
"fakeId");
     Node c = createParDoNode("C", "RunnerDoFn");
     Node flatten =
         ParallelInstructionNode.create(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
index 6f727eef0030..8d39fc3d898f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/InsertFetchAndFilterStreamingSideInputNodesTest.java
@@ -93,7 +93,8 @@ public void testSdkParDoWithSideInput() throws Exception {
     RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
 
     Node predecessor = createParDoNode("predecessor");
-    InstructionOutputNode mainInput = InstructionOutputNode.create(new 
InstructionOutput());
+    InstructionOutputNode mainInput =
+        InstructionOutputNode.create(new InstructionOutput(), "fakeId");
     Node sideInputParDo = createParDoNode(findParDoWithSideInput(pipeline));
 
     MutableNetwork<Node, Edge> network = createEmptyNetwork();
@@ -106,7 +107,7 @@ public void testSdkParDoWithSideInput() throws Exception {
     Network<Node, Edge> inputNetwork = ImmutableNetwork.copyOf(network);
     network = 
InsertFetchAndFilterStreamingSideInputNodes.with(pipeline).forNetwork(network);
 
-    Node mainInputClone = 
InstructionOutputNode.create(mainInput.getInstructionOutput());
+    Node mainInputClone = 
InstructionOutputNode.create(mainInput.getInstructionOutput(), "fakeId");
     Node fetchAndFilter =
         FetchAndFilterStreamingSideInputsNode.create(
             pcView.getWindowingStrategyInternal(),
@@ -139,7 +140,7 @@ public void testSdkParDoWithoutSideInput() throws Exception 
{
     RunnerApi.Pipeline pipeline = PipelineTranslation.toProto(p);
 
     Node predecessor = createParDoNode("predecessor");
-    Node mainInput = InstructionOutputNode.create(new InstructionOutput());
+    Node mainInput = InstructionOutputNode.create(new InstructionOutput(), 
"fakeId");
     Node sideInputParDo = createParDoNode("noSideInput");
 
     MutableNetwork<Node, Edge> network = createEmptyNetwork();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
index 661e3941f9b3..0ee0d4551733 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/LengthPrefixUnknownCodersTest.java
@@ -367,6 +367,6 @@ private static InstructionOutputNode 
createInstructionOutputNode(String name, Co
             .setName(name)
             .setCodec(CloudObjects.asCloudObject(coder, /*sdkComponents=*/ 
null));
     instructionOutput.setFactory(new JacksonFactory());
-    return InstructionOutputNode.create(instructionOutput);
+    return InstructionOutputNode.create(instructionOutput, "fakeId");
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
index 19b45566a567..8daa4943c6a3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/MapTaskToNetworkFunctionTest.java
@@ -47,6 +47,7 @@
 import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.InstructionOutputNode;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
 import 
org.apache.beam.runners.dataflow.worker.graph.Nodes.ParallelInstructionNode;
+import org.apache.beam.sdk.fn.IdGenerators;
 import org.apache.beam.sdk.util.Transport;
 import org.hamcrest.Matchers;
 import org.junit.Test;
@@ -58,7 +59,8 @@
 public class MapTaskToNetworkFunctionTest {
   @Test
   public void testEmptyMapTask() {
-    Network<Node, Edge> network = new MapTaskToNetworkFunction().apply(new 
MapTask());
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(new MapTask());
     assertTrue(network.isDirected());
     assertTrue(network.allowsParallelEdges());
     assertFalse(network.allowsSelfLoops());
@@ -75,7 +77,8 @@ public void testRead() {
     mapTask.setInstructions(ImmutableList.of(read));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(2, network.nodes().size());
     assertEquals(1, network.edges().size());
@@ -103,7 +106,8 @@ public void testParDo() {
     mapTask.setInstructions(ImmutableList.of(read, parDo));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(4, network.nodes().size());
     assertEquals(3, network.edges().size());
@@ -149,7 +153,8 @@ public void testFlatten() {
     mapTask.setInstructions(ImmutableList.of(readA, readB, flatten));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(6, network.nodes().size());
     assertEquals(5, network.edges().size());
@@ -193,7 +198,8 @@ public void testParallelEdgeFlatten() {
     mapTask.setInstructions(ImmutableList.of(read, flatten));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(4, network.nodes().size());
     assertEquals(5, network.edges().size());
@@ -225,7 +231,8 @@ public void testWrite() {
     mapTask.setInstructions(ImmutableList.of(read, write));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(3, network.nodes().size());
     assertEquals(2, network.edges().size());
@@ -260,7 +267,8 @@ public void testPartialGroupByKey() {
     mapTask.setInstructions(ImmutableList.of(read, pgbk, write));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(5, network.nodes().size());
     assertEquals(4, network.edges().size());
@@ -310,7 +318,8 @@ public void testMultipleOutput() {
     mapTask.setInstructions(ImmutableList.of(read, parDo, writeA, writeB));
     mapTask.setFactory(Transport.getJsonFactory());
 
-    Network<Node, Edge> network = new 
MapTaskToNetworkFunction().apply(mapTask);
+    Network<Node, Edge> network =
+        new 
MapTaskToNetworkFunction(IdGenerators.decrementingLongs()).apply(mapTask);
     assertNetworkProperties(network);
     assertEquals(7, network.nodes().size());
     assertEquals(6, network.edges().size());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
index 6da18d7ecce0..b27520dbb89d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/NodesTest.java
@@ -57,6 +57,8 @@
 /** Tests for {@link Nodes}. */
 @RunWith(JUnit4.class)
 public class NodesTest {
+  private static final String PCOLLECTION_ID = "fakeId";
+
   @Test
   public void testParallelInstructionNode() {
     ParallelInstruction param = new ParallelInstruction();
@@ -73,18 +75,22 @@ public void testParallelInstructionNode() {
   @Test
   public void testInstructionOutputNode() {
     InstructionOutput param = new InstructionOutput();
-    assertSame(param, 
InstructionOutputNode.create(param).getInstructionOutput());
-    assertNotEquals(InstructionOutputNode.create(param), 
InstructionOutputNode.create(param));
+    assertSame(param, InstructionOutputNode.create(param, 
PCOLLECTION_ID).getInstructionOutput());
+    assertNotEquals(
+        InstructionOutputNode.create(param, PCOLLECTION_ID),
+        InstructionOutputNode.create(param, PCOLLECTION_ID));
   }
 
   @Test
   public void testOutputReceiverNode() {
     OutputReceiver receiver = new OutputReceiver();
     Coder<?> coder = StringUtf8Coder.of();
-    assertSame(receiver, OutputReceiverNode.create(receiver, 
coder).getOutputReceiver());
-    assertSame(coder, OutputReceiverNode.create(receiver, coder).getCoder());
+    assertSame(
+        receiver, OutputReceiverNode.create(receiver, coder, 
PCOLLECTION_ID).getOutputReceiver());
+    assertSame(coder, OutputReceiverNode.create(receiver, coder, 
PCOLLECTION_ID).getCoder());
     assertNotEquals(
-        OutputReceiverNode.create(receiver, coder), 
OutputReceiverNode.create(receiver, coder));
+        OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID),
+        OutputReceiverNode.create(receiver, coder, PCOLLECTION_ID));
   }
 
   @Test
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
index bfe39831c982..b00fb2967ec6 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/RemoveFlattenInstructionsFunctionTest.java
@@ -46,6 +46,8 @@
 /** Tests for {@link RemoveFlattenInstructionsFunction}. */
 @RunWith(JUnit4.class)
 public class RemoveFlattenInstructionsFunctionTest {
+  private static final String PCOLLECTION_ID = "fakeId";
+
   @Test
   public void testEmptyNetwork() {
     assertTrue(
@@ -59,24 +61,28 @@ public void testRemoveFlatten() {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), 
Nodes.ExecutionLocation.UNKNOWN);
-    Node aPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("A.out"));
+    Node aPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out"), 
PCOLLECTION_ID);
     Edge aOutput = DefaultEdge.create();
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge bOutput = DefaultEdge.create();
-    Node bPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("B.out"));
+    Node bPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out"), 
PCOLLECTION_ID);
     Node flatten =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("Flatten").setFlatten(new 
FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flattenPCollection =
-        InstructionOutputNode.create(new 
InstructionOutput().setName("Flatten.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), 
PCOLLECTION_ID);
 
     // A --\
     //      Flatten --> C
@@ -109,9 +115,12 @@ public void testRemoveFlattenOnMultiOutputInstruction() {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), 
Nodes.ExecutionLocation.UNKNOWN);
-    Node aOut1PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("A.out1"));
-    Node aOut2PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("A.out2"));
-    Node aOut3PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("A.out3"));
+    Node aOut1PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("A.out1"), PCOLLECTION_ID);
+    Node aOut2PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("A.out2"), PCOLLECTION_ID);
+    Node aOut3PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("A.out3"), PCOLLECTION_ID);
     Edge aOut1 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out1"));
     Edge aOut2 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out2"));
     Edge aOut3 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out3"));
@@ -119,8 +128,10 @@ public void testRemoveFlattenOnMultiOutputInstruction() {
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), 
Nodes.ExecutionLocation.UNKNOWN);
-    Node bOut1PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"));
-    Node bOut2PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"));
+    Node bOut1PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
+    Node bOut2PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
     Edge bOut1 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out1"));
     Edge bOut2 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out2"));
     Edge bOut1PCollectionEdge = DefaultEdge.create();
@@ -129,22 +140,26 @@ public void testRemoveFlattenOnMultiOutputInstruction() {
             new ParallelInstruction().setName("Flatten").setFlatten(new 
FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flattenPCollection =
-        InstructionOutputNode.create(new 
InstructionOutput().setName("Flatten.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), 
PCOLLECTION_ID);
     Node d =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("D"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge dOutput = DefaultEdge.create();
-    Node dPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("D.out"));
+    Node dPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("D.out"), 
PCOLLECTION_ID);
     Node e =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("E"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge eOutput = DefaultEdge.create();
-    Node ePCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("E.out"));
+    Node ePCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("E.out"), 
PCOLLECTION_ID);
 
     //  /-out1-> C
     // A -out2-\
@@ -196,13 +211,16 @@ public void 
testMultiLevelFlattenResultingInParallelEdges() {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), 
Nodes.ExecutionLocation.UNKNOWN);
-    Node aPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("A.out"));
+    Node aPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out"), 
PCOLLECTION_ID);
     Edge aOutput = DefaultEdge.create();
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), 
Nodes.ExecutionLocation.UNKNOWN);
-    Node bOut1PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"));
-    Node bOut2PCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"));
+    Node bOut1PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
+    Node bOut2PCollection =
+        InstructionOutputNode.create(new 
InstructionOutput().setName("B.out1"), PCOLLECTION_ID);
     Edge bOut1 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out1"));
     Edge bOut2 = MultiOutputInfoEdge.create(new 
MultiOutputInfo().setTag("out2"));
     Node flatten1 =
@@ -210,18 +228,21 @@ public void 
testMultiLevelFlattenResultingInParallelEdges() {
             new ParallelInstruction().setName("Flatten1").setFlatten(new 
FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flatten1PCollection =
-        InstructionOutputNode.create(new 
InstructionOutput().setName("Flatten1.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten1.out"), PCOLLECTION_ID);
     Node flatten2 =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("Flatten2").setFlatten(new 
FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flatten2PCollection =
-        InstructionOutputNode.create(new 
InstructionOutput().setName("Flatten2.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten2.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), 
PCOLLECTION_ID);
 
     // A ------\
     //          Flatten1 --\
@@ -262,29 +283,34 @@ public void 
testFlattenMultiplePCollectionsHavingMultipleConsumers() {
     Node a =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("A"), 
Nodes.ExecutionLocation.UNKNOWN);
-    Node aPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("A.out"));
+    Node aPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("A.out"), 
PCOLLECTION_ID);
     Edge aOutput = DefaultEdge.create();
     Node b =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("B"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge bOutput = DefaultEdge.create();
-    Node bPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("B.out"));
+    Node bPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("B.out"), 
PCOLLECTION_ID);
     Node flatten =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("Flatten").setFlatten(new 
FlattenInstruction()),
             Nodes.ExecutionLocation.UNKNOWN);
     Node flattenPCollection =
-        InstructionOutputNode.create(new 
InstructionOutput().setName("Flatten.out"));
+        InstructionOutputNode.create(
+            new InstructionOutput().setName("Flatten.out"), PCOLLECTION_ID);
     Node c =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("C"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge cOutput = DefaultEdge.create();
-    Node cPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("C.out"));
+    Node cPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("C.out"), 
PCOLLECTION_ID);
     Node d =
         ParallelInstructionNode.create(
             new ParallelInstruction().setName("D"), 
Nodes.ExecutionLocation.UNKNOWN);
     Edge dOutput = DefaultEdge.create();
-    Node dPCollection = InstructionOutputNode.create(new 
InstructionOutput().setName("D.out"));
+    Node dPCollection =
+        InstructionOutputNode.create(new InstructionOutput().setName("D.out"), 
PCOLLECTION_ID);
 
     // A --\
     //      -> Flatten --> C
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
index 801f4af62265..6f0b8264b862 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/graph/ReplacePgbkWithPrecombineFunctionTest.java
@@ -148,6 +148,6 @@ private static ParallelInstructionNode 
createPrecombinePgbkNode(
 
   /** Creates an {@link InstructionOutputNode} to act as a PCollection. */
   private static InstructionOutputNode createInstructionOutputNode(String 
name) {
-    return InstructionOutputNode.create(new InstructionOutput().setName(name));
+    return InstructionOutputNode.create(new InstructionOutput().setName(name), 
"fakeId");
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 173119)
    Time Spent: 20m  (was: 10m)

> Make OutputReceiver of ProcessRemoteBundleOperation map to correct 
> PCollectionId
> --------------------------------------------------------------------------------
>
>                 Key: BEAM-6195
>                 URL: https://issues.apache.org/jira/browse/BEAM-6195
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-dataflow
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to