This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c565881 [BEAM-2939] Support SDF expansion in the Flink runner.
new 365c99d Merge pull request #8412 [BEAM-2939] Support basic SDF expansion for the Flink runner.
c565881 is described below
commit c565881b3041730f4e1206ed8404e4b0317e5037
Author: Robert Bradshaw <[email protected]>
AuthorDate: Fri Apr 26 14:43:12 2019 +0200
[BEAM-2939] Support SDF expansion in the Flink runner.
---
.../runners/core/construction/Environments.java | 9 +++++
.../core/construction/PTransformTranslation.java | 4 +++
.../graph/GreedyPCollectionFusers.java | 18 ++++++++++
.../core/construction/graph/QueryablePipeline.java | 6 +++-
.../runners/portability/flink_runner_test.py | 4 ---
.../portability/fn_api_runner_transforms.py | 39 +++++++++++++++++++---
.../runners/portability/portable_runner.py | 31 +++++++++++------
7 files changed, 91 insertions(+), 20 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index a052756..c00bb35 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -57,6 +57,15 @@ public class Environments {
Environments::combineExtractor)
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN,
Environments::parDoExtractor)
.put(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN,
Environments::parDoExtractor)
+ .put(
+ PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
+ Environments::parDoExtractor)
+ .put(
+ PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+ Environments::parDoExtractor)
+ .put(
+ PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
+ Environments::parDoExtractor)
.put(PTransformTranslation.READ_TRANSFORM_URN,
Environments::readExtractor)
.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
Environments::windowExtractor)
.build();
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index b880283..92a0350 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -111,6 +111,10 @@ public class PTransformTranslation {
getUrn(SplittableParDoComponents.PROCESS_KEYED_ELEMENTS);
public static final String SPLITTABLE_PROCESS_ELEMENTS_URN =
getUrn(SplittableParDoComponents.PROCESS_ELEMENTS);
+ public static final String SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN =
+ getUrn(SplittableParDoComponents.SPLIT_AND_SIZE_RESTRICTIONS);
+ public static final String SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN =
+ getUrn(SplittableParDoComponents.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS);
public static final String ITERABLE_SIDE_INPUT =
getUrn(RunnerApi.StandardSideInputTypes.Enum.ITERABLE);
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index e1c5091..61875c6 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -47,6 +47,15 @@ class GreedyPCollectionFusers {
ImmutableMap.<String, FusibilityChecker>builder()
.put(PTransformTranslation.PAR_DO_TRANSFORM_URN,
GreedyPCollectionFusers::canFuseParDo)
.put(
+ PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
+ GreedyPCollectionFusers::canFuseParDo)
+ .put(
+ PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+ GreedyPCollectionFusers::canFuseParDo)
+ .put(
+ PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
+ GreedyPCollectionFusers::canFuseParDo)
+ .put(
PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
GreedyPCollectionFusers::canFuseCompatibleEnvironment)
.put(
@@ -75,6 +84,15 @@ class GreedyPCollectionFusers {
PTransformTranslation.PAR_DO_TRANSFORM_URN,
GreedyPCollectionFusers::parDoCompatibility)
.put(
+ PTransformTranslation.SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
+ GreedyPCollectionFusers::parDoCompatibility)
+ .put(
+ PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+ GreedyPCollectionFusers::parDoCompatibility)
+ .put(
+ PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN,
+ GreedyPCollectionFusers::parDoCompatibility)
+ .put(
PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
GreedyPCollectionFusers::compatibleEnvironments)
.put(
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index c4b12d7..81b49f1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -30,6 +30,8 @@ import static org.apache.beam.runners.core.construction.PTransformTranslation.PA
import static org.apache.beam.runners.core.construction.PTransformTranslation.READ_TRANSFORM_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_KEYED_URN;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN;
+import static org.apache.beam.runners.core.construction.PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN;
import static org.apache.beam.runners.core.construction.PTransformTranslation.TEST_STREAM_TRANSFORM_URN;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
@@ -173,7 +175,9 @@ public class QueryablePipeline {
COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
SPLITTABLE_PROCESS_KEYED_URN,
- SPLITTABLE_PROCESS_ELEMENTS_URN);
+ SPLITTABLE_PROCESS_ELEMENTS_URN,
+ SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN,
+ SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN);
/** Returns true if the provided transform is a primitive. */
private static boolean isPrimitiveTransform(PTransform transform) {
diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
index b607e5a..8e8a3c2 100644
--- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -44,7 +44,6 @@ if __name__ == '__main__':
#
# python -m apache_beam.runners.portability.flink_runner_test \
# --flink_job_server_jar=/path/to/job_server.jar \
- # --type=Batch \
# --environment_type=docker \
# --extra_experiments=beam_experiments \
# [FlinkRunnerTest.test_method, ...]
@@ -240,9 +239,6 @@ if __name__ == '__main__':
counter_name, line)
)
- def test_sdf(self):
- raise unittest.SkipTest("BEAM-2939")
-
def test_sdf_with_sdf_initiated_checkpointing(self):
raise unittest.SkipTest("BEAM-2939")
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
index f1aef40..207688c 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -39,7 +39,9 @@ from apache_beam.utils import proto_utils
KNOWN_COMPOSITES = frozenset([
common_urns.primitives.GROUP_BY_KEY.urn,
- common_urns.composites.COMBINE_PER_KEY.urn])
+ common_urns.composites.COMBINE_PER_KEY.urn,
+ common_urns.primitives.PAR_DO.urn, # After SDF expansion.
+])
COMBINE_URNS = frozenset([
common_urns.composites.COMBINE_PER_KEY.urn,
@@ -420,7 +422,10 @@ def pipeline_from_stages(
if parent is None:
roots.add(child)
else:
- if parent not in components.transforms:
+ if isinstance(parent, Stage):
+ parent = parent.name
+ if (parent not in components.transforms
+ and parent in pipeline_proto.components.transforms):
components.transforms[parent].CopyFrom(
pipeline_proto.components.transforms[parent])
del components.transforms[parent].subtransforms[:]
@@ -644,7 +649,7 @@ def lift_combiners(stages, context):
[transform],
downstream_side_inputs=base_stage.downstream_side_inputs,
must_follow=base_stage.must_follow,
- parent=base_stage.name,
+ parent=base_stage,
environment=base_stage.environment)
yield make_stage(
@@ -723,8 +728,14 @@ def expand_sdf(stages, context):
getattr(proto, name).extend(value)
elif name == 'urn':
proto.spec.urn = value
+ elif name == 'payload':
+ proto.spec.payload = value
else:
setattr(proto, name, value)
+ if 'unique_name' not in kwargs and hasattr(proto, 'unique_name'):
+ proto.unique_name = unique_name(
+ set([p.unique_name for p in protos.values()]),
+ original.unique_name + suffix)
return new_id
def make_stage(base_stage, transform_id, extra_must_follow=()):
@@ -788,6 +799,25 @@ def expand_sdf(stages, context):
inputs=dict(transform.inputs, **{main_input_tag: paired_pcoll_id}),
outputs={'out': split_pcoll_id})
+ if common_urns.composites.RESHUFFLE.urn in context.known_runner_urns:
+ reshuffle_pcoll_id = copy_like(
+ context.components.pcollections,
+ main_input_id,
+ '_reshuffle',
+ coder_id=sized_coder_id)
+ reshuffle_transform_id = copy_like(
+ context.components.transforms,
+ transform,
+ unique_name=transform.unique_name + '/Reshuffle',
+ urn=common_urns.composites.RESHUFFLE.urn,
+ payload=b'',
+ inputs=dict(transform.inputs, **{main_input_tag: split_pcoll_id}),
+ outputs={'out': reshuffle_pcoll_id})
+ yield make_stage(stage, reshuffle_transform_id)
+ else:
+ reshuffle_pcoll_id = split_pcoll_id
+ reshuffle_transform_id = None
+
process_transform_id = copy_like(
context.components.transforms,
transform,
@@ -795,7 +825,8 @@ def expand_sdf(stages, context):
urn=
common_urns.sdf_components.PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS
.urn,
- inputs=dict(transform.inputs, **{main_input_tag: split_pcoll_id}))
+ inputs=dict(
+ transform.inputs, **{main_input_tag: reshuffle_pcoll_id}))
yield make_stage(stage, pair_transform_id)
split_stage = make_stage(stage, split_transform_id)
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index cb52b92..4759748 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -195,21 +195,18 @@ class PortableRunner(runner.PipelineRunner):
del transform_proto.subtransforms[:]
# Preemptively apply combiner lifting, until all runners support it.
- # This optimization is idempotent.
+ # Also apply sdf expansion.
+ # These optimizations commute and are idempotent.
pre_optimize = options.view_as(DebugOptions).lookup_experiment(
- 'pre_optimize', 'combine').lower()
+ 'pre_optimize', 'lift_combiners,expand_sdf').lower()
if not options.view_as(StandardOptions).streaming:
flink_known_urns = frozenset([
common_urns.composites.RESHUFFLE.urn,
common_urns.primitives.IMPULSE.urn,
common_urns.primitives.FLATTEN.urn,
common_urns.primitives.GROUP_BY_KEY.urn])
- if pre_optimize == 'combine':
- proto_pipeline = fn_api_runner_transforms.optimize_pipeline(
- proto_pipeline,
- phases=[fn_api_runner_transforms.lift_combiners],
- known_runner_urns=flink_known_urns,
- partial=True)
+ if pre_optimize == 'none':
+ pass
elif pre_optimize == 'all':
proto_pipeline = fn_api_runner_transforms.optimize_pipeline(
proto_pipeline,
@@ -217,6 +214,7 @@ class PortableRunner(runner.PipelineRunner):
fn_api_runner_transforms.annotate_stateful_dofns_as_roots,
fn_api_runner_transforms.fix_side_input_pcoll_coders,
fn_api_runner_transforms.lift_combiners,
+ fn_api_runner_transforms.expand_sdf,
fn_api_runner_transforms.fix_flatten_coders,
# fn_api_runner_transforms.sink_flattens,
fn_api_runner_transforms.greedily_fuse,
@@ -225,10 +223,21 @@ class PortableRunner(runner.PipelineRunner):
fn_api_runner_transforms.remove_data_plane_ops,
fn_api_runner_transforms.sort_stages],
known_runner_urns=flink_known_urns)
- elif pre_optimize == 'none':
- pass
else:
- raise ValueError('Unknown value for pre_optimize: %s' % pre_optimize)
+ phases = []
+ for phase_name in pre_optimize.split(','):
+ # For now, these are all we allow.
+ if phase_name in ('lift_combiners', 'expand_sdf'):
+ phases.append(getattr(fn_api_runner_transforms, phase_name))
+ else:
+ raise ValueError(
+ 'Unknown or inapplicable phase for pre_optimize: %s'
+ % phase_name)
+ proto_pipeline = fn_api_runner_transforms.optimize_pipeline(
+ proto_pipeline,
+ phases=phases,
+ known_runner_urns=flink_known_urns,
+ partial=True)
if not job_service:
channel = grpc.insecure_channel(job_endpoint)