This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ea40c6d  Add compacting to TypeCheckCombineFn. (#7821)
ea40c6d is described below

commit ea40c6dc7800c95b4b25921166c263cee204dc11
Author: Robert Bradshaw <rober...@gmail.com>
AuthorDate: Thu Feb 14 01:45:41 2019 +0100

    Add compacting to TypeCheckCombineFn. (#7821)
    
    * [BEAM-4030] Add compact() to various helper CombineFns.
---
 sdks/python/apache_beam/runners/direct/helper_transforms.py |  2 +-
 sdks/python/apache_beam/transforms/combiners.py             | 12 +++++++++---
 sdks/python/apache_beam/typehints/typecheck.py              |  3 +++
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/helper_transforms.py b/sdks/python/apache_beam/runners/direct/helper_transforms.py
index 6d894fb..60b3ad3 100644
--- a/sdks/python/apache_beam/runners/direct/helper_transforms.py
+++ b/sdks/python/apache_beam/runners/direct/helper_transforms.py
@@ -69,7 +69,7 @@ class PartialGroupByKeyCombiningValues(beam.DoFn):
 
   def finish_bundle(self):
     for (k, w), va in self._cache.items():
-      yield WindowedValue((k, va), w.end, (w,))
+      yield WindowedValue((k, self._combine_fn.compact(va)), w.end, (w,))
 
   def default_type_hints(self):
     hints = self._combine_fn.get_type_hints().copy()
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 8aa6241..65e098e 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -595,6 +595,9 @@ class SampleCombineFn(core.CombineFn):
   def merge_accumulators(self, heaps):
     return self._top_combiner.merge_accumulators(heaps)
 
+  def compact(self, heap):
+    return self._top_combiner.compact(heap)
+
   def extract_output(self, heap):
     # Here we strip off the random number keys we added in add_input.
     return [e for _, e in self._top_combiner.extract_output(heap)]
@@ -618,6 +621,9 @@ class _TupleCombineFnBase(core.CombineFn):
     return [c.merge_accumulators(a)
             for c, a in zip(self._combiners, zip(*accumulators))]
 
+  def compact(self, accumulator):
+    return [c.compact(a) for c, a in zip(self._combiners, accumulator)]
+
   def extract_output(self, accumulator):
     return tuple([c.extract_output(a)
                   for c, a in zip(self._combiners, accumulator)])
@@ -736,12 +742,12 @@ class _CurriedFn(core.CombineFn):
   def merge_accumulators(self, accumulators):
     return self.fn.merge_accumulators(accumulators, *self.args, **self.kwargs)
 
-  def extract_output(self, accumulator):
-    return self.fn.extract_output(accumulator, *self.args, **self.kwargs)
-
   def compact(self, accumulator):
     return self.fn.compact(accumulator, *self.args, **self.kwargs)
 
+  def extract_output(self, accumulator):
+    return self.fn.extract_output(accumulator, *self.args, **self.kwargs)
+
   def apply(self, elements):
     return self.fn.apply(elements, *self.args, **self.kwargs)
 
diff --git a/sdks/python/apache_beam/typehints/typecheck.py b/sdks/python/apache_beam/typehints/typecheck.py
index 72106a5..b69abae 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -213,6 +213,9 @@ class TypeCheckCombineFn(core.CombineFn):
   def merge_accumulators(self, accumulators, *args, **kwargs):
     return self._combinefn.merge_accumulators(accumulators, *args, **kwargs)
 
+  def compact(self, accumulator, *args, **kwargs):
+    return self._combinefn.compact(accumulator, *args, **kwargs)
+
   def extract_output(self, accumulator, *args, **kwargs):
     result = self._combinefn.extract_output(accumulator, *args, **kwargs)
     if self._output_type_hint:

Reply via email to