This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new a46cfbf Break fusion for a ParDo which has State or Timers new 8be36b9 [BEAM-3565] Break fusion for a ParDo which has State or Timers a46cfbf is described below commit a46cfbf8adc13318e4e372f079c54b25b5cdbff2 Author: Thomas Groh <tg...@google.com> AuthorDate: Fri Feb 16 11:29:22 2018 -0800 Break fusion for a ParDo which has State or Timers Because these are provided in a key-partitioned manner, the upstream stage has to preserve keys for this to be executable. This could be checked, but this is a simpler method to break fusion when it is known it will be appropriate. --- .../graph/GreedyPCollectionFusers.java | 41 +++--- .../graph/GreedilyFusedExecutableStageTest.java | 122 ++++++++++++++++++ .../graph/GreedyPipelineFuserTest.java | 139 +++++++++++++++++++++ 3 files changed, 288 insertions(+), 14 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java index da2c92b..992b463 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java @@ -21,6 +21,7 @@ package org.apache.beam.runners.core.construction.graph; import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; +import com.google.protobuf.InvalidProtocolBufferException; import java.util.Map; import java.util.Optional; import org.apache.beam.model.pipeline.v1.RunnerApi.Environment; @@ -131,27 +132,39 @@ class GreedyPCollectionFusers { // The PCollection's producer and this ParDo execute in different environments, so fusion // is never possible. return false; - } else if (!pipeline.getSideInputs(parDo).isEmpty()) { - // At execution time, a Runner is required to only provide inputs to a PTransform that, at the - // time the PTransform processes them, the associated window is ready in all side inputs that - // the PTransform consumes. For an arbitrary stage, it is significantly complex for the runner - // to determine this for each input. As a result, we break fusion to simplify this inspection. - // In general, a ParDo which consumes side inputs cannot be fused into an executable subgraph - // alongside any transforms which are upstream of any of its side inputs. + } + if (!pipeline.getSideInputs(parDo).isEmpty()) { + // At execution time, a Runner is required to only provide inputs to a PTransform that, at + // the time the PTransform processes them, the associated window is ready in all side inputs + // that the PTransform consumes. For an arbitrary stage, it is significantly complex for the + // runner to determine this for each input. As a result, we break fusion to simplify this + // inspection. In general, a ParDo which consumes side inputs cannot be fused into an + // executable stage alongside any transforms which are upstream of any of its side inputs. return false; + } else { + try { + ParDoPayload payload = + ParDoPayload.parseFrom(parDo.getTransform().getSpec().getPayload()); + if (payload.getStateSpecsCount() > 0 || payload.getTimerSpecsCount() > 0) { + // Inputs to a ParDo that uses State or Timers must be key-partitioned, and elements for + // a key must execute serially. To avoid checking if the rest of the stage is + // key-partitioned and preserves keys, these ParDos do not fuse into an existing stage. + return false; + } + } catch (InvalidProtocolBufferException e) { + throw new IllegalArgumentException(e); + } } return true; } private static boolean parDoCompatibility( PTransformNode parDo, PTransformNode other, QueryablePipeline pipeline) { - if (!pipeline.getSideInputs(parDo).isEmpty()) { - // This is a convenience rather than a strict requirement. In general, a ParDo that consumes - // side inputs can be fused with other transforms in the same environment which are not - // upstream of any of the side inputs. - return false; - } - return compatibleEnvironments(parDo, other, pipeline); + // This is a convenience rather than a strict requirement. In general, a ParDo that consumes + // side inputs can be fused with other transforms in the same environment which are not + // upstream of any of the side inputs. + return pipeline.getSideInputs(parDo).isEmpty() + && compatibleEnvironments(parDo, other, pipeline); } /** diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java index 563e155..8612e34 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedilyFusedExecutableStageTest.java @@ -38,6 +38,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform; import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; +import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; +import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.apache.beam.runners.core.construction.graph.PipelineNode.PCollectionNode; @@ -242,6 +244,126 @@ public class GreedilyFusedExecutableStageTest { } @Test + public void materializesWithStatefulConsumer() { + // (impulse.out) -> parDo -> (parDo.out) + // (parDo.out) -> stateful -> stateful.out + // stateful has a state spec which prevents it from fusing with an upstream ParDo + PTransform parDoTransform = + PTransform.newBuilder() + .putInputs("input", "impulse.out") + .putOutputs("output", "parDo.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .build() + .toByteString())) + .build(); + PTransform statefulTransform = + PTransform.newBuilder() + .putInputs("input", "parDo.out") + .putOutputs("output", "stateful.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .putStateSpecs("state", StateSpec.getDefaultInstance()) + .build() + .toByteString())) + .build(); + + QueryablePipeline p = + QueryablePipeline.fromComponents( + partialComponents + .toBuilder() + .putTransforms("parDo", parDoTransform) + .putPcollections( + "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()) + .putTransforms("stateful", statefulTransform) + .putPcollections( + "stateful.out", PCollection.newBuilder().setUniqueName("stateful.out").build()) + .putEnvironments("common", Environment.newBuilder().setUrl("common").build()) + .build()); + + ExecutableStage subgraph = + GreedilyFusedExecutableStage.forGrpcPortRead( + p, + impulseOutputNode, + ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform))); + assertThat( + subgraph.getOutputPCollections(), + contains( + PipelineNode.pCollection( + "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()))); + assertThat( + subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo")); + } + + @Test + public void materializesWithConsumerWithTimer() { + // (impulse.out) -> parDo -> (parDo.out) + // (parDo.out) -> timer -> timer.out + // timer has a timer spec which prevents it from fusing with an upstream ParDo + PTransform parDoTransform = + PTransform.newBuilder() + .putInputs("input", "impulse.out") + .putOutputs("output", "parDo.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .build() + .toByteString())) + .build(); + PTransform timerTransform = + PTransform.newBuilder() + .putInputs("input", "parDo.out") + .putOutputs("output", "timer.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) + .build() + .toByteString())) + .build(); + + QueryablePipeline p = + QueryablePipeline.fromComponents( + partialComponents + .toBuilder() + .putTransforms("parDo", parDoTransform) + .putPcollections( + "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()) + .putTransforms("timer", timerTransform) + .putPcollections( + "timer.out", PCollection.newBuilder().setUniqueName("timer.out").build()) + .putEnvironments("common", Environment.newBuilder().setUrl("common").build()) + .build()); + + ExecutableStage subgraph = + GreedilyFusedExecutableStage.forGrpcPortRead( + p, + impulseOutputNode, + ImmutableSet.of(PipelineNode.pTransform("parDo", parDoTransform))); + assertThat( + subgraph.getOutputPCollections(), + contains( + PipelineNode.pCollection( + "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()))); + assertThat( + subgraph.toPTransform().getSubtransformsList(), containsInAnyOrder("parDo")); + } + + @Test public void fusesFlatten() { // (impulse.out) -> parDo -> parDo.out --> flatten -> flatten.out -> window -> window.out // \ / diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java index 17f4ccb..76bddde 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/graph/GreedyPipelineFuserTest.java @@ -32,6 +32,8 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline; import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; +import org.apache.beam.model.pipeline.v1.RunnerApi.StateSpec; +import org.apache.beam.model.pipeline.v1.RunnerApi.TimerSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.WindowIntoPayload; import org.apache.beam.runners.core.construction.PTransformTranslation; import org.junit.Before; @@ -766,6 +768,143 @@ public class GreedyPipelineFuserTest { } /* + * impulse -> .out -> parDo -> .out -> stateful -> .out + * becomes + * (impulse.out) -> parDo -> (parDo.out) + * (parDo.out) -> stateful + */ + @Test + public void statefulParDoRootsStage() { + // (impulse.out) -> parDo -> (parDo.out) + // (parDo.out) -> stateful -> stateful.out + // stateful has a state spec which prevents it from fusing with an upstream ParDo + PTransform parDoTransform = + PTransform.newBuilder() + .putInputs("input", "impulse.out") + .putOutputs("output", "parDo.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .build() + .toByteString())) + .build(); + PTransform statefulTransform = + PTransform.newBuilder() + .putInputs("input", "parDo.out") + .putOutputs("output", "stateful.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .putStateSpecs("state", StateSpec.getDefaultInstance()) + .build() + .toByteString())) + .build(); + + Components components = + partialComponents + .toBuilder() + .putTransforms("parDo", parDoTransform) + .putPcollections( + "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()) + .putTransforms("stateful", statefulTransform) + .putPcollections( + "stateful.out", PCollection.newBuilder().setUniqueName("stateful.out").build()) + .putEnvironments("common", Environment.newBuilder().setUrl("common").build()) + .build(); + FusedPipeline fused = + GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); + + assertThat( + fused.getRunnerExecutedTransforms(), + containsInAnyOrder( + PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")))); + assertThat( + fused.getFusedStages(), + containsInAnyOrder( + ExecutableStageMatcher.withInput("impulse.out") + .withOutputs("parDo.out") + .withTransforms("parDo"), + ExecutableStageMatcher.withInput("parDo.out") + .withNoOutputs() + .withTransforms("stateful"))); + } + + /* + * impulse -> .out -> parDo -> .out -> timer -> .out + * becomes + * (impulse.out) -> parDo -> (parDo.out) + * (parDo.out) -> timer + */ + @Test + public void parDoWithTimerRootsStage() { + // (impulse.out) -> parDo -> (parDo.out) + // (parDo.out) -> timer -> timer.out + // timer has a timer spec which prevents it from fusing with an upstream ParDo + PTransform parDoTransform = + PTransform.newBuilder() + .putInputs("input", "impulse.out") + .putOutputs("output", "parDo.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .build() + .toByteString())) + .build(); + PTransform timerTransform = + PTransform.newBuilder() + .putInputs("input", "parDo.out") + .putOutputs("output", "timer.out") + .setSpec( + FunctionSpec.newBuilder() + .setUrn(PTransformTranslation.PAR_DO_TRANSFORM_URN) + .setPayload( + ParDoPayload.newBuilder() + .setDoFn(SdkFunctionSpec.newBuilder().setEnvironmentId("common")) + .putTimerSpecs("timer", TimerSpec.getDefaultInstance()) + .build() + .toByteString())) + .build(); + + Components components = + partialComponents + .toBuilder() + .putTransforms("parDo", parDoTransform) + .putPcollections( + "parDo.out", PCollection.newBuilder().setUniqueName("parDo.out").build()) + .putTransforms("timer", timerTransform) + .putPcollections( + "timer.out", PCollection.newBuilder().setUniqueName("timer.out").build()) + .putEnvironments("common", Environment.newBuilder().setUrl("common").build()) + .build(); + + FusedPipeline fused = + GreedyPipelineFuser.fuse(Pipeline.newBuilder().setComponents(components).build()); + + assertThat( + fused.getRunnerExecutedTransforms(), + containsInAnyOrder( + PipelineNode.pTransform("impulse", components.getTransformsOrThrow("impulse")))); + assertThat( + fused.getFusedStages(), + containsInAnyOrder( + ExecutableStageMatcher.withInput("impulse.out") + .withOutputs("parDo.out") + .withTransforms("parDo"), + ExecutableStageMatcher.withInput("parDo.out") + .withNoOutputs() + .withTransforms("timer"))); + } + + /* * impulse -> .out -> ( read -> .out --> goTransform -> .out ) * \ * -> pyTransform -> .out ) -- To stop receiving notification emails like this one, please contact lc...@apache.org.