[ 
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82485&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82485
 ]

ASF GitHub Bot logged work on BEAM-3565:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Mar/18 21:24
            Start Date: 20/Mar/18 21:24
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #4898: [BEAM-3565] Clean up ExecutableStage
URL: https://github.com/apache/beam/pull/4898
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 3ed90dd036e..9fa301460fb 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -207,6 +207,8 @@ message PCollection {
 // ProcessBundleDescriptor.
 message ExecutableStagePayload {
 
+  // Environment in which this stage executes. We use an environment rather than environment id
+  // because ExecutableStages use environments directly. This may change in the future.
   Environment environment = 1;
 
   // Input PCollection id.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index 27bfed87553..e66148421fc 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -81,53 +81,53 @@
    * follows:
    *
    * <ul>
-   *   <li>The {@link PTransform#getSubtransformsList()} contains no subtransforms. This ensures
+   *   <li>The {@link PTransform#getSubtransformsList()} is empty. This ensures
    *       that executable stages are treated as primitive transforms.
    *   <li>The only {@link PCollection} in the {@link PTransform#getInputsMap()} is the result of
    *       {@link #getInputPCollection()}.
    *   <li>The output {@link PCollection PCollections} in the values of {@link
    *       PTransform#getOutputsMap()} are the {@link PCollectionNode PCollections} returned by
    *       {@link #getOutputPCollections()}.
-   *   <li>The {@link FunctionSpec} contains an {@link ExecutableStagePayload} which has its input
-   *       and output PCollections set to the same values as the outer PTransform itself. It further
-   *       contains the environment set of transforms for this stage.
+   *   <li>The {@link PTransform#getSpec()} contains an {@link ExecutableStagePayload} with inputs
+   *       and outputs equal to the PTransform's inputs and outputs, and transforms equal to the
+   *       result of {@link #getTransforms}.
    * </ul>
    *
    * <p>The executable stage can be reconstructed from the resulting {@link ExecutableStagePayload}
    * and components alone via {@link #fromPayload(ExecutableStagePayload, Components)}.
    */
   default PTransform toPTransform() {
+    PTransform.Builder pt = PTransform.newBuilder();
     ExecutableStagePayload.Builder payload = ExecutableStagePayload.newBuilder();
 
     payload.setEnvironment(getEnvironment());
 
+    // Populate inputs and outputs of the stage payload and outer PTransform simultaneously.
     PCollectionNode input = getInputPCollection();
+    pt.putInputs("input", getInputPCollection().getId());
     payload.setInput(input.getId());
 
-    for (PTransformNode transform : getTransforms()) {
-      payload.addTransforms(transform.getId());
-    }
-
+    int outputIndex = 0;
     for (PCollectionNode output : getOutputPCollections()) {
+      pt.putOutputs(String.format("materialized_%d", outputIndex), output.getId());
       payload.addOutputs(output.getId());
+      outputIndex++;
+    }
+
+    // Inner PTransforms of this stage are hidden from the outer pipeline and only belong in the
+    // stage payload.
+    for (PTransformNode transform : getTransforms()) {
+      payload.addTransforms(transform.getId());
     }
 
-    PTransform.Builder pt = PTransform.newBuilder();
     pt.setSpec(FunctionSpec.newBuilder()
         .setUrn(ExecutableStage.URN)
         .setPayload(payload.build().toByteString())
         .build());
-    pt.putInputs("input", getInputPCollection().getId());
-    int outputIndex = 0;
-    for (PCollectionNode pcNode : getOutputPCollections()) {
-      // Do something
-      pt.putOutputs(String.format("materialized_%d", outputIndex), pcNode.getId());
-      outputIndex++;
-    }
+
     return pt.build();
   }
 
-  // TODO: Should this live under ExecutableStageTranslation?
   /**
    * Return an {@link ExecutableStage} constructed from the provided {@link FunctionSpec}
    * representation.
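
For readers without the Beam sources at hand, the reordered toPTransform() logic in the diff above can be approximated with plain Java collections. This is only a rough sketch: StageSketch, OuterTransform, StagePayload and toTransform are hypothetical stand-ins invented for illustration, not Beam's generated proto classes, and the environment and FunctionSpec URN are omitted. It aims to show the input and outputs being written to the outer transform and the payload in the same pass, while the inner transform ids go only into the payload.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-ins; these are NOT Beam's generated proto classes. */
final class StageSketch {

  /** Stand-in for the outer PTransform: local name -> PCollection id. */
  static final class OuterTransform {
    final Map<String, String> inputs = new LinkedHashMap<>();
    final Map<String, String> outputs = new LinkedHashMap<>();
    StagePayload payload; // stands in for the serialized FunctionSpec payload
  }

  /** Stand-in for ExecutableStagePayload (environment omitted). */
  static final class StagePayload {
    String input;
    final List<String> outputs = new ArrayList<>();
    final List<String> transforms = new ArrayList<>();
  }

  static OuterTransform toTransform(
      String inputId, List<String> outputIds, List<String> transformIds) {
    OuterTransform pt = new OuterTransform();
    StagePayload payload = new StagePayload();

    // Populate the input on the outer transform and the payload together.
    pt.inputs.put("input", inputId);
    payload.input = inputId;

    // Each output gets a positional local name on the outer transform.
    int outputIndex = 0;
    for (String outputId : outputIds) {
      pt.outputs.put(String.format("materialized_%d", outputIndex), outputId);
      payload.outputs.add(outputId);
      outputIndex++;
    }

    // Inner transforms are hidden from the outer pipeline: payload only.
    payload.transforms.addAll(transformIds);

    pt.payload = payload;
    return pt;
  }
}
```

Calling StageSketch.toTransform with one input id and two output ids yields an outer transform whose outputs are keyed materialized_0 and materialized_1, mirroring the naming used in the diff.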


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 82485)
    Time Spent: 15.5h  (was: 15h 20m)

> Add utilities for producing a collection of PTransforms that can execute in a 
> single SDK Harness
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-3565
>                 URL: https://issues.apache.org/jira/browse/BEAM-3565
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>             Fix For: 2.4.0
>
>          Time Spent: 15.5h
>  Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The Java 
> runner libraries should provide some way to take a Pipeline that executes in 
> both a runner and an environment and construct a collection of transforms 
> which can execute within a single environment.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
