This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a929e4d improvements to fanout (#5800)
a929e4d is described below
commit a929e4d7a53e206876cc3e336ea2a7f87ee958b7
Author: Ahmet Altay <[email protected]>
AuthorDate: Wed Jun 27 23:40:31 2018 -0700
improvements to fanout (#5800)
* improvements to fanout
---
sdks/python/apache_beam/transforms/core.py | 15 +++++++++++----
1 file changed, 11 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 783ea29..3b1cb0a 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -21,6 +21,7 @@ from __future__ import absolute_import
import copy
import inspect
+import itertools
import random
import re
import types
@@ -648,8 +649,14 @@ class CallableWrapperCombineFn(CombineFn):
return self._fn(union(), *args, **kwargs)
def merge_accumulators(self, accumulators, *args, **kwargs):
+ filter_fn = lambda x: x is not self._EMPTY
+
+ class ReiterableNonEmptyAccumulators(object):
+ def __iter__(self):
+ return itertools.ifilter(filter_fn, accumulators)
+
# It's (weakly) assumed that self._fn is associative.
- return self._fn(accumulators, *args, **kwargs)
+ return self._fn(ReiterableNonEmptyAccumulators(), *args, **kwargs)
def extract_output(self, accumulator, *args, **kwargs):
return self._fn(()) if accumulator is self._EMPTY else accumulator
@@ -1141,7 +1148,7 @@ class CombineGlobally(PTransform):
combine_per_key = CombinePerKey(self.fn, *self.args, **self.kwargs)
if self.fanout:
- combine_per_key = combine_per_key.with_hot_key_fanout(fanout)
+ combine_per_key = combine_per_key.with_hot_key_fanout(self.fanout)
combined = (pcoll
| 'KeyWithVoid' >> add_input_types(
@@ -1419,8 +1426,8 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
class PostCombineFn(CombineFn):
@staticmethod
- def add_input(accumulator, input):
- is_accumulator, value = input
+ def add_input(accumulator, element):
+ is_accumulator, value = element
if is_accumulator:
return combine_fn.merge_accumulators([accumulator, value])
else: