This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a929e4d  improvements to fanout (#5800)
a929e4d is described below

commit a929e4d7a53e206876cc3e336ea2a7f87ee958b7
Author: Ahmet Altay <[email protected]>
AuthorDate: Wed Jun 27 23:40:31 2018 -0700

    improvements to fanout (#5800)
    
    * improvements to fanout
---
 sdks/python/apache_beam/transforms/core.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 783ea29..3b1cb0a 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
 
 import copy
 import inspect
+import itertools
 import random
 import re
 import types
@@ -648,8 +649,14 @@ class CallableWrapperCombineFn(CombineFn):
     return self._fn(union(), *args, **kwargs)
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
+    filter_fn = lambda x: x is not self._EMPTY
+
+    class ReiterableNonEmptyAccumulators(object):
+      def __iter__(self):
+        return itertools.ifilter(filter_fn, accumulators)
+
     # It's (weakly) assumed that self._fn is associative.
-    return self._fn(accumulators, *args, **kwargs)
+    return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs)
 
   def extract_output(self, accumulator, *args, **kwargs):
     return self._fn(()) if accumulator is self._EMPTY else accumulator
@@ -1141,7 +1148,7 @@ class CombineGlobally(PTransform):
 
     combine_per_key = CombinePerKey(self.fn, *self.args, **self.kwargs)
     if self.fanout:
-      combine_per_key = combine_per_key.with_hot_key_fanout(fanout)
+      combine_per_key = combine_per_key.with_hot_key_fanout(self.fanout)
 
     combined = (pcoll
                 | 'KeyWithVoid' >> add_input_types(
@@ -1419,8 +1426,8 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
 
     class PostCombineFn(CombineFn):
       @staticmethod
-      def add_input(accumulator, input):
-        is_accumulator, value = input
+      def add_input(accumulator, element):
+        is_accumulator, value = element
         if is_accumulator:
           return combine_fn.merge_accumulators([accumulator, value])
         else:

Reply via email to