Repository: beam Updated Branches: refs/heads/master b67a30bf1 -> 673937b92
Translate combining operations through the Runner API. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82cc1e1a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82cc1e1a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82cc1e1a Branch: refs/heads/master Commit: 82cc1e1ad54d55d87ae912a96c148fd16503d017 Parents: b67a30b Author: Robert Bradshaw <[email protected]> Authored: Tue Jul 25 16:12:16 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Wed Jul 26 14:55:39 2017 -0700 ---------------------------------------------------------------------- .../runners/dataflow/dataflow_runner.py | 4 +-- sdks/python/apache_beam/transforms/core.py | 37 +++++++++++++++++++- .../python/apache_beam/transforms/ptransform.py | 12 +++++-- sdks/python/apache_beam/utils/urns.py | 5 ++- 4 files changed, 52 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index aec7d00..d653e91 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -591,8 +591,8 @@ class DataflowRunner(PipelineRunner): PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)}) # Note that the accumulator must not have a WindowedValue encoding, while # the output of this step does in fact have a WindowedValue encoding. - accumulator_encoding = self._get_encoded_output_coder(transform_node, - window_value=False) + accumulator_encoding = self._get_cloud_encoding( + transform_node.fn.get_accumulator_coder()) output_encoding = self._get_encoded_output_coder(transform_node) step.encoding = output_encoding http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/transforms/core.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 92b8737..25fe39f 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -25,6 +25,7 @@ import types from apache_beam import pvalue from apache_beam import typehints +from apache_beam import coders from apache_beam.coders import typecoders from apache_beam.internal import util from apache_beam.portability.api import beam_runner_api_pb2 @@ -311,7 +312,7 @@ class CallableWrapperDoFn(DoFn): return getattr(self._fn, '_argspec_fn', self._fn) -class CombineFn(WithTypeHints, HasDisplayData): +class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn): """A function object used by a Combine transform with custom processing. A CombineFn specifies how multiple values in all or part of a PCollection can @@ -430,6 +431,11 @@ class CombineFn(WithTypeHints, HasDisplayData): def maybe_from_callable(fn): return fn if isinstance(fn, CombineFn) else CallableWrapperCombineFn(fn) + def get_accumulator_coder(self): + return coders.registry.get_coder(object) + + urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_COMBINE_FN) + class CallableWrapperCombineFn(CombineFn): """For internal use only; no backwards-compatibility guarantees. @@ -816,6 +822,13 @@ def Filter(fn, *args, **kwargs): # pylint: disable=invalid-name return pardo +def _combine_payload(combine_fn, context): + return beam_runner_api_pb2.CombinePayload( + combine_fn=combine_fn.to_runner_api(context), + accumulator_coder_id=context.coders.get_id( + combine_fn.get_accumulator_coder())) + + class CombineGlobally(PTransform): """A CombineGlobally transform. @@ -973,6 +986,17 @@ class CombinePerKey(PTransformWithSideInputs): return pcoll | GroupByKey() | 'Combine' >> CombineValues( self.fn, *args, **kwargs) + def to_runner_api_parameter(self, context): + return ( + urns.COMBINE_PER_KEY_TRANSFORM, + _combine_payload(self.fn, context)) + + @PTransform.register_urn( + urns.COMBINE_PER_KEY_TRANSFORM, beam_runner_api_pb2.CombinePayload) + def from_runner_api_parameter(combine_payload, context): + return CombinePerKey( + CombineFn.from_runner_api(combine_payload.combine_fn, context)) + # TODO(robertwb): Rename to CombineGroupedValues? class CombineValues(PTransformWithSideInputs): @@ -995,6 +1019,17 @@ class CombineValues(PTransformWithSideInputs): CombineValuesDoFn(key_type, self.fn, runtime_type_check), *args, **kwargs) + def to_runner_api_parameter(self, context): + return ( + urns.COMBINE_GROUPED_VALUES_TRANSFORM, + _combine_payload(self.fn, context)) + + @PTransform.register_urn( + urns.COMBINE_GROUPED_VALUES_TRANSFORM, beam_runner_api_pb2.CombinePayload) + def from_runner_api_parameter(combine_payload, context): + return CombineValues( + CombineFn.from_runner_api(combine_payload.combine_fn, context)) + class CombineValuesDoFn(DoFn): """DoFn for performing per-key Combine transforms.""" http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/transforms/ptransform.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index cd84122..da113e0 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -426,8 +426,16 @@ class PTransform(WithTypeHints, HasDisplayData): _known_urns = {} @classmethod - def register_urn(cls, urn, parameter_type, constructor): - cls._known_urns[urn] = parameter_type, constructor + def register_urn(cls, urn, parameter_type, constructor=None): + def register(constructor): + cls._known_urns[urn] = parameter_type, constructor + return staticmethod(constructor) + if constructor: + # Used as a statement. + register(constructor) + else: + # Used as a decorator. + return register def to_runner_api(self, context): from apache_beam.portability.api import beam_runner_api_pb2 http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/utils/urns.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py index 9e4635d..7110802 100644 --- a/sdks/python/apache_beam/utils/urns.py +++ b/sdks/python/apache_beam/utils/urns.py @@ -32,9 +32,12 @@ FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1" SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1" SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1" +PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1" PICKLED_CODER = "beam:coder:pickled_python:v0.1" PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1" +COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1" +COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1" FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1" READ_TRANSFORM = "beam:ptransform:read:v0.1" WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1" @@ -53,7 +56,7 @@ class RunnerApiFn(object): to register serialization via pickling. """ - # TODO(robertwb): Figure out issue with dill + local classes + abc metaclass + # TODO(BEAM-2685): Issue with dill + local classes + abc metaclass # __metaclass__ = abc.ABCMeta _known_urns = {}
