This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c565881  [BEAM-2939] Support SDF expansion in the Flink runner.
     new 365c99d  Merge pull request #8412 [BEAM-2939] Support basic SDF expansion for the Flink runner.
c565881 is described below

commit c565881b3041730f4e1206ed8404e4b0317e5037
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Apr 26 14:43:12 2019 +0200

    [BEAM-2939] Support SDF expansion in the Flink runner.
---
 .../runners/core/construction/Environments.java    |  9 +++++
 .../core/construction/PTransformTranslation.java   |  4 +++
 .../graph/GreedyPCollectionFusers.java             | 18 ++++++++++
 .../core/construction/graph/QueryablePipeline.java |  6 +++-
 .../runners/portability/flink_runner_test.py       |  4 ---
 .../portability/fn_api_runner_transforms.py        | 39 +++++++++++++++++++---
 .../runners/portability/portable_runner.py         | 31 +++++++++++------
 7 files changed, 91 insertions(+), 20 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index a052756..c00bb35 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -57,6 +57,15 @@ public class Environments {
               Environments::combineExtractor)
           .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, Environments::parDoExtractor)
           .put(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, Environments::parDoExtractor)
+          .put(
+              PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
+              Environments::parDoExtractor)
+          .put(
+              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+              Environments::parDoExtractor)
+          .put(
+              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
+              Environments::parDoExtractor)
           .put(PTransformTranslation.READ_TRANSFORM_URN, Environments::readExtractor)
           .put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, Environments::windowExtractor)
           .build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index b880283..92a0350 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -111,6 +111,10 @@ public class PTransformTranslation {
       getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS);
   public static final String SPLITTABLE_PROCESS_ELEMENTS_URN =
       getUrn(SplittableParDoComponents.PROCESS_ELEMENTS);
+  public static final String SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN =
+      getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS);
+  public static final String SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN =
+      getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS);
 
   public static final String ITERABLE_SIDE_INPUT =
       getUrn(RunnerApi.StandardSideInputTypes.Enum.ITERABLE);
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index e1c5091..61875c6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -47,6 +47,15 @@ class GreedyPCollectionFusers {
       ImmutableMap.<String, FusibilityChecker>builder()
           .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, GreedyPCollectionFusers::canFuseParDo)
           .put(
+              PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
+              GreedyPCollectionFusers::canFuseParDo)
+          .put(
+              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+              GreedyPCollectionFusers::canFuseParDo)
+          .put(
+              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
+              GreedyPCollectionFusers::canFuseParDo)
+          .put(
               PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
               GreedyPCollectionFusers::canFuseCompatibleEnvironment)
           .put(
@@ -75,6 +84,15 @@ class GreedyPCollectionFusers {
               PTransformTranslation.PAR_DO_TRANSFORM_URN,
               GreedyPCollectionFusers::parDoCompatibility)
           .put(
+              PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
+              GreedyPCollectionFusers::parDoCompatibility)
+          .put(
+              PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+              GreedyPCollectionFusers::parDoCompatibility)
+          .put(
+              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
+              GreedyPCollectionFusers::parDoCompatibility)
+          .put(
               PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
               GreedyPCollectionFusers::compatibleEnvironments)
           .put(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index c4b12d7..81b49f1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -30,6 +30,8 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.PA
 import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN;
 import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
@@ -173,7 +175,9 @@ public class QueryablePipeline {
           COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
           COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
           SPLITTABLE_PROCESS_KEYED_URN,
-          SPLITTABLE_PROCESS_ELEMENTS_URN);
+          SPLITTABLE_PROCESS_ELEMENTS_URN,
+          SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+          SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN);
 
   /** Returns true if the provided transform is a primitive. */
   private static boolean isPrimitiveTransform(PTransform transform) {
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index b607e5a..8e8a3c2 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -44,7 +44,6 @@ if __name__ == '__main__':
   #
   # python -m apache_beam.runners.portability.flink_runner_test \
   #     --flink_job_server_jar=/path/to/job_server.jar \
-  #     --type=Batch \
   #     --environment_type=docker \
   #     --extra_experiments=beam_experiments \
   #     [FlinkRunnerTest.test_method, ...]
@@ -240,9 +239,6 @@ if __name__ == '__main__':
                 counter_name, line)
         )
 
-    def test_sdf(self):
-      raise unittest.SkipTest("BEAM-2939")
-
     def test_sdf_with_sdf_initiated_checkpointing(self):
       raise unittest.SkipTest("BEAM-2939")
 
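
The removed skip above means test_sdf now runs against the portable Flink
runner. For context, the kind of splittable DoFn such a test exercises looks
roughly like the following. This is a hedged sketch against the Python SDK's
SDF API, not the test's own code; class and method names are illustrative,
and API details vary across SDK versions.

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import (
        OffsetRange, OffsetRestrictionTracker)
    from apache_beam.transforms.core import RestrictionProvider

    class ExpandStringsProvider(RestrictionProvider):
      # Restriction over the character offsets [0, len(element)).
      def initial_restriction(self, element):
        return OffsetRange(0, len(element))

      def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

      def restriction_size(self, element, restriction):
        return restriction.size()

    class ExpandStringsFn(beam.DoFn):
      def process(
          self,
          element,
          tracker=beam.DoFn.RestrictionParam(ExpandStringsProvider())):
        # Claim one offset at a time; the runner may split the
        # remaining range between claims.
        cur = tracker.current_restriction().start
        while tracker.try_claim(cur):
          yield element[cur]
          cur += 1
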
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
index f1aef40..207688c 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -39,7 +39,9 @@ from apache_beam.utils import proto_utils
 
 KNOWN_COMPOSITES = frozenset([
     common_urns.primitives.GROUP_BY_KEY.urn,
-    common_urns.composites.COMBINE_PER_KEY.urn])
+    common_urns.composites.COMBINE_PER_KEY.urn,
+    common_urns.primitives.PAR_DO.urn,  # After SDF expansion.
+])
 
 COMBINE_URNS = frozenset([
     common_urns.composites.COMBINE_PER_KEY.urn,
@@ -420,7 +422,10 @@ def pipeline_from_stages(
     if parent is None:
       roots.add(child)
     else:
-      if parent not in components.transforms:
+      if isinstance(parent, Stage):
+        parent = parent.name
+      if (parent not in components.transforms
+          and parent in pipeline_proto.components.transforms):
         components.transforms[parent].CopyFrom(
             pipeline_proto.components.transforms[parent])
         del components.transforms[parent].subtransforms[:]
@@ -644,7 +649,7 @@ def lift_combiners(stages, context):
             [transform],
             downstream_side_inputs=base_stage.downstream_side_inputs,
             must_follow=base_stage.must_follow,
-            parent=base_stage.name,
+            parent=base_stage,
             environment=base_stage.environment)
 
       yield make_stage(
@@ -723,8 +728,14 @@ def expand_sdf(stages, context):
               getattr(proto, name).extend(value)
             elif name == 'urn':
               proto.spec.urn = value
+            elif name == 'payload':
+              proto.spec.payload = value
             else:
               setattr(proto, name, value)
+          if 'unique_name' not in kwargs and hasattr(proto, 'unique_name'):
+            proto.unique_name = unique_name(
+                set([p.unique_name for p in protos.values()]),
+                original.unique_name + suffix)
           return new_id
 
         def make_stage(base_stage, transform_id, extra_must_follow=()):
@@ -788,6 +799,25 @@ def expand_sdf(stages, context):
             inputs=dict(transform.inputs, **{main_input_tag: paired_pcoll_id}),
             outputs={'out': split_pcoll_id})
 
+        if common_urns.composites.RESHUFFLE.urn in context.known_runner_urns:
+          reshuffle_pcoll_id = copy_like(
+              context.components.pcollections,
+              main_input_id,
+              '_reshuffle',
+              coder_id=sized_coder_id)
+          reshuffle_transform_id = copy_like(
+              context.components.transforms,
+              transform,
+              unique_name=transform.unique_name + '/Reshuffle',
+              urn=common_urns.composites.RESHUFFLE.urn,
+              payload=b'',
+              inputs=dict(transform.inputs, **{main_input_tag: split_pcoll_id}),
+              outputs={'out': reshuffle_pcoll_id})
+          yield make_stage(stage, reshuffle_transform_id)
+        else:
+          reshuffle_pcoll_id = split_pcoll_id
+          reshuffle_transform_id = None
+
         process_transform_id = copy_like(
             context.components.transforms,
             transform,
@@ -795,7 +825,8 @@ def expand_sdf(stages, context):
             urn=
             common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
             .urn,
-            inputs=dict(transform.inputs, **{main_input_tag: split_pcoll_id}))
+            inputs=dict(
+                transform.inputs, **{main_input_tag: reshuffle_pcoll_id}))
 
         yield make_stage(stage, pair_transform_id)
         split_stage = make_stage(stage, split_transform_id)
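
Taken together, expand_sdf now rewrites each splittable ParDo into the chain
sketched below, inserting a runner-native Reshuffle between splitting and
processing when the runner declares support for that composite. The list is
illustrative, but the URNs are the ones referenced in the code above.

    from apache_beam.portability import common_urns

    # Sub-transforms produced by expand_sdf, in pipeline order.
    expansion = [
        common_urns.sdf_components.PAIR_WITH_RESTRICTION.urn,
        common_urns.sdf_components.SPLIT_AND_SIZE_RESTRICTIONS.urn,
        common_urns.composites.RESHUFFLE.urn,  # only if in known_runner_urns
        common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS.urn,
    ]

The Reshuffle step redistributes the split, sized restrictions across workers
before processing, which is what lets a runner without dynamic splitting,
such as Flink here, still balance SDF work.
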
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index cb52b92..4759748 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -195,21 +195,18 @@ class PortableRunner(runner.PipelineRunner):
         del transform_proto.subtransforms[:]
 
     # Preemptively apply combiner lifting, until all runners support it.
-    # This optimization is idempotent.
+    # Also apply sdf expansion.
+    # These optimizations commute and are idempotent.
     pre_optimize = options.view_as(DebugOptions).lookup_experiment(
-        'pre_optimize', 'combine').lower()
+        'pre_optimize', 'lift_combiners,expand_sdf').lower()
     if not options.view_as(StandardOptions).streaming:
       flink_known_urns = frozenset([
           common_urns.composites.RESHUFFLE.urn,
           common_urns.primitives.IMPULSE.urn,
           common_urns.primitives.FLATTEN.urn,
           common_urns.primitives.GROUP_BY_KEY.urn])
-      if pre_optimize == 'combine':
-        proto_pipeline = fn_api_runner_transforms.optimize_pipeline(
-            proto_pipeline,
-            phases=[fn_api_runner_transforms.lift_combiners],
-            known_runner_urns=flink_known_urns,
-            partial=True)
+      if pre_optimize == 'none':
+        pass
       elif pre_optimize == 'all':
         proto_pipeline = fn_api_runner_transforms.optimize_pipeline(
             proto_pipeline,
@@ -217,6 +214,7 @@ class PortableRunner(runner.PipelineRunner):
                     fn_api_runner_transforms.annotate_stateful_dofns_as_roots,
                     fn_api_runner_transforms.fix_side_input_pcoll_coders,
                     fn_api_runner_transforms.lift_combiners,
+                    fn_api_runner_transforms.expand_sdf,
                     fn_api_runner_transforms.fix_flatten_coders,
                     # fn_api_runner_transforms.sink_flattens,
                     fn_api_runner_transforms.greedily_fuse,
@@ -225,10 +223,21 @@ class PortableRunner(runner.PipelineRunner):
                     fn_api_runner_transforms.remove_data_plane_ops,
                     fn_api_runner_transforms.sort_stages],
             known_runner_urns=flink_known_urns)
-      elif pre_optimize == 'none':
-        pass
       else:
-        raise ValueError('Unknown value for pre_optimize: %s' % pre_optimize)
+        phases = []
+        for phase_name in pre_optimize.split(','):
+          # For now, these are all we allow.
+          if phase_name in ('lift_combiners', 'expand_sdf'):
+            phases.append(getattr(fn_api_runner_transforms, phase_name))
+          else:
+            raise ValueError(
+                'Unknown or inapplicable phase for pre_optimize: %s'
+                % phase_name)
+        proto_pipeline = fn_api_runner_transforms.optimize_pipeline(
+            proto_pipeline,
+            phases=phases,
+            known_runner_urns=flink_known_urns,
+            partial=True)
 
     if not job_service:
       channel = grpc.insecure_channel(job_endpoint)
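
With this change, batch pipelines on the portable runner get combiner lifting
and SDF expansion by default, and the pre_optimize experiment selects which
phases run. A minimal usage sketch follows; the job_endpoint value is an
assumed placeholder for a locally running job server.

    from apache_beam.options.pipeline_options import (
        DebugOptions, PipelineOptions)

    options = PipelineOptions(
        runner='PortableRunner', job_endpoint='localhost:8099')
    # Accepted values, per the code above: 'none', 'all', or a
    # comma-separated subset of 'lift_combiners' and 'expand_sdf'.
    options.view_as(DebugOptions).experiments = [
        'pre_optimize=lift_combiners,expand_sdf']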
