This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a46cfbf  Break fusion for a ParDo which has State or Timers
     new 8be36b9  [BEAM-3565] Break fusion for a ParDo which has State or Timers
a46cfbf is described below

commit a46cfbf8adc13318e4e372f079c54b25b5cdbff2
Author: Thomas Groh <tg...@google.com>
AuthorDate: Fri Feb 16 11:29:22 2018 -0800

    Break fusion for a ParDo which has State or Timers
    
    Because these are provided in a key-partitioned manner, the upstream
    stage has to preserve keys for this to be executable. This could be
    checked, but this is a simpler method to break fusion when it is known
    it will be appropriate.
---
 .../graph/GreedyPCollectionFusers.java             |  41 +++---
 .../graph/GreedilyFusedExecutableStageTest.java    | 122 ++++++++++++++++++
 .../graph/GreedyPipelineFuserTest.java             | 139 +++++++++++++++++++++
 3 files changed, 288 insertions(+), 14 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index da2c92b..992b463 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -21,6 +21,7 @@ package org.apache.beam.runners.core.construction.graph;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.InvalidProtocolBufferException;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -131,27 +132,39 @@ class GreedyPCollectionFusers {
       // The PCollection's producer and this ParDo execute in different environments, so fusion
       // is never possible.
       return false;
-    } else if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // At execution time, a Runner is required to only provide inputs to a PTransform that, at the
-      // time the PTransform processes them, the associated window is ready in all side inputs that
-      // the PTransform consumes. For an arbitrary stage, it is significantly complex for the runner
-      // to determine this for each input. As a result, we break fusion to simplify this inspection.
-      // In general, a ParDo which consumes side inputs cannot be fused into an executable subgraph
-      // alongside any transforms which are upstream of any of its side inputs.
+    }
+    if (!pipeline.getSideInputs(parDo).isEmpty()) {
+      // At execution time, a Runner is required to only provide inputs to a PTransform that, at
+      // the time the PTransform processes them, the associated window is ready in all side inputs
+      // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the
+      // runner to determine this for each input. As a result, we break fusion to simplify this
+      // inspection. In general, a ParDo which consumes side inputs cannot be fused into an
+      // executable stage alongside any transforms which are upstream of any of its side inputs.
       return false;
+    } else {
+      try {
+        ParDoPayload payload =
+            ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload());
+        if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) {
+          // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for
+          // a key must execute serially. To avoid checking if the rest of the stage is
+          // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage.
+          return false;
+        }
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalArgumentException(e);
+      }
     }
     return true;
   }
 
   private static boolean parDoCompatibility(
       PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) {
-    if (!pipeline.getSideInputs(parDo).isEmpty()) {
-      // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
-      // side inputs can be fused with other transforms in the same environment which are not
-      // upstream of any of the side inputs.
-      return false;
-    }
-    return compatibleEnvironments(parDo, other, pipeline);
+    // This is a convenience rather than a strict requirement. In general, a ParDo that consumes
+    // side inputs can be fused with other transforms in the same environment which are not
+    // upstream of any of the side inputs.
+    return pipeline.getSideInputs(parDo).isEmpty()
+        && compatibleEnvironments(parDo, other, pipeline);
   }
 
   /**
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
index 563e155..8612e34 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java
@@ -38,6 +38,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode;
@@ -242,6 +244,126 @@ public class GreedilyFusedExecutableStageTest {
   }
 
   @Test
+  public void materializesWithStatefulConsumer() {
+    // (impulse.out) -> parDo -> (parDo.out)
+    // (parDo.out) -> stateful -> stateful.out
+    // stateful has a state spec which prevents it from fusing with an upstream ParDo
+    PTransform parDoTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "impulse.out")
+            .putOutputs("output", "parDo.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .build()
+                            .toByteString()))
+            .build();
+    PTransform statefulTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "parDo.out")
+            .putOutputs("output", "stateful.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .putStateSpecs("state", StateSpec.getDefaultInstance())
+                            .build()
+                            .toByteString()))
+            .build();
+
+    QueryablePipeline p =
+        QueryablePipeline.fromComponents(
+            partialComponents
+                .toBuilder()
+                .putTransforms("parDo", parDoTransform)
+                .putPcollections(
+                    "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
+                .putTransforms("stateful", statefulTransform)
+                .putPcollections(
+                    "stateful.out", PCollection.newBuilder().setUniqueName("stateful.out").build())
+                .putEnvironments("common", Environment.newBuilder().setUrl("common").build())
+                .build());
+
+    ExecutableStage subgraph =
+        GreedilyFusedExecutableStage.forGrpcPortRead(
+            p,
+            impulseOutputNode,
+            ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
+    assertThat(
+        subgraph.getOutputPCollections(),
+        contains(
+            PipelineNode.pCollection(
+                "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())));
+    assertThat(
+        subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo"));
+  }
+
+  @Test
+  public void materializesWithConsumerWithTimer() {
+    // (impulse.out) -> parDo -> (parDo.out)
+    // (parDo.out) -> timer -> timer.out
+    // timer has a timer spec which prevents it from fusing with an upstream ParDo
+    PTransform parDoTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "impulse.out")
+            .putOutputs("output", "parDo.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .build()
+                            .toByteString()))
+            .build();
+    PTransform timerTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "parDo.out")
+            .putOutputs("output", "timer.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+                            .build()
+                            .toByteString()))
+            .build();
+
+    QueryablePipeline p =
+        QueryablePipeline.fromComponents(
+            partialComponents
+                .toBuilder()
+                .putTransforms("parDo", parDoTransform)
+                .putPcollections(
+                    "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
+                .putTransforms("timer", timerTransform)
+                .putPcollections(
+                    "timer.out", PCollection.newBuilder().setUniqueName("timer.out").build())
+                .putEnvironments("common", Environment.newBuilder().setUrl("common").build())
+                .build());
+
+    ExecutableStage subgraph =
+        GreedilyFusedExecutableStage.forGrpcPortRead(
+            p,
+            impulseOutputNode,
+            ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform)));
+    assertThat(
+        subgraph.getOutputPCollections(),
+        contains(
+            PipelineNode.pCollection(
+                "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())));
+    assertThat(
+        subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo"));
+  }
+
+  @Test
   public void fusesFlatten() {
     // (impulse.out) -> parDo -> parDo.out --> flatten -> flatten.out -> window -> window.out
     //               \                     /
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
index 17f4ccb..76bddde 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java
@@ -32,6 +32,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec;
 import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.junit.Before;
@@ -766,6 +768,143 @@ public class GreedyPipelineFuserTest {
   }
 
   /*
+   * impulse -> .out -> parDo -> .out -> stateful -> .out
+   * becomes
+   * (impulse.out) -> parDo -> (parDo.out)
+   * (parDo.out) -> stateful
+   */
+  @Test
+  public void statefulParDoRootsStage() {
+    // (impulse.out) -> parDo -> (parDo.out)
+    // (parDo.out) -> stateful -> stateful.out
+    // stateful has a state spec which prevents it from fusing with an upstream ParDo
+    PTransform parDoTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "impulse.out")
+            .putOutputs("output", "parDo.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .build()
+                            .toByteString()))
+            .build();
+    PTransform statefulTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "parDo.out")
+            .putOutputs("output", "stateful.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .putStateSpecs("state", StateSpec.getDefaultInstance())
+                            .build()
+                            .toByteString()))
+            .build();
+
+    Components components =
+        partialComponents
+            .toBuilder()
+            .putTransforms("parDo", parDoTransform)
+            .putPcollections(
+                "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
+            .putTransforms("stateful", statefulTransform)
+            .putPcollections(
+                "stateful.out", PCollection.newBuilder().setUniqueName("stateful.out").build())
+            .putEnvironments("common", Environment.newBuilder().setUrl("common").build())
+            .build();
+    FusedPipeline fused =
+        GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+
+    assertThat(
+        fused.getRunnerExecutedTransforms(),
+        containsInAnyOrder(
+            PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));
+    assertThat(
+        fused.getFusedStages(),
+        containsInAnyOrder(
+            ExecutableStageMatcher.withInput("impulse.out")
+                .withOutputs("parDo.out")
+                .withTransforms("parDo"),
+            ExecutableStageMatcher.withInput("parDo.out")
+                .withNoOutputs()
+                .withTransforms("stateful")));
+  }
+
+  /*
+   * impulse -> .out -> parDo -> .out -> timer -> .out
+   * becomes
+   * (impulse.out) -> parDo -> (parDo.out)
+   * (parDo.out) -> timer
+   */
+  @Test
+  public void parDoWithTimerRootsStage() {
+    // (impulse.out) -> parDo -> (parDo.out)
+    // (parDo.out) -> timer -> timer.out
+    // timer has a timer spec which prevents it from fusing with an upstream ParDo
+    PTransform parDoTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "impulse.out")
+            .putOutputs("output", "parDo.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .build()
+                            .toByteString()))
+            .build();
+    PTransform timerTransform =
+        PTransform.newBuilder()
+            .putInputs("input", "parDo.out")
+            .putOutputs("output", "timer.out")
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN)
+                    .setPayload(
+                        ParDoPayload.newBuilder()
+                            .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common"))
+                            .putTimerSpecs("timer", TimerSpec.getDefaultInstance())
+                            .build()
+                            .toByteString()))
+            .build();
+
+    Components components =
+        partialComponents
+            .toBuilder()
+            .putTransforms("parDo", parDoTransform)
+            .putPcollections(
+                "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build())
+            .putTransforms("timer", timerTransform)
+            .putPcollections(
+                "timer.out", PCollection.newBuilder().setUniqueName("timer.out").build())
+            .putEnvironments("common", Environment.newBuilder().setUrl("common").build())
+            .build();
+
+    FusedPipeline fused =
+        GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build());
+
+    assertThat(
+        fused.getRunnerExecutedTransforms(),
+        containsInAnyOrder(
+            PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse"))));
+    assertThat(
+        fused.getFusedStages(),
+        containsInAnyOrder(
+            ExecutableStageMatcher.withInput("impulse.out")
+                .withOutputs("parDo.out")
+                .withTransforms("parDo"),
+            ExecutableStageMatcher.withInput("parDo.out")
+                .withNoOutputs()
+                .withTransforms("timer")));
+  }
+
+  /*
    * impulse -> .out -> ( read -> .out --> goTransform -> .out )
    *                                    \
    *                                     -> pyTransform -> .out )

-- 
To stop receiving notification emails like this one, please contact
lc...@apache.org.

Reply via email to