Repository: beam
Updated Branches:
  refs/heads/master b67a30bf1 -> 673937b92


Translate combining operations through the Runner API.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82cc1e1a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82cc1e1a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82cc1e1a

Branch: refs/heads/master
Commit: 82cc1e1ad54d55d87ae912a96c148fd16503d017
Parents: b67a30b
Author: Robert Bradshaw <[email protected]>
Authored: Tue Jul 25 16:12:16 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Wed Jul 26 14:55:39 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         |  4 +--
 sdks/python/apache_beam/transforms/core.py      | 37 +++++++++++++++++++-
 .../python/apache_beam/transforms/ptransform.py | 12 +++++--
 sdks/python/apache_beam/utils/urns.py           |  5 ++-
 4 files changed, 52 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index aec7d00..d653e91 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -591,8 +591,8 @@ class DataflowRunner(PipelineRunner):
          PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag)})
     # Note that the accumulator must not have a WindowedValue encoding, while
     # the output of this step does in fact have a WindowedValue encoding.
-    accumulator_encoding = self._get_encoded_output_coder(transform_node,
-                                                          window_value=False)
+    accumulator_encoding = self._get_cloud_encoding(
+        transform_node.fn.get_accumulator_coder())
     output_encoding = self._get_encoded_output_coder(transform_node)
 
     step.encoding = output_encoding

http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/transforms/core.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 92b8737..25fe39f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -25,6 +25,7 @@ import types
 
 from apache_beam import pvalue
 from apache_beam import typehints
+from apache_beam import coders
 from apache_beam.coders import typecoders
 from apache_beam.internal import util
 from apache_beam.portability.api import beam_runner_api_pb2
@@ -311,7 +312,7 @@ class CallableWrapperDoFn(DoFn):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
 
-class CombineFn(WithTypeHints, HasDisplayData):
+class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   """A function object used by a Combine transform with custom processing.
 
   A CombineFn specifies how multiple values in all or part of a PCollection can
@@ -430,6 +431,11 @@ class CombineFn(WithTypeHints, HasDisplayData):
   def maybe_from_callable(fn):
     return fn if isinstance(fn, CombineFn) else CallableWrapperCombineFn(fn)
 
+  def get_accumulator_coder(self):
+    return coders.registry.get_coder(object)
+
+  urns.RunnerApiFn.register_pickle_urn(urns.PICKLED_COMBINE_FN)
+
 
 class CallableWrapperCombineFn(CombineFn):
   """For internal use only; no backwards-compatibility guarantees.
@@ -816,6 +822,13 @@ def Filter(fn, *args, **kwargs):  # pylint: disable=invalid-name
   return pardo
 
 
+def _combine_payload(combine_fn, context):
+  return beam_runner_api_pb2.CombinePayload(
+      combine_fn=combine_fn.to_runner_api(context),
+      accumulator_coder_id=context.coders.get_id(
+          combine_fn.get_accumulator_coder()))
+
+
 class CombineGlobally(PTransform):
   """A CombineGlobally transform.
 
@@ -973,6 +986,17 @@ class CombinePerKey(PTransformWithSideInputs):
     return pcoll | GroupByKey() | 'Combine' >> CombineValues(
         self.fn, *args, **kwargs)
 
+  def to_runner_api_parameter(self, context):
+    return (
+        urns.COMBINE_PER_KEY_TRANSFORM,
+        _combine_payload(self.fn, context))
+
+  @PTransform.register_urn(
+      urns.COMBINE_PER_KEY_TRANSFORM, beam_runner_api_pb2.CombinePayload)
+  def from_runner_api_parameter(combine_payload, context):
+    return CombinePerKey(
+        CombineFn.from_runner_api(combine_payload.combine_fn, context))
+
 
 # TODO(robertwb): Rename to CombineGroupedValues?
 class CombineValues(PTransformWithSideInputs):
@@ -995,6 +1019,17 @@ class CombineValues(PTransformWithSideInputs):
         CombineValuesDoFn(key_type, self.fn, runtime_type_check),
         *args, **kwargs)
 
+  def to_runner_api_parameter(self, context):
+    return (
+        urns.COMBINE_GROUPED_VALUES_TRANSFORM,
+        _combine_payload(self.fn, context))
+
+  @PTransform.register_urn(
+      urns.COMBINE_GROUPED_VALUES_TRANSFORM, beam_runner_api_pb2.CombinePayload)
+  def from_runner_api_parameter(combine_payload, context):
+    return CombineValues(
+        CombineFn.from_runner_api(combine_payload.combine_fn, context))
+
 
 class CombineValuesDoFn(DoFn):
   """DoFn for performing per-key Combine transforms."""

http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index cd84122..da113e0 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -426,8 +426,16 @@ class PTransform(WithTypeHints, HasDisplayData):
   _known_urns = {}
 
   @classmethod
-  def register_urn(cls, urn, parameter_type, constructor):
-    cls._known_urns[urn] = parameter_type, constructor
+  def register_urn(cls, urn, parameter_type, constructor=None):
+    def register(constructor):
+      cls._known_urns[urn] = parameter_type, constructor
+      return staticmethod(constructor)
+    if constructor:
+      # Used as a statement.
+      register(constructor)
+    else:
+      # Used as a decorator.
+      return register
 
   def to_runner_api(self, context):
     from apache_beam.portability.api import beam_runner_api_pb2

http://git-wip-us.apache.org/repos/asf/beam/blob/82cc1e1a/sdks/python/apache_beam/utils/urns.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 9e4635d..7110802 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -32,9 +32,12 @@ FIXED_WINDOWS_FN = "beam:windowfn:fixed_windows:v0.1"
 SLIDING_WINDOWS_FN = "beam:windowfn:sliding_windows:v0.1"
 SESSION_WINDOWS_FN = "beam:windowfn:session_windows:v0.1"
 
+PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
 PICKLED_CODER = "beam:coder:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
+COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
+COMBINE_GROUPED_VALUES_TRANSFORM = "beam:ptransform:combine_grouped_values:v0.1"
 FLATTEN_TRANSFORM = "beam:ptransform:flatten:v0.1"
 READ_TRANSFORM = "beam:ptransform:read:v0.1"
 WINDOW_INTO_TRANSFORM = "beam:ptransform:window_into:v0.1"
@@ -53,7 +56,7 @@ class RunnerApiFn(object):
   to register serialization via pickling.
   """
 
-  # TODO(robertwb): Figure out issue with dill + local classes + abc metaclass
+  # TODO(BEAM-2685): Issue with dill + local classes + abc metaclass
   # __metaclass__ = abc.ABCMeta
 
   _known_urns = {}

Reply via email to