This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a46cfbf Break fusion for a ParDo which has State or Timers
new 8be36b9 [BEAM-3565] Break fusion for a ParDo which has State or Timers
a46cfbf is described below
commit a46cfbf8adc13318e4e372f079c54b25b5cdbff2
Author: Thomas Groh <[email protected]>
AuthorDate: Fri Feb 16 11:29:22 2018 -0800
Break fusion for a ParDo which has State or Timers
Because State and Timers are provided in a key-partitioned manner, the
upstream stage has to preserve keys for such a ParDo to be executable.
Key preservation could be checked, but unconditionally breaking fusion
before a ParDo with State or Timers is simpler and is known to be correct.
---
.../graph/GreedyPCollectionFusers.java | 41 +++---
.../graph/GreedilyFusedExecutableStageTest.java | 122 ++++++++++++++++++
.../graph/GreedyPipelineFuserTest.java | 139 +++++++++++++++++++++
3 files changed, 288 insertions(+), 14 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index da2c92b..992b463 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.core.construction.graph;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import java.util.Optional;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -131,27 +132,39 @@ class GreedyPCollectionFusers {
// The PCollection's producer and this ParDo execute in different
environments, so fusion
// is never possible.
return false;
- } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
- // At execution time, a Runner is required to only provide inputs to a
PTransform that, at the
- // time the PTransform processes them, the associated window is ready in
all side inputs that
- // the PTransform consumes. For an arbitrary stage, it is significantly
complex for the runner
- // to determine this for each input. As a result, we break fusion to
simplify this inspection.
- // In general, a ParDo which consumes side inputs cannot be fused into
an executable subgraph
- // alongside any transforms which are upstream of any of its side inputs.
+ }
+ if (!pipeline.getSideInputs(parDo).isEmpty()) {
+ // At execution time, a Runner is required to only provide inputs to a
PTransform that, at
+ // the time the PTransform processes them, the associated window is
ready in all side inputs
+ // that the PTransform consumes. For an arbitrary stage, it is
significantly complex for the
+ // runner to determine this for each input. As a result, we break fusion
to simplify this
+ // inspection. In general, a ParDo which consumes side inputs cannot be
fused into an
+ // executable stage alongside any transforms which are upstream of any
of its side inputs.
return false;
+ } else {
+ try {
+ ParDoPayload payload =
+
ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+ if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() >
0) {
+ // Inputs to a ParDo that uses State or Timers must be
key-partitioned, and elements for
+ // a key must execute serially. To avoid checking if the rest of the
stage is
+ // key-partitioned and preserves keys, these ParDos do not fuse into
an existing stage.
+ return false;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException(e);
+ }
}
return true;
}
private static boolean parDoCompatibility(
PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
- if (!pipeline.getSideInputs(parDo).isEmpty()) {
- // This is a convenience rather than a strict requirement. In general, a
ParDo that consumes
- // side inputs can be fused with other transforms in the same
environment which are not
- // upstream of any of the side inputs.
- return false;
- }
- return compatibleEnvironments(parDo, other, pipeline);
+ // This is a convenience rather than a strict requirement. In general, a
ParDo that consumes
+ // side inputs can be fused with other transforms in the same environment
which are not
+ // upstream of any of the side inputs.
+ return pipeline.getSideInputs(parDo).isEmpty()
+ && compatibleEnvironments(parDo, other, pipeline);
}
/**
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
index 563e155..8612e34 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
@@ -38,6 +38,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import
org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
@@ -242,6 +244,126 @@ public class GreedilyFusedExecutableStageTest {
}
@Test
+ public void materializesWithStatefulConsumer() {
+ // (impulse.out) -> parDo -> (parDo.out)
+ // (parDo.out) -> stateful -> stateful.out
+ // stateful has a state spec which prevents it from fusing with an
upstream ParDo
+ PTransform parDoTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "impulse.out")
+ .putOutputs("output", "parDo.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .build()
+ .toByteString()))
+ .build();
+ PTransform statefulTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "parDo.out")
+ .putOutputs("output", "stateful.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .putStateSpecs("state",
StateSpec.getDefaultInstance())
+ .build()
+ .toByteString()))
+ .build();
+
+ QueryablePipeline p =
+ QueryablePipeline.fromComponents(
+ partialComponents
+ .toBuilder()
+ .putTransforms("parDo", parDoTransform)
+ .putPcollections(
+ "parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())
+ .putTransforms("stateful", statefulTransform)
+ .putPcollections(
+ "stateful.out",
PCollection.newBuilder().setUniqueName("stateful.out").build())
+ .putEnvironments("common",
Environment.newBuilder().setUrl("common").build())
+ .build());
+
+ ExecutableStage subgraph =
+ GreedilyFusedExecutableStage.forGrpcPortRead(
+ p,
+ impulseOutputNode,
+ ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
+ assertThat(
+ subgraph.getOutputPCollections(),
+ contains(
+ PipelineNode.pCollection(
+ "parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())));
+ assertThat(
+ subgraph.toPTransform().getSubtransformsList(),
containsInAnyOrder("parDo"));
+ }
+
+ @Test
+ public void materializesWithConsumerWithTimer() {
+ // (impulse.out) -> parDo -> (parDo.out)
+ // (parDo.out) -> timer -> timer.out
+ // timer has a timer spec which prevents it from fusing with an upstream
ParDo
+ PTransform parDoTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "impulse.out")
+ .putOutputs("output", "parDo.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .build()
+ .toByteString()))
+ .build();
+ PTransform timerTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "parDo.out")
+ .putOutputs("output", "timer.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .putTimerSpecs("timer",
TimerSpec.getDefaultInstance())
+ .build()
+ .toByteString()))
+ .build();
+
+ QueryablePipeline p =
+ QueryablePipeline.fromComponents(
+ partialComponents
+ .toBuilder()
+ .putTransforms("parDo", parDoTransform)
+ .putPcollections(
+ "parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())
+ .putTransforms("timer", timerTransform)
+ .putPcollections(
+ "timer.out",
PCollection.newBuilder().setUniqueName("timer.out").build())
+ .putEnvironments("common",
Environment.newBuilder().setUrl("common").build())
+ .build());
+
+ ExecutableStage subgraph =
+ GreedilyFusedExecutableStage.forGrpcPortRead(
+ p,
+ impulseOutputNode,
+ ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
+ assertThat(
+ subgraph.getOutputPCollections(),
+ contains(
+ PipelineNode.pCollection(
+ "parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())));
+ assertThat(
+ subgraph.toPTransform().getSubtransformsList(),
containsInAnyOrder("parDo"));
+ }
+
+ @Test
public void fusesFlatten() {
// (impulse.out) -> parDo -> parDo.out --> flatten -> flatten.out ->
window -> window.out
// \ /
diff --git
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 17f4ccb..76bddde 100644
---
a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -32,6 +32,8 @@ import
org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.junit.Before;
@@ -766,6 +768,143 @@ public class GreedyPipelineFuserTest {
}
/*
+ * impulse -> .out -> parDo -> .out -> stateful -> .out
+ * becomes
+ * (impulse.out) -> parDo -> (parDo.out)
+ * (parDo.out) -> stateful
+ */
+ @Test
+ public void statefulParDoRootsStage() {
+ // (impulse.out) -> parDo -> (parDo.out)
+ // (parDo.out) -> stateful -> stateful.out
+ // stateful has a state spec which prevents it from fusing with an
upstream ParDo
+ PTransform parDoTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "impulse.out")
+ .putOutputs("output", "parDo.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .build()
+ .toByteString()))
+ .build();
+ PTransform statefulTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "parDo.out")
+ .putOutputs("output", "stateful.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .putStateSpecs("state",
StateSpec.getDefaultInstance())
+ .build()
+ .toByteString()))
+ .build();
+
+ Components components =
+ partialComponents
+ .toBuilder()
+ .putTransforms("parDo", parDoTransform)
+ .putPcollections(
+ "parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())
+ .putTransforms("stateful", statefulTransform)
+ .putPcollections(
+ "stateful.out",
PCollection.newBuilder().setUniqueName("stateful.out").build())
+ .putEnvironments("common",
Environment.newBuilder().setUrl("common").build())
+ .build();
+ FusedPipeline fused =
+
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+
+ assertThat(
+ fused.getRunnerExecutedTransforms(),
+ containsInAnyOrder(
+ PipelineNode.pTransform("impulse",
components.getTransformsOrThrow("impulse"))));
+ assertThat(
+ fused.getFusedStages(),
+ containsInAnyOrder(
+ ExecutableStageMatcher.withInput("impulse.out")
+ .withOutputs("parDo.out")
+ .withTransforms("parDo"),
+ ExecutableStageMatcher.withInput("parDo.out")
+ .withNoOutputs()
+ .withTransforms("stateful")));
+ }
+
+ /*
+ * impulse -> .out -> parDo -> .out -> timer -> .out
+ * becomes
+ * (impulse.out) -> parDo -> (parDo.out)
+ * (parDo.out) -> timer
+ */
+ @Test
+ public void parDoWithTimerRootsStage() {
+ // (impulse.out) -> parDo -> (parDo.out)
+ // (parDo.out) -> timer -> timer.out
+ // timer has a timer spec which prevents it from fusing with an upstream
ParDo
+ PTransform parDoTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "impulse.out")
+ .putOutputs("output", "parDo.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .build()
+ .toByteString()))
+ .build();
+ PTransform timerTransform =
+ PTransform.newBuilder()
+ .putInputs("input", "parDo.out")
+ .putOutputs("output", "timer.out")
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+ .setPayload(
+ ParDoPayload.newBuilder()
+
.setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+ .putTimerSpecs("timer",
TimerSpec.getDefaultInstance())
+ .build()
+ .toByteString()))
+ .build();
+
+ Components components =
+ partialComponents
+ .toBuilder()
+ .putTransforms("parDo", parDoTransform)
+ .putPcollections(
+ "parDo.out",
PCollection.newBuilder().setUniqueName("parDo.out").build())
+ .putTransforms("timer", timerTransform)
+ .putPcollections(
+ "timer.out",
PCollection.newBuilder().setUniqueName("timer.out").build())
+ .putEnvironments("common",
Environment.newBuilder().setUrl("common").build())
+ .build();
+
+ FusedPipeline fused =
+
GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+
+ assertThat(
+ fused.getRunnerExecutedTransforms(),
+ containsInAnyOrder(
+ PipelineNode.pTransform("impulse",
components.getTransformsOrThrow("impulse"))));
+ assertThat(
+ fused.getFusedStages(),
+ containsInAnyOrder(
+ ExecutableStageMatcher.withInput("impulse.out")
+ .withOutputs("parDo.out")
+ .withTransforms("parDo"),
+ ExecutableStageMatcher.withInput("parDo.out")
+ .withNoOutputs()
+ .withTransforms("timer")));
+ }
+
+ /*
* impulse -> .out -> ( read -> .out --> goTransform -> .out )
* \
* -> pyTransform -> .out )
--
To stop receiving notification emails like this one, please contact
[email protected].