[
https://issues.apache.org/jira/browse/BEAM-6186?focusedWorklogId=173417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-173417
]
ASF GitHub Bot logged work on BEAM-6186:
----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Dec/18 08:21
Start Date: 10/Dec/18 08:21
Worklog Time Spent: 10m
Work Description: robertwb closed pull request #7227: [BEAM-6186]
Optimization cleanup: move phase utilities out of local scope.
URL: https://github.com/apache/beam/pull/7227
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
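Before the full diff, a brief sketch of the pattern this change applies, simplified from the code below; the body of create_stages is elided and only illustrative:

# Before (simplified): helpers such as union are closures inside
# FnApiRunner.create_stages, so they are invisible to other modules and
# cannot be unit-tested on their own.
class FnApiRunner(object):
  def create_stages(self, pipeline_proto):
    def union(a, b):
      # Minimize the number of distinct sets.
      if not a or a == b:
        return b
      elif not b:
        return a
      else:
        return frozenset.union(a, b)
    # ... optimization phases close over union, Stage, etc. ...

# After (simplified): the same helper is module-level in
# fn_api_runner_transforms.py, and create_stages simply references it:
#
#   from apache_beam.runners.portability import fn_api_runner_transforms
#   union = fn_api_runner_transforms.union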
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 76155492fe57..62a2d299983d 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -53,6 +53,9 @@
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners import pipeline_context
from apache_beam.runners import runner
+from apache_beam.runners.portability import fn_api_runner_transforms
+from apache_beam.runners.portability.fn_api_runner_transforms import only_element
+from apache_beam.runners.portability.fn_api_runner_transforms import unique_name
from apache_beam.runners.worker import bundle_processor
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import sdk_worker
@@ -292,98 +295,15 @@ def maybe_profile(self):
def create_stages(self, pipeline_proto):
- # First define a couple of helpers.
-
- def union(a, b):
- # Minimize the number of distinct sets.
- if not a or a == b:
- return b
- elif not b:
- return a
- else:
- return frozenset.union(a, b)
-
- class Stage(object):
- """A set of Transforms that can be sent to the worker for processing."""
- def __init__(self, name, transforms,
- downstream_side_inputs=None, must_follow=frozenset()):
- self.name = name
- self.transforms = transforms
- self.downstream_side_inputs = downstream_side_inputs
- self.must_follow = must_follow
- self.timer_pcollections = []
-
- def __repr__(self):
- must_follow = ', '.join(prev.name for prev in self.must_follow)
- downstream_side_inputs = ', '.join(
- str(si) for si in self.downstream_side_inputs)
- return "%s\n %s\n must follow: %s\n downstream_side_inputs: %s" % (
- self.name,
- '\n'.join(["%s:%s" % (transform.unique_name, transform.spec.urn)
- for transform in self.transforms]),
- must_follow,
- downstream_side_inputs)
-
- def can_fuse(self, consumer):
- def no_overlap(a, b):
- return not a.intersection(b)
- return (
- not self in consumer.must_follow
- and not self.is_flatten() and not consumer.is_flatten()
- and no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
-
- def fuse(self, other):
- return Stage(
- "(%s)+(%s)" % (self.name, other.name),
- self.transforms + other.transforms,
- union(self.downstream_side_inputs, other.downstream_side_inputs),
- union(self.must_follow, other.must_follow))
-
- def is_flatten(self):
- return any(transform.spec.urn == common_urns.primitives.FLATTEN.urn
- for transform in self.transforms)
-
- def side_inputs(self):
- for transform in self.transforms:
- if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
- payload = proto_utils.parse_Bytes(
- transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
- for side_input in payload.side_inputs:
- yield transform.inputs[side_input]
-
- def has_as_main_input(self, pcoll):
- for transform in self.transforms:
- if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
- payload = proto_utils.parse_Bytes(
- transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
- local_side_inputs = payload.side_inputs
- else:
- local_side_inputs = {}
- for local_id, pipeline_id in transform.inputs.items():
- if pcoll == pipeline_id and local_id not in local_side_inputs:
- return True
-
- def deduplicate_read(self):
- seen_pcolls = set()
- new_transforms = []
- for transform in self.transforms:
- if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
- pcoll = only_element(list(transform.outputs.items()))[1]
- if pcoll in seen_pcolls:
- continue
- seen_pcolls.add(pcoll)
- new_transforms.append(transform)
- self.transforms = new_transforms
-
# Some helper functions.
+ # TODO(BEAM-6186): Move these to fn_api_runner_transforms.
+
+ Stage = fn_api_runner_transforms.Stage
+ union = fn_api_runner_transforms.union
def add_or_get_coder_id(coder_proto):
- for coder_id, coder in pipeline_components.coders.items():
- if coder == coder_proto:
- return coder_id
- new_coder_id = unique_name(pipeline_components.coders, 'coder')
- pipeline_components.coders[new_coder_id].CopyFrom(coder_proto)
- return new_coder_id
+ return fn_api_runner_transforms.TransformContext(
+ pipeline_components).add_or_get_coder_id(coder_proto)
def windowed_coder_id(coder_id, window_coder_id):
proto = beam_runner_api_pb2.Coder(
@@ -467,10 +387,6 @@ def wrap_unknown_coders(coder_id, with_bytes):
def impulse_to_input(stages):
bytes_coder_id = add_or_get_coder_id(
beam.coders.BytesCoder().to_runner_api(None))
- global_window_coder_id = add_or_get_coder_id(
- beam.coders.coders.GlobalWindowCoder().to_runner_api(None))
- globally_windowed_bytes_coder_id = windowed_coder_id(
- bytes_coder_id, global_window_coder_id)
for stage in stages:
# First map Reads, if any, to Impulse + triggered read op.
@@ -483,7 +399,7 @@ def impulse_to_input(stages):
pipeline_components.pcollections[impulse_pc].CopyFrom(
beam_runner_api_pb2.PCollection(
unique_name=impulse_pc,
- coder_id=globally_windowed_bytes_coder_id,
+ coder_id=bytes_coder_id,
windowing_strategy_id=read_pc_proto.windowing_strategy_id,
is_bounded=read_pc_proto.is_bounded))
stage.transforms.remove(transform)
@@ -539,10 +455,7 @@ def lift_combiners(stages):
output_pcoll = pipeline_components.pcollections[only_element(
list(transform.outputs.values()))]
- windowed_input_coder = pipeline_components.coders[
- input_pcoll.coder_id]
- element_coder_id, window_coder_id = (
- windowed_input_coder.component_coder_ids)
+ element_coder_id = input_pcoll.coder_id
element_coder = pipeline_components.coders[element_coder_id]
key_coder_id, _ = element_coder.component_coder_ids
accumulator_coder_id = combine_payload.accumulator_coder_id
@@ -575,8 +488,7 @@ def lift_combiners(stages):
pipeline_components.pcollections[precombined_pcoll_id].CopyFrom(
beam_runner_api_pb2.PCollection(
unique_name=transform.unique_name + '/Precombine.out',
- coder_id=windowed_coder_id(
- key_accumulator_coder_id, window_coder_id),
+ coder_id=key_accumulator_coder_id,
windowing_strategy_id=input_pcoll.windowing_strategy_id,
is_bounded=input_pcoll.is_bounded))
@@ -585,8 +497,7 @@ def lift_combiners(stages):
pipeline_components.pcollections[grouped_pcoll_id].CopyFrom(
beam_runner_api_pb2.PCollection(
unique_name=transform.unique_name + '/Group.out',
- coder_id=windowed_coder_id(
- key_accumulator_iter_coder_id, window_coder_id),
+ coder_id=key_accumulator_iter_coder_id,
windowing_strategy_id=output_pcoll.windowing_strategy_id,
is_bounded=output_pcoll.is_bounded))
@@ -595,8 +506,7 @@ def lift_combiners(stages):
pipeline_components.pcollections[merged_pcoll_id].CopyFrom(
beam_runner_api_pb2.PCollection(
unique_name=transform.unique_name + '/Merge.out',
- coder_id=windowed_coder_id(
- key_accumulator_coder_id, window_coder_id),
+ coder_id=key_accumulator_coder_id,
windowing_strategy_id=output_pcoll.windowing_strategy_id,
is_bounded=output_pcoll.is_bounded))
@@ -919,10 +829,6 @@ def inject_timer_pcollections(stages):
next(iter(transform.inputs.values()))]
# Create the appropriate coder for the timer PCollection.
key_coder_id = input_pcoll.coder_id
- if (pipeline_components.coders[key_coder_id].spec.spec.urn
- == common_urns.coders.WINDOWED_VALUE.urn):
- key_coder_id = pipeline_components.coders[
- key_coder_id].component_coder_ids[0]
if (pipeline_components.coders[key_coder_id].spec.spec.urn
== common_urns.coders.KV.urn):
key_coder_id = pipeline_components.coders[
@@ -933,10 +839,6 @@ def inject_timer_pcollections(stages):
spec=beam_runner_api_pb2.FunctionSpec(
urn=common_urns.coders.KV.urn)),
component_coder_ids=[key_coder_id, spec.timer_coder_id]))
- timer_pcoll_coder_id = windowed_coder_id(
- key_timer_coder_id,
- pipeline_components.windowing_strategies[
- input_pcoll.windowing_strategy_id].window_coder_id)
# Inject the read and write pcollections.
timer_read_pcoll = unique_name(
pipeline_components.pcollections,
@@ -947,13 +849,13 @@ def inject_timer_pcollections(stages):
pipeline_components.pcollections[timer_read_pcoll].CopyFrom(
beam_runner_api_pb2.PCollection(
unique_name=timer_read_pcoll,
- coder_id=timer_pcoll_coder_id,
+ coder_id=key_timer_coder_id,
windowing_strategy_id=input_pcoll.windowing_strategy_id,
is_bounded=input_pcoll.is_bounded))
pipeline_components.pcollections[timer_write_pcoll].CopyFrom(
beam_runner_api_pb2.PCollection(
unique_name=timer_write_pcoll,
- coder_id=timer_pcoll_coder_id,
+ coder_id=key_timer_coder_id,
windowing_strategy_id=input_pcoll.windowing_strategy_id,
is_bounded=input_pcoll.is_bounded))
stage.transforms.append(
@@ -996,47 +898,42 @@ def process(stage):
process(stage)
return ordered
+ def window_pcollection_coders(stages):
+ # Some SDK workers require windowed coders for their PCollections.
+ # TODO(BEAM-4150): Consistently use unwindowed coders everywhere.
+ for pcoll in pipeline_components.pcollections.values():
+ if (pipeline_components.coders[pcoll.coder_id].spec.spec.urn
+ != common_urns.coders.WINDOWED_VALUE.urn):
+ original_coder_id = pcoll.coder_id
+ pcoll.coder_id = windowed_coder_id(
+ pcoll.coder_id,
+ pipeline_components.windowing_strategies[
+ pcoll.windowing_strategy_id].window_coder_id)
+ if (original_coder_id in safe_coders
+ and pcoll.coder_id not in safe_coders):
+ # TODO: This assumes the window coder is safe.
+ safe_coders[pcoll.coder_id] = windowed_coder_id(
+ safe_coders[original_coder_id],
+ pipeline_components.windowing_strategies[
+ pcoll.windowing_strategy_id].window_coder_id)
+
+ return stages
+
# Now actually apply the operations.
pipeline_components = copy.deepcopy(pipeline_proto.components)
- # Some SDK workers require windowed coders for their PCollections.
- # TODO(BEAM-4150): Consistently use unwindowed coders everywhere.
- for pcoll in pipeline_components.pcollections.values():
- if (pipeline_components.coders[pcoll.coder_id].spec.spec.urn
- != common_urns.coders.WINDOWED_VALUE.urn):
- pcoll.coder_id = windowed_coder_id(
- pcoll.coder_id,
- pipeline_components.windowing_strategies[
- pcoll.windowing_strategy_id].window_coder_id)
-
- known_composites = set(
- [common_urns.primitives.GROUP_BY_KEY.urn,
- common_urns.composites.COMBINE_PER_KEY.urn])
-
- def leaf_transforms(root_ids):
- for root_id in root_ids:
- root = pipeline_proto.components.transforms[root_id]
- if root.spec.urn in known_composites:
- yield root_id
- elif not root.subtransforms:
- # Make sure its outputs are not a subset of its inputs.
- if set(root.outputs.values()) - set(root.inputs.values()):
- yield root_id
- else:
- for leaf in leaf_transforms(root.subtransforms):
- yield leaf
-
# Initial set of stages are singleton leaf transforms.
- stages = [
- Stage(name, [pipeline_proto.components.transforms[name]])
- for name in leaf_transforms(pipeline_proto.root_transform_ids)]
+ stages = list(fn_api_runner_transforms.leaf_transform_stages(
+ pipeline_proto.root_transform_ids,
+ pipeline_proto.components))
# Apply each phase in order.
for phase in [
annotate_downstream_side_inputs, fix_side_input_pcoll_coders,
lift_combiners, expand_gbk, sink_flattens, greedily_fuse,
- impulse_to_input, inject_timer_pcollections, sort_stages]:
+ impulse_to_input, inject_timer_pcollections, sort_stages,
+ window_pcollection_coders]:
logging.info('%s %s %s', '=' * 20, phase, '=' * 20)
stages = list(phase(stages))
logging.debug('Stages: %s', [str(s) for s in stages])
@@ -1663,23 +1560,6 @@ def monitoring_metrics(self):
return self._monitoring_metrics
-def only_element(iterable):
- element, = iterable
- return element
-
-
-def unique_name(existing, prefix):
- if prefix in existing:
- counter = 0
- while True:
- counter += 1
- prefix_counter = prefix + "_%s" % counter
- if prefix_counter not in existing:
- return prefix_counter
- else:
- return prefix
-
-
def create_buffer_id(name, kind='materialize'):
return ('%s:%s' % (kind, name)).encode('utf-8')
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
new file mode 100644
index 000000000000..79418d99862c
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -0,0 +1,169 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Pipeline transformations for the FnApiRunner.
+"""
+from __future__ import absolute_import
+from __future__ import print_function
+
+from builtins import object
+
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.runners.worker import bundle_processor
+from apache_beam.utils import proto_utils
+
+# This module is experimental. No backwards-compatibility guarantees.
+
+
+KNOWN_COMPOSITES = frozenset(
+ [common_urns.primitives.GROUP_BY_KEY.urn,
+ common_urns.composites.COMBINE_PER_KEY.urn])
+
+
+class Stage(object):
+ """A set of Transforms that can be sent to the worker for processing."""
+ def __init__(self, name, transforms,
+ downstream_side_inputs=None, must_follow=frozenset(),
+ parent=None):
+ self.name = name
+ self.transforms = transforms
+ self.downstream_side_inputs = downstream_side_inputs
+ self.must_follow = must_follow
+ self.timer_pcollections = []
+ self.parent = parent
+
+ def __repr__(self):
+ must_follow = ', '.join(prev.name for prev in self.must_follow)
+ if self.downstream_side_inputs is None:
+ downstream_side_inputs = '<unknown>'
+ else:
+ downstream_side_inputs = ', '.join(
+ str(si) for si in self.downstream_side_inputs)
+ return "%s\n %s\n must follow: %s\n downstream_side_inputs: %s" % (
+ self.name,
+ '\n'.join(["%s:%s" % (transform.unique_name, transform.spec.urn)
+ for transform in self.transforms]),
+ must_follow,
+ downstream_side_inputs)
+
+ def can_fuse(self, consumer):
+ def no_overlap(a, b):
+ return not a.intersection(b)
+ return (
+ not self in consumer.must_follow
+ and not self.is_flatten() and not consumer.is_flatten()
+ and no_overlap(self.downstream_side_inputs, consumer.side_inputs()))
+
+ def fuse(self, other):
+ return Stage(
+ "(%s)+(%s)" % (self.name, other.name),
+ self.transforms + other.transforms,
+ union(self.downstream_side_inputs, other.downstream_side_inputs),
+ union(self.must_follow, other.must_follow))
+
+ def is_flatten(self):
+ return any(transform.spec.urn == common_urns.primitives.FLATTEN.urn
+ for transform in self.transforms)
+
+ def side_inputs(self):
+ for transform in self.transforms:
+ if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
+ payload = proto_utils.parse_Bytes(
+ transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+ for side_input in payload.side_inputs:
+ yield transform.inputs[side_input]
+
+ def has_as_main_input(self, pcoll):
+ for transform in self.transforms:
+ if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
+ payload = proto_utils.parse_Bytes(
+ transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+ local_side_inputs = payload.side_inputs
+ else:
+ local_side_inputs = {}
+ for local_id, pipeline_id in transform.inputs.items():
+ if pcoll == pipeline_id and local_id not in local_side_inputs:
+ return True
+
+ def deduplicate_read(self):
+ seen_pcolls = set()
+ new_transforms = []
+ for transform in self.transforms:
+ if transform.spec.urn == bundle_processor.DATA_INPUT_URN:
+ pcoll = only_element(list(transform.outputs.items()))[1]
+ if pcoll in seen_pcolls:
+ continue
+ seen_pcolls.add(pcoll)
+ new_transforms.append(transform)
+ self.transforms = new_transforms
+
+
+class TransformContext(object):
+ def __init__(self, components):
+ self.components = components
+
+ def add_or_get_coder_id(self, coder_proto):
+ for coder_id, coder in self.components.coders.items():
+ if coder == coder_proto:
+ return coder_id
+ new_coder_id = unique_name(self.components.coders, 'coder')
+ self.components.coders[new_coder_id].CopyFrom(coder_proto)
+ return new_coder_id
+
+
+def leaf_transform_stages(
+ root_ids, components, parent=None, known_composites=KNOWN_COMPOSITES):
+ for root_id in root_ids:
+ root = components.transforms[root_id]
+ if root.spec.urn in known_composites:
+ yield Stage(root_id, [root], parent=parent)
+ elif not root.subtransforms:
+ # Make sure its outputs are not a subset of its inputs.
+ if set(root.outputs.values()) - set(root.inputs.values()):
+ yield Stage(root_id, [root], parent=parent)
+ else:
+ for stage in leaf_transform_stages(
+ root.subtransforms, components, root_id, known_composites):
+ yield stage
+
+
+def union(a, b):
+ # Minimize the number of distinct sets.
+ if not a or a == b:
+ return b
+ elif not b:
+ return a
+ else:
+ return frozenset.union(a, b)
+
+
+def unique_name(existing, prefix):
+ if prefix in existing:
+ counter = 0
+ while True:
+ counter += 1
+ prefix_counter = prefix + "_%s" % counter
+ if prefix_counter not in existing:
+ return prefix_counter
+ else:
+ return prefix
+
+
+def only_element(iterable):
+ element, = iterable
+ return element
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 173417)
Time Spent: 40m (was: 0.5h)
> Cleanup FnApiRunner optimization phases.
> ----------------------------------------
>
> Key: BEAM-6186
> URL: https://issues.apache.org/jira/browse/BEAM-6186
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-core
> Reporter: Robert Bradshaw
> Assignee: Ahmet Altay
> Priority: Minor
> Time Spent: 40m
> Remaining Estimate: 0h
>
> They are currently expressed as functions with closures. It would be good to
> pull them out with explicit dependencies, both to make the code easier to
> follow and to make them testable and reusable.
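As a hedged illustration of the testability the issue asks for (a hypothetical test module, not part of this PR), the helpers extracted to fn_api_runner_transforms in the diff above can now be exercised directly, without running FnApiRunner.create_stages:

# Hypothetical unit test, shown only to illustrate the point of the
# refactoring; it is not included in the PR.
import unittest

from apache_beam.runners.portability import fn_api_runner_transforms


class ExtractedHelpersTest(unittest.TestCase):

  def test_union_minimizes_distinct_sets(self):
    a = frozenset(['x'])
    # An empty set or an equal set returns the other operand unchanged.
    self.assertIs(fn_api_runner_transforms.union(frozenset(), a), a)
    self.assertIs(fn_api_runner_transforms.union(a, a), a)
    # Otherwise the two sets are actually merged.
    self.assertEqual(
        fn_api_runner_transforms.union(a, frozenset(['y'])),
        frozenset(['x', 'y']))

  def test_unique_name_appends_counter(self):
    existing = {'coder': None, 'coder_1': None}
    # A clashing prefix gets the first free numeric suffix.
    self.assertEqual(
        fn_api_runner_transforms.unique_name(existing, 'coder'), 'coder_2')
    # A non-clashing prefix is returned as-is.
    self.assertEqual(
        fn_api_runner_transforms.unique_name(existing, 'other'), 'other')


if __name__ == '__main__':
  unittest.main()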
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)