[
https://issues.apache.org/jira/browse/BEAM-3565?focusedWorklogId=82057&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-82057
]
ASF GitHub Bot logged work on BEAM-3565:
----------------------------------------
Author: ASF GitHub Bot
Created on: 19/Mar/18 21:32
Start Date: 19/Mar/18 21:32
Worklog Time Spent: 10m
Work Description: tgroh closed pull request #4844: [BEAM-3565] Add
ExecutableStagePayload to simplify runner stage reconstruction
URL: https://github.com/apache/beam/pull/4844
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto
b/model/pipeline/src/main/proto/beam_runner_api.proto
index b45be09efb6..3ed90dd036e 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -203,6 +203,23 @@ message PCollection {
DisplayData display_data = 5;
}
+// The payload for an executable stage. This will eventually be passed to an
SDK in the form of a
+// ProcessBundleDescriptor.
+message ExecutableStagePayload {
+
+ Environment environment = 1;
+
+ // Input PCollection id.
+ string input = 2;
+
+ // PTransform ids contained within this executable stage.
+ repeated string transforms = 3;
+
+ // Output PCollection ids.
+ repeated string outputs = 4;
+
+}
+
// The payload for the primitive ParDo transform.
message ParDoPayload {
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
new file mode 100644
index 00000000000..1200dc621a7
--- /dev/null
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ExecutableStageTranslation.java
@@ -0,0 +1,25 @@
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+
+/**
+ * Utilities for converting {@link ExecutableStage}s to and from {@link
RunnerApi} protocol buffers.
+ */
+public class ExecutableStageTranslation {
+
+ /** Extracts an {@link ExecutableStagePayload} from the given transform. */
+ public static ExecutableStagePayload getExecutableStagePayload(
+ AppliedPTransform<?, ?, ?> appliedTransform) throws IOException {
+ RunnerApi.PTransform transform =
+ PTransformTranslation.toProto(appliedTransform,
SdkComponents.create());
+ checkArgument(ExecutableStage.URN.equals(transform.getSpec().getUrn()));
+ return ExecutableStagePayload.parseFrom(transform.getSpec().getPayload());
+ }
+
+}
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
index 766ce0d7136..27bfed87553 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/ExecutableStage.java
@@ -18,19 +18,16 @@
package org.apache.beam.runners.core.construction.graph;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.collect.Iterables.getOnlyElement;
-
import java.util.Collection;
-import java.util.Optional;
+import java.util.List;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.Environments;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
@@ -84,64 +81,72 @@
* follows:
*
* <ul>
- * <li>The {@link PTransform#getSubtransformsList()} contains the result
of {@link
- * #getTransforms()}.
+ * <li>The {@link PTransform#getSubtransformsList()} contains no
subtransforms. This ensures
+ * that executable stages are treated as primitive transforms.
* <li>The only {@link PCollection} in the {@link
PTransform#getInputsMap()} is the result of
* {@link #getInputPCollection()}.
* <li>The output {@link PCollection PCollections} in the values of {@link
* PTransform#getOutputsMap()} are the {@link PCollectionNode
PCollections} returned by
* {@link #getOutputPCollections()}.
+ * <li>The {@link FunctionSpec} contains an {@link ExecutableStagePayload}
which has its input
+ * and output PCollections set to the same values as the outer
PTransform itself. It further
+ * contains the environment and the set of transforms for this stage.
* </ul>
+ *
+ * <p>The executable stage can be reconstructed from the resulting {@link
ExecutableStagePayload}
+ * and components alone via {@link #fromPayload(ExecutableStagePayload,
Components)}.
*/
default PTransform toPTransform() {
+ ExecutableStagePayload.Builder payload =
ExecutableStagePayload.newBuilder();
+
+ payload.setEnvironment(getEnvironment());
+
+ PCollectionNode input = getInputPCollection();
+ payload.setInput(input.getId());
+
+ for (PTransformNode transform : getTransforms()) {
+ payload.addTransforms(transform.getId());
+ }
+
+ for (PCollectionNode output : getOutputPCollections()) {
+ payload.addOutputs(output.getId());
+ }
+
PTransform.Builder pt = PTransform.newBuilder();
+ pt.setSpec(FunctionSpec.newBuilder()
+ .setUrn(ExecutableStage.URN)
+ .setPayload(payload.build().toByteString())
+ .build());
pt.putInputs("input", getInputPCollection().getId());
- int i = 0;
- for (PCollectionNode materializedPCollection : getOutputPCollections()) {
- pt.putOutputs(String.format("materialized_%s", i),
materializedPCollection.getId());
- i++;
- }
- for (PTransformNode fusedTransform : getTransforms()) {
- pt.addSubtransforms(fusedTransform.getId());
+ int outputIndex = 0;
+ for (PCollectionNode pcNode : getOutputPCollections()) {
+ // Key each output PCollection by its position among the stage's outputs.
+ pt.putOutputs(String.format("materialized_%d", outputIndex),
pcNode.getId());
+ outputIndex++;
}
- pt.setSpec(FunctionSpec.newBuilder().setUrn(ExecutableStage.URN));
return pt.build();
}
+ // TODO: Should this live under ExecutableStageTranslation?
/**
- * Return an {@link ExecutableStage} constructed from the provided {@link
PTransform}
+ * Return an {@link ExecutableStage} constructed from the provided {@link
ExecutableStagePayload}
* representation.
*
- * <p>See {@link #toPTransform()} for information about the required format
of the {@link
- * PTransform}. The environment will be determined by an arbitrary {@link
PTransform} contained
- * within the {@link PTransform#getSubtransformsList()}.
+ * <p>See {@link #toPTransform()} for how the payload is constructed. Note
that the payload
+ * contains some information redundant with the {@link PTransform} due to
runner implementations
+ * not having the full transform context at translation time, but rather
access to an
+ * {@link org.apache.beam.sdk.runners.AppliedPTransform}.
*/
- static ExecutableStage fromPTransform(PTransform ptransform, Components
components) {
- checkArgument(ptransform.getSpec().getUrn().equals(URN));
- // It may be better to put this in an explicit Payload if other metadata
becomes required
- Optional<Environment> environment =
- Environments.getEnvironment(ptransform.getSubtransforms(0),
components);
- checkArgument(
- environment.isPresent(),
- "%s with no %s",
- ExecutableStage.class.getSimpleName(),
- Environment.class.getSimpleName());
- String inputId = getOnlyElement(ptransform.getInputsMap().values());
- PCollectionNode inputNode =
- PipelineNode.pCollection(inputId,
components.getPcollectionsOrThrow(inputId));
- Collection<PCollectionNode> outputNodes =
- ptransform
- .getOutputsMap()
- .values()
- .stream()
- .map(id -> PipelineNode.pCollection(id,
components.getPcollectionsOrThrow(id)))
- .collect(Collectors.toSet());
- Collection<PTransformNode> transformNodes =
- ptransform
- .getSubtransformsList()
- .stream()
- .map(id -> PipelineNode.pTransform(id,
components.getTransformsOrThrow(id)))
- .collect(Collectors.toSet());
- return ImmutableExecutableStage.of(environment.get(), inputNode,
transformNodes, outputNodes);
+ static ExecutableStage fromPayload(ExecutableStagePayload payload,
Components components) {
+ Environment environment = payload.getEnvironment();
+ PCollectionNode input = PipelineNode.pCollection(payload.getInput(),
+ components.getPcollectionsOrThrow(payload.getInput()));
+ List<PTransformNode> transforms = payload.getTransformsList().stream()
+ .map(id -> PipelineNode.pTransform(id,
components.getTransformsOrThrow(id)))
+ .collect(Collectors.toList());
+ List<PCollectionNode> outputs = payload.getOutputsList().stream()
+ .map(id -> PipelineNode.pCollection(id,
components.getPcollectionsOrThrow(id)))
+ .collect(Collectors.toList());
+ return ImmutableExecutableStage.of(environment, input, transforms,
outputs);
}
}
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
index e13eb6f80cf..5bbefc56c8a 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/ExecutableStageTest.java
@@ -29,6 +29,7 @@
import java.util.Collections;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
@@ -46,7 +47,7 @@
@RunWith(JUnit4.class)
public class ExecutableStageTest {
@Test
- public void testRoundTripToFromTransform() {
+ public void testRoundTripToFromTransform() throws Exception {
Environment env = Environment.newBuilder().setUrl("foo").build();
PTransform pt =
PTransform.newBuilder()
@@ -84,13 +85,15 @@ public void testRoundTripToFromTransform() {
assertThat(stagePTransform.getOutputsCount(), equalTo(1));
assertThat(stagePTransform.getInputsMap(), hasValue("input.out"));
assertThat(stagePTransform.getInputsCount(), equalTo(1));
- assertThat(stagePTransform.getSubtransformsList(), contains("pt"));
- assertThat(ExecutableStage.fromPTransform(stagePTransform, components),
equalTo(stage));
+ ExecutableStagePayload payload = ExecutableStagePayload.parseFrom(
+ stagePTransform.getSpec().getPayload());
+ assertThat(payload.getTransformsList(), contains("pt"));
+ assertThat(ExecutableStage.fromPayload(payload, components),
equalTo(stage));
}
@Test
- public void testRoundTripToFromTransformFused() {
+ public void testRoundTripToFromTransformFused() throws Exception {
PTransform parDoTransform =
PTransform.newBuilder()
.putInputs("input", "impulse.out")
@@ -148,9 +151,11 @@ public void testRoundTripToFromTransformFused() {
assertThat(ptransform.getSpec().getUrn(), equalTo(ExecutableStage.URN));
assertThat(ptransform.getInputsMap().values(),
containsInAnyOrder("impulse.out"));
assertThat(ptransform.getOutputsMap().values(), emptyIterable());
- assertThat(ptransform.getSubtransformsList(), contains("parDo", "window"));
- ExecutableStage desered = ExecutableStage.fromPTransform(ptransform,
components);
+ ExecutableStagePayload payload = ExecutableStagePayload.parseFrom(
+ ptransform.getSpec().getPayload());
+ assertThat(payload.getTransformsList(), contains("parDo", "window"));
+ ExecutableStage desered = ExecutableStage.fromPayload(payload, components);
assertThat(desered, equalTo(subgraph));
}
}
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
index 9f3355f4212..09c37c6450b 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyStageFuserTest.java
@@ -20,7 +20,6 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
@@ -30,6 +29,7 @@
import com.google.common.collect.ImmutableSet;
import java.util.Collections;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
@@ -44,6 +44,8 @@
import org.apache.beam.runners.core.construction.PTransformTranslation;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PTransformNode;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -239,8 +241,7 @@ public void fusesCompatibleEnvironments() {
PipelineNode.pTransform("window", windowTransform)));
// Nothing consumes the outputs of ParDo or Window, so they don't have to
be materialized
assertThat(subgraph.getOutputPCollections(), emptyIterable());
- assertThat(
- subgraph.toPTransform().getSubtransformsList(),
containsInAnyOrder("parDo", "window"));
+ assertThat(subgraph, hasSubtransforms("parDo", "window"));
}
@Test
@@ -299,8 +300,7 @@ public void materializesWithStatefulConsumer() {
contains(
PipelineNode.pCollection(
"parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())));
- assertThat(
- subgraph.toPTransform().getSubtransformsList(),
containsInAnyOrder("parDo"));
+ assertThat(subgraph, hasSubtransforms("parDo"));
}
@Test
@@ -359,8 +359,7 @@ public void materializesWithConsumerWithTimer() {
contains(
PipelineNode.pCollection(
"parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())));
- assertThat(
- subgraph.toPTransform().getSubtransformsList(),
containsInAnyOrder("parDo"));
+ assertThat(subgraph, hasSubtransforms("parDo"));
}
@Test
@@ -440,9 +439,7 @@ public void fusesFlatten() {
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, p.getPerElementConsumers(impulseOutputNode));
assertThat(subgraph.getOutputPCollections(), emptyIterable());
- assertThat(
- subgraph.toPTransform().getSubtransformsList(),
- containsInAnyOrder("read", "parDo", "flatten", "window"));
+ assertThat(subgraph, hasSubtransforms("read", "parDo", "flatten",
"window"));
}
@Test
@@ -524,9 +521,7 @@ public void fusesFlattenWithDifferentEnvironmentInputs() {
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode,
ImmutableSet.of(PipelineNode.pTransform("read", readTransform)));
assertThat(subgraph.getOutputPCollections(), emptyIterable());
- assertThat(
- subgraph.toPTransform().getSubtransformsList(),
- containsInAnyOrder("read", "flatten", "window"));
+ assertThat(subgraph, hasSubtransforms("read", "flatten", "window"));
// Flatten shows up in both of these subgraphs, but elements only go
through a path to the
// flatten once.
@@ -540,9 +535,7 @@ public void fusesFlattenWithDifferentEnvironmentInputs() {
contains(
PipelineNode.pCollection(
"flatten.out",
components.getPcollectionsOrThrow("flatten.out"))));
- assertThat(
- readFromOtherEnv.toPTransform().getSubtransformsList(),
- containsInAnyOrder("envRead", "flatten"));
+ assertThat(readFromOtherEnv, hasSubtransforms("envRead", "flatten"));
}
@Test
@@ -892,7 +885,7 @@ public void materializesWithSideInputConsumer() {
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, ImmutableSet.of(readNode));
assertThat(subgraph.getOutputPCollections(), contains(readOutput));
- assertThat(subgraph.toPTransform().getSubtransformsList(),
contains(readNode.getId()));
+ assertThat(subgraph, hasSubtransforms(readNode.getId()));
}
@Test
@@ -943,6 +936,28 @@ public void materializesWithGroupByKeyConsumer() {
GreedyStageFuser.forGrpcPortRead(
p, impulseOutputNode, ImmutableSet.of(readNode));
assertThat(subgraph.getOutputPCollections(), contains(readOutput));
- assertThat(subgraph.toPTransform().getSubtransformsList(),
contains(readNode.getId()));
+ assertThat(subgraph, hasSubtransforms(readNode.getId()));
+ }
+
+ private static TypeSafeMatcher<ExecutableStage> hasSubtransforms(String id,
String... ids) {
+ Set<String> expectedTransforms =
ImmutableSet.<String>builder().add(id).add(ids).build();
+ return new TypeSafeMatcher<ExecutableStage>() {
+ @Override
+ protected boolean matchesSafely(ExecutableStage executableStage) {
+ // NOTE: Transform ids are expected to be unique; collecting into a set
silently collapses any duplicates rather than throwing.
+ Set<String> stageTransforms = executableStage.getTransforms().stream()
+ .map(PTransformNode::getId)
+ .collect(Collectors.toSet());
+ return stageTransforms.containsAll(expectedTransforms)
+ && expectedTransforms.containsAll(stageTransforms);
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText(
+ "ExecutableStage with subtransform ids: " + expectedTransforms);
+ }
+ };
}
+
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 82057)
Time Spent: 13h 10m (was: 13h)
> Add utilities for producing a collection of PTransforms that can execute in a
> single SDK Harness
> ------------------------------------------------------------------------------------------------
>
> Key: BEAM-3565
> URL: https://issues.apache.org/jira/browse/BEAM-3565
> Project: Beam
> Issue Type: Bug
> Components: runner-core
> Reporter: Thomas Groh
> Assignee: Thomas Groh
> Priority: Major
> Labels: portability
> Fix For: 2.4.0
>
> Time Spent: 13h 10m
> Remaining Estimate: 0h
>
> An SDK Harness executes some ("fused") collection of PTransforms. The java
> runner libraries should provide some way to take a Pipeline that executes in
> both a runner and an environment and construct a collection of transforms
> which can execute within a single environment.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)