[ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82057
 ]

ASF GitHub Bot logged work on BEAM-3565:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/18 21:32
            Start Date: 19/Mar/18 21:32
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4844: [BEAM-3565] Add ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844
 
 
   

This pull request was merged from a forked repository. Because GitHub hides
the original diff when such a pull request is merged, the diff is reproduced
below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise display it):

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index b45be09efb6..3ed90dd036e 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -203,6 +203,23 @@ message PCollection {
   DisplayData display_data = 5;
 }
 
+// The payload for an executable stage. This will eventually be passed to an 
SDK in the form of a
+// ProcessBundleDescriptor.
+message ExecutableStagePayload {
+
+  Environment environment = 1;
+
+  // Input PCollection id.
+  string input = 2;
+
+  // PTransform ids contained within this executable stage.
+  repeated string transforms = 3;
+
+  // Output PCollection ids.
+  repeated string outputs = 4;
+
+}
+
 // The payload for the primitive ParDo transform.
 message ParDoPayload {
 
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
new file mode 100644
index 00000000000..1200dc621a7
--- /dev/null
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
@@ -0,0 +1,25 @@
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+
+/**
+ * Utilities for converting {@link ExecutableStage}s to and from {@link 
RunnerApi} protocol buffers.
+ */
+public class ExecutableStageTranslation {
+
+  /** Extracts an {@link ExecutableStagePayload} from the given transform. */
+  public static ExecutableStagePayload getExecutableStagePayload(
+      AppliedPTransform<?, ?, ?> appliedTransform) throws IOException {
+    RunnerApi.PTransform transform =
+        PTransformTranslation.toProto(appliedTransform, 
SdkComponents.create());
+    checkArgument(ExecutableStage.URN.equals(transform.getSpec().getUrn()));
+    return ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+  }
+
+}
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index 766ce0d7136..27bfed87553 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -18,19 +18,16 @@
 
 package org.apache.beam.runners.core.construction.graph;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.Iterables.getOnlyElement;
-
 import java.util.Collection;
-import java.util.Optional;
+import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.Environments;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
 
@@ -84,64 +81,72 @@
    * follows:
    *
    * <ul>
-   *   <li>The {@link PTransform#getSubtransformsList()} contains the result 
of {@link
-   *       #getTransforms()}.
+   *   <li>The {@link PTransform#getSubtransformsList()} contains no 
subtransforms. This ensures
+   *       that executable stages are treated as primitive transforms.
    *   <li>The only {@link PCollection} in the {@link 
PTransform#getInputsMap()} is the result of
    *       {@link #getInputPCollection()}.
    *   <li>The output {@link PCollection PCollections} in the values of {@link
    *       PTransform#getOutputsMap()} are the {@link PCollectionNode 
PCollections} returned by
    *       {@link #getOutputPCollections()}.
+   *   <li>The {@link FunctionSpec} contains an {@link ExecutableStagePayload} 
which has its input
+   *       and output PCollections set to the same values as the outer 
PTransform itself. It further
+   *       contains the environment set of transforms for this stage.
    * </ul>
+   *
+   * <p>The executable stage can be reconstructed from the resulting {@link 
ExecutableStagePayload}
+   * and components alone via {@link #fromPayload(ExecutableStagePayload, 
Components)}.
    */
   default PTransform toPTransform() {
+    ExecutableStagePayload.Builder payload = 
ExecutableStagePayload.newBuilder();
+
+    payload.setEnvironment(getEnvironment());
+
+    PCollectionNode input = getInputPCollection();
+    payload.setInput(input.getId());
+
+    for (PTransformNode transform : getTransforms()) {
+      payload.addTransforms(transform.getId());
+    }
+
+    for (PCollectionNode output : getOutputPCollections()) {
+      payload.addOutputs(output.getId());
+    }
+
     PTransform.Builder pt = PTransform.newBuilder();
+    pt.setSpec(FunctionSpec.newBuilder()
+        .setUrn(ExecutableStage.URN)
+        .setPayload(payload.build().toByteString())
+        .build());
     pt.putInputs("input", getInputPCollection().getId());
-    int i = 0;
-    for (PCollectionNode materializedPCollection : getOutputPCollections()) {
-      pt.putOutputs(String.format("materialized_%s", i), 
materializedPCollection.getId());
-      i++;
-    }
-    for (PTransformNode fusedTransform : getTransforms()) {
-      pt.addSubtransforms(fusedTransform.getId());
+    int outputIndex = 0;
+    for (PCollectionNode pcNode : getOutputPCollections()) {
+      // Do something
+      pt.putOutputs(String.format("materialized_%d", outputIndex), 
pcNode.getId());
+      outputIndex++;
     }
-    pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN));
     return pt.build();
   }
 
+  // TODO: Should this live under ExecutableStageTranslation?
   /**
-   * Return an {@link ExecutableStage} constructed from the provided {@link 
PTransform}
+   * Return an {@link ExecutableStage} constructed from the provided {@link 
FunctionSpec}
    * representation.
    *
-   * <p>See {@link #toPTransform()} for information about the required format 
of the {@link
-   * PTransform}. The environment will be determined by an arbitrary {@link 
PTransform} contained
-   * within the {@link PTransform#getSubtransformsList()}.
+   * <p>See {@link #toPTransform()} for how the payload is constructed. Note 
that the payload
+   * contains some information redundant with the {@link PTransform} due to 
runner implementations
+   * not having the full transform context at translation time, but rather 
access to an
+   * {@link org.apache.beam.sdk.runners.AppliedPTransform}.
    */
-  static ExecutableStage fromPTransform(PTransform ptransform, Components 
components) {
-    checkArgument(ptransform.getSpec().getUrn().equals(URN));
-    // It may be better to put this in an explicit Payload if other metadata 
becomes required
-    Optional<Environment> environment =
-        Environments.getEnvironment(ptransform.getSubtransforms(0), 
components);
-    checkArgument(
-        environment.isPresent(),
-        "%s with no %s",
-        ExecutableStage.class.getSimpleName(),
-        Environment.class.getSimpleName());
-    String inputId = getOnlyElement(ptransform.getInputsMap().values());
-    PCollectionNode inputNode =
-        PipelineNode.pCollection(inputId, 
components.getPcollectionsOrThrow(inputId));
-    Collection<PCollectionNode> outputNodes =
-        ptransform
-            .getOutputsMap()
-            .values()
-            .stream()
-            .map(id -> PipelineNode.pCollection(id, 
components.getPcollectionsOrThrow(id)))
-            .collect(Collectors.toSet());
-    Collection<PTransformNode> transformNodes =
-        ptransform
-            .getSubtransformsList()
-            .stream()
-            .map(id -> PipelineNode.pTransform(id, 
components.getTransformsOrThrow(id)))
-            .collect(Collectors.toSet());
-    return ImmutableExecutableStage.of(environment.get(), inputNode, 
transformNodes, outputNodes);
+  static ExecutableStage fromPayload(ExecutableStagePayload payload, 
Components components) {
+    Environment environment = payload.getEnvironment();
+    PCollectionNode input = PipelineNode.pCollection(payload.getInput(),
+        components.getPcollectionsOrThrow(payload.getInput()));
+    List<PTransformNode> transforms = payload.getTransformsList().stream()
+        .map(id -> PipelineNode.pTransform(id, 
components.getTransformsOrThrow(id)))
+        .collect(Collectors.toList());
+    List<PCollectionNode> outputs = payload.getOutputsList().stream()
+        .map(id -> PipelineNode.pCollection(id, 
components.getPcollectionsOrThrow(id)))
+        .collect(Collectors.toList());
+    return ImmutableExecutableStage.of(environment, input, transforms, 
outputs);
   }
 }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
index e13eb6f80cf..5bbefc56c8a 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
@@ -29,6 +29,7 @@
 import java.util.Collections;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
@@ -46,7 +47,7 @@
 @RunWith(JUnit4.class)
 public class ExecutableStageTest {
   @Test
-  public void testRoundTripToFromTransform() {
+  public void testRoundTripToFromTransform() throws Exception {
     Environment env = Environment.newBuilder().setUrl("foo").build();
     PTransform pt =
         PTransform.newBuilder()
@@ -84,13 +85,15 @@ public void testRoundTripToFromTransform() {
     assertThat(stagePTransform.getOutputsCount(), equalTo(1));
     assertThat(stagePTransform.getInputsMap(), hasValue("input.out"));
     assertThat(stagePTransform.getInputsCount(), equalTo(1));
-    assertThat(stagePTransform.getSubtransformsList(), contains("pt"));
 
-    assertThat(ExecutableStage.fromPTransform(stagePTransform, components), 
equalTo(stage));
+    ExecutableStagePayload payload = ExecutableStagePayload.parseFrom(
+        stagePTransform.getSpec().getPayload());
+    assertThat(payload.getTransformsList(), contains("pt"));
+    assertThat(ExecutableStage.fromPayload(payload, components), 
equalTo(stage));
   }
 
   @Test
-  public void testRoundTripToFromTransformFused() {
+  public void testRoundTripToFromTransformFused() throws Exception {
     PTransform parDoTransform =
         PTransform.newBuilder()
             .putInputs("input", "impulse.out")
@@ -148,9 +151,11 @@ public void testRoundTripToFromTransformFused() {
     assertThat(ptransform.getSpec().getUrn(), equalTo(ExecutableStage.URN));
     assertThat(ptransform.getInputsMap().values(), 
containsInAnyOrder("impulse.out"));
     assertThat(ptransform.getOutputsMap().values(), emptyIterable());
-    assertThat(ptransform.getSubtransformsList(), contains("parDo", "window"));
 
-    ExecutableStage desered = ExecutableStage.fromPTransform(ptransform, 
components);
+    ExecutableStagePayload payload = ExecutableStagePayload.parseFrom(
+        ptransform.getSpec().getPayload());
+    assertThat(payload.getTransformsList(), contains("parDo", "window"));
+    ExecutableStage desered = ExecutableStage.fromPayload(payload, components);
     assertThat(desered, equalTo(subgraph));
   }
 }
diff --git 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 9f3355f4212..09c37c6450b 100644
--- 
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++ 
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -20,7 +20,6 @@
 
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
@@ -30,6 +29,7 @@
 import com.google.common.collect.ImmutableSet;
 import java.util.Collections;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
 import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
@@ -44,6 +44,8 @@
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
 import 
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -239,8 +241,7 @@ public void fusesCompatibleEnvironments() {
                 PipelineNode.pTransform("window", windowTransform)));
     // Nothing consumes the outputs of ParDo or Window, so they don't have to 
be materialized
     assertThat(subgraph.getOutputPCollections(), emptyIterable());
-    assertThat(
-        subgraph.toPTransform().getSubtransformsList(), 
containsInAnyOrder("parDo", "window"));
+    assertThat(subgraph, hasSubtransforms("parDo", "window"));
   }
 
   @Test
@@ -299,8 +300,7 @@ public void materializesWithStatefulConsumer() {
         contains(
             PipelineNode.pCollection(
                 "parDo.out", 
PCollection.newBuilder().setUniqueName("parDo.out").build())));
-    assertThat(
-        subgraph.toPTransform().getSubtransformsList(), 
containsInAnyOrder("parDo"));
+    assertThat(subgraph, hasSubtransforms("parDo"));
   }
 
   @Test
@@ -359,8 +359,7 @@ public void materializesWithConsumerWithTimer() {
         contains(
             PipelineNode.pCollection(
                 "parDo.out", 
PCollection.newBuilder().setUniqueName("parDo.out").build())));
-    assertThat(
-        subgraph.toPTransform().getSubtransformsList(), 
containsInAnyOrder("parDo"));
+    assertThat(subgraph, hasSubtransforms("parDo"));
   }
 
   @Test
@@ -440,9 +439,7 @@ public void fusesFlatten() {
         GreedyStageFuser.forGrpcPortRead(
             p, impulseOutputNode, p.getPerElementConsumers(impulseOutputNode));
     assertThat(subgraph.getOutputPCollections(), emptyIterable());
-    assertThat(
-        subgraph.toPTransform().getSubtransformsList(),
-        containsInAnyOrder("read", "parDo", "flatten", "window"));
+    assertThat(subgraph, hasSubtransforms("read", "parDo", "flatten", 
"window"));
   }
 
   @Test
@@ -524,9 +521,7 @@ public void fusesFlattenWithDifferentEnvironmentInputs() {
         GreedyStageFuser.forGrpcPortRead(
             p, impulseOutputNode, 
ImmutableSet.of(PipelineNode.pTransform("read", readTransform)));
     assertThat(subgraph.getOutputPCollections(), emptyIterable());
-    assertThat(
-        subgraph.toPTransform().getSubtransformsList(),
-        containsInAnyOrder("read", "flatten", "window"));
+    assertThat(subgraph, hasSubtransforms("read", "flatten", "window"));
 
     // Flatten shows up in both of these subgraphs, but elements only go 
through a path to the
     // flatten once.
@@ -540,9 +535,7 @@ public void fusesFlattenWithDifferentEnvironmentInputs() {
         contains(
             PipelineNode.pCollection(
                 "flatten.out", 
components.getPcollectionsOrThrow("flatten.out"))));
-    assertThat(
-        readFromOtherEnv.toPTransform().getSubtransformsList(),
-        containsInAnyOrder("envRead", "flatten"));
+    assertThat(readFromOtherEnv, hasSubtransforms("envRead", "flatten"));
   }
 
   @Test
@@ -892,7 +885,7 @@ public void materializesWithSideInputConsumer() {
         GreedyStageFuser.forGrpcPortRead(
             p, impulseOutputNode, ImmutableSet.of(readNode));
     assertThat(subgraph.getOutputPCollections(), contains(readOutput));
-    assertThat(subgraph.toPTransform().getSubtransformsList(), 
contains(readNode.getId()));
+    assertThat(subgraph, hasSubtransforms(readNode.getId()));
   }
 
   @Test
@@ -943,6 +936,28 @@ public void materializesWithGroupByKeyConsumer() {
         GreedyStageFuser.forGrpcPortRead(
             p, impulseOutputNode, ImmutableSet.of(readNode));
     assertThat(subgraph.getOutputPCollections(), contains(readOutput));
-    assertThat(subgraph.toPTransform().getSubtransformsList(), 
contains(readNode.getId()));
+    assertThat(subgraph, hasSubtransforms(readNode.getId()));
+  }
+
+  private static TypeSafeMatcher<ExecutableStage> hasSubtransforms(String id, 
String... ids) {
+    Set<String> expectedTransforms = 
ImmutableSet.<String>builder().add(id).add(ids).build();
+    return new TypeSafeMatcher<ExecutableStage>() {
+      @Override
+      protected boolean matchesSafely(ExecutableStage executableStage) {
+        // NOTE: Transform names must be unique, so it's fine to throw here if 
this does not hold.
+        Set<String> stageTransforms = executableStage.getTransforms().stream()
+            .map(PTransformNode::getId)
+            .collect(Collectors.toSet());
+        return stageTransforms.containsAll(expectedTransforms)
+            && expectedTransforms.containsAll(stageTransforms);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText(
+            "ExecutableStage with subtransform ids: " + expectedTransforms);
+      }
+    };
   }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82057)
    Time Spent: 13h 10m  (was: 13h)

> Add utilities for producing a collection of PTransforms that can execute in a single SDK Harness
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3565
>                 URL: https://issues.apache.org/jira/browse/BEAM-3565
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>             Fix For: 2.4.0
>
>          Time Spent: 13h 10m
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The Java
> runner libraries should provide a way to take a Pipeline that executes in
> both a runner and an environment and construct a collection of transforms
> that can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to