[ 
https://issues.apache.org/jira/browse/BEAM-3895?focusedWorklogId=82914&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82914
 ]

ASF GitHub Bot logged work on BEAM-3895:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Mar/18 21:06
            Start Date: 21/Mar/18 21:06
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4910: [BEAM-3895] Add Side Inputs to ExecutableStage
URL: https://github.com/apache/beam/pull/4910
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index 9fa301460fb..2a1d98b0a20 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -214,11 +214,14 @@ message ExecutableStagePayload {
   // Input PCollection id.
   string input = 2;
 
+  // Side Input PCollection ids.
+  repeated string side_inputs = 3;
+
   // PTransform ids contained within this executable stage.
-  repeated string transforms = 3;
+  repeated string transforms = 4;
 
   // Output PCollection ids.
-  repeated string outputs = 4;
+  repeated string outputs = 5;
 
 }
 
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index e66148421fc..8958fbee8cd 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -64,6 +64,12 @@
    */
   PCollectionNode getInputPCollection();
 
+  /**
+   * Returns the set of {@link PCollectionNode PCollections} that will be 
accessed by this {@link
+   * ExecutableStage} as side inputs.
+   */
+  Collection<PCollectionNode> getSideInputPCollections();
+
   /**
    * Returns the leaf {@link PCollectionNode PCollections} of this {@link 
ExecutableStage}.
    *
@@ -107,6 +113,13 @@ default PTransform toPTransform() {
     pt.putInputs("input", getInputPCollection().getId());
     payload.setInput(input.getId());
 
+    int sideInputIndex = 0;
+    for (PCollectionNode sideInputNode : getSideInputPCollections()) {
+      pt.putInputs(String.format("side_input_%s", sideInputIndex), 
sideInputNode.getId());
+      payload.addSideInputs(sideInputNode.getId());
+      sideInputIndex++;
+    }
+
     int outputIndex = 0;
     for (PCollectionNode output : getOutputPCollections()) {
       pt.putOutputs(String.format("materialized_%d", outputIndex), 
output.getId());
@@ -124,7 +137,6 @@ default PTransform toPTransform() {
         .setUrn(ExecutableStage.URN)
         .setPayload(payload.build().toByteString())
         .build());
-
     return pt.build();
   }
 
@@ -139,14 +151,27 @@ default PTransform toPTransform() {
    */
   static ExecutableStage fromPayload(ExecutableStagePayload payload, 
Components components) {
     Environment environment = payload.getEnvironment();
-    PCollectionNode input = PipelineNode.pCollection(payload.getInput(),
-        components.getPcollectionsOrThrow(payload.getInput()));
-    List<PTransformNode> transforms = payload.getTransformsList().stream()
-        .map(id -> PipelineNode.pTransform(id, 
components.getTransformsOrThrow(id)))
-        .collect(Collectors.toList());
-    List<PCollectionNode> outputs = payload.getOutputsList().stream()
-        .map(id -> PipelineNode.pCollection(id, 
components.getPcollectionsOrThrow(id)))
-        .collect(Collectors.toList());
-    return ImmutableExecutableStage.of(environment, input, transforms, 
outputs);
+    PCollectionNode input =
+        PipelineNode.pCollection(
+            payload.getInput(), 
components.getPcollectionsOrThrow(payload.getInput()));
+    List<PCollectionNode> sideInputs =
+        payload
+            .getSideInputsList()
+            .stream()
+            .map(id -> PipelineNode.pCollection(id, 
components.getPcollectionsOrThrow(id)))
+            .collect(Collectors.toList());
+    List<PTransformNode> transforms =
+        payload
+            .getTransformsList()
+            .stream()
+            .map(id -> PipelineNode.pTransform(id, 
components.getTransformsOrThrow(id)))
+            .collect(Collectors.toList());
+    List<PCollectionNode> outputs =
+        payload
+            .getOutputsList()
+            .stream()
+            .map(id -> PipelineNode.pCollection(id, 
components.getPcollectionsOrThrow(id)))
+            .collect(Collectors.toList());
+    return ImmutableExecutableStage.of(environment, input, sideInputs, 
transforms, outputs);
   }
 }
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
index a617e969e15..836889ba610 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuser.java
@@ -79,12 +79,14 @@ public static ExecutableStage forGrpcPortRead(
     ImmutableSet.Builder<PTransformNode> fusedTransforms = 
ImmutableSet.builder();
     fusedTransforms.addAll(initialNodes);
 
+    Set<PCollectionNode> sideInputs = new LinkedHashSet<>();
     Set<PCollectionNode> fusedCollections = new LinkedHashSet<>();
     Set<PCollectionNode> materializedPCollections = new LinkedHashSet<>();
 
     Queue<PCollectionNode> fusionCandidates = new ArrayDeque<>();
     for (PTransformNode initialConsumer : initialNodes) {
       fusionCandidates.addAll(pipeline.getOutputPCollections(initialConsumer));
+      sideInputs.addAll(pipeline.getSideInputs(initialConsumer));
     }
     while (!fusionCandidates.isEmpty()) {
       PCollectionNode candidate = fusionCandidates.poll();
@@ -112,6 +114,7 @@ public static ExecutableStage forGrpcPortRead(
             // The outputs of every transform fused into this stage must be 
either materialized or
             // themselves fused away, so add them to the set of candidates.
             fusionCandidates.addAll(pipeline.getOutputPCollections(consumer));
+            sideInputs.addAll(pipeline.getSideInputs(consumer));
           }
           break;
         default:
@@ -125,8 +128,9 @@ public static ExecutableStage forGrpcPortRead(
     return ImmutableExecutableStage.of(
         environment,
         inputPCollection,
+        sideInputs,
         fusedTransforms.build(),
-        ImmutableSet.copyOf(materializedPCollections));
+        materializedPCollections);
   }
 
   private static Environment getStageEnvironment(
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
index 68fe5883eec..934675c1cb8 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ImmutableExecutableStage.java
@@ -31,10 +31,15 @@
   static ImmutableExecutableStage of(
       Environment environment,
       PCollectionNode input,
+      Collection<PCollectionNode> sideInputs,
       Collection<PTransformNode> transforms,
       Collection<PCollectionNode> outputs) {
     return new AutoValue_ImmutableExecutableStage(
-        environment, input, ImmutableSet.copyOf(transforms), 
ImmutableSet.copyOf(outputs));
+        environment,
+        input,
+        ImmutableSet.copyOf(sideInputs),
+        ImmutableSet.copyOf(transforms),
+        ImmutableSet.copyOf(outputs));
   }
 
   // Redefine the methods to have a known order.
@@ -44,6 +49,9 @@ static ImmutableExecutableStage of(
   @Override
   public abstract PCollectionNode getInputPCollection();
 
+  @Override
+  public abstract Collection<PCollectionNode> getSideInputPCollections();
+
   @Override
   public abstract Collection<PTransformNode> getTransforms();
 
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
index 5bbefc56c8a..80f2738c4ce 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.core.construction.graph;
 
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
@@ -35,6 +36,7 @@
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.junit.Test;
@@ -52,6 +54,7 @@ public void testRoundTripToFromTransform() throws Exception {
     PTransform pt =
         PTransform.newBuilder()
             .putInputs("input", "input.out")
+            .putInputs("side_input", "sideInput.in")
             .putOutputs("output", "output.out")
             .setSpec(
                 FunctionSpec.newBuilder()
@@ -59,16 +62,19 @@ public void testRoundTripToFromTransform() throws Exception 
{
                     .setPayload(
                         ParDoPayload.newBuilder()
                             
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("foo"))
+                            .putSideInputs("side_input", 
SideInput.getDefaultInstance())
                             .build()
                             .toByteString()))
             .build();
     PCollection input = 
PCollection.newBuilder().setUniqueName("input.out").build();
+    PCollection sideInput = 
PCollection.newBuilder().setUniqueName("sideInput.in").build();
     PCollection output = 
PCollection.newBuilder().setUniqueName("output.out").build();
 
     ImmutableExecutableStage stage =
         ImmutableExecutableStage.of(
             env,
             PipelineNode.pCollection("input.out", input),
+            Collections.singleton(PipelineNode.pCollection("sideInput.in", 
sideInput)),
             Collections.singleton(PipelineNode.pTransform("pt", pt)),
             Collections.singleton(PipelineNode.pCollection("output.out", 
output)));
 
@@ -76,6 +82,7 @@ public void testRoundTripToFromTransform() throws Exception {
         Components.newBuilder()
             .putTransforms("pt", pt)
             .putPcollections("input.out", input)
+            .putPcollections("sideInput.in", sideInput)
             .putPcollections("output.out", output)
             .putEnvironments("foo", env)
             .build();
@@ -83,8 +90,9 @@ public void testRoundTripToFromTransform() throws Exception {
     PTransform stagePTransform = stage.toPTransform();
     assertThat(stagePTransform.getOutputsMap(), hasValue("output.out"));
     assertThat(stagePTransform.getOutputsCount(), equalTo(1));
-    assertThat(stagePTransform.getInputsMap(), hasValue("input.out"));
-    assertThat(stagePTransform.getInputsCount(), equalTo(1));
+    assertThat(
+        stagePTransform.getInputsMap(), allOf(hasValue("input.out"), 
hasValue("sideInput.in")));
+    assertThat(stagePTransform.getInputsCount(), equalTo(2));
 
     ExecutableStagePayload payload = ExecutableStagePayload.parseFrom(
         stagePTransform.getSpec().getPayload());
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 09c37c6450b..ee64eca03e3 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -888,6 +888,71 @@ public void materializesWithSideInputConsumer() {
     assertThat(subgraph, hasSubtransforms(readNode.getId()));
   }
 
+  @Test
+  public void sideInputIncludedInStage() {
+    Environment env = Environment.newBuilder().setUrl("common").build();
+    PTransform readTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "impulse.out")
+            .putOutputs("output", "read.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .build()
+                            .toByteString()))
+            .build();
+
+    PTransform parDoTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "read.out")
+            .putInputs("side_input", "side_read.out")
+            .putOutputs("output", "parDo.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .putSideInputs("side_input", 
SideInput.getDefaultInstance())
+                            .build()
+                            .toByteString()))
+            .build();
+    PCollection sideInputPCollection =
+        PCollection.newBuilder().setUniqueName("side_read.out").build();
+    QueryablePipeline p =
+        QueryablePipeline.forPrimitivesIn(
+            partialComponents
+                .toBuilder()
+                .putTransforms("read", readTransform)
+                .putPcollections(
+                    "read.out", 
PCollection.newBuilder().setUniqueName("read.out").build())
+                .putTransforms(
+                    "side_read",
+                    PTransform.newBuilder()
+                        .putInputs("input", "impulse.out")
+                        .putOutputs("output", "side_read.out")
+                        .build())
+                .putPcollections("side_read.out", sideInputPCollection)
+                .putTransforms("parDo", parDoTransform)
+                .putPcollections(
+                    "parDo.out", 
PCollection.newBuilder().setUniqueName("parDo.out").build())
+                .putEnvironments("common", env)
+                .build());
+
+    PCollectionNode readOutput =
+        getOnlyElement(p.getOutputPCollections(PipelineNode.pTransform("read", 
readTransform)));
+    ExecutableStage subgraph =
+        GreedyStageFuser.forGrpcPortRead(
+            p, readOutput, ImmutableSet.of(PipelineNode.pTransform("parDo", 
parDoTransform)));
+    assertThat(
+        subgraph.getSideInputPCollections(),
+        contains(PipelineNode.pCollection("side_read.out", 
sideInputPCollection)));
+    assertThat(subgraph.getOutputPCollections(), emptyIterable());
+  }
+
   @Test
   public void materializesWithGroupByKeyConsumer() {
     // (impulse.out) -> read -> read.out -> gbk -> gbk.out


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82914)
    Time Spent: 1h 50m  (was: 1h 40m)

> Side Inputs should be available on ExecutableStage
> --------------------------------------------------
>
>                 Key: BEAM-3895
>                 URL: https://issues.apache.org/jira/browse/BEAM-3895
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Doing this ensures that the runner will have the side inputs immediately 
> available.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to