This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch release-2.20.0 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8414c302f97e93fb0f5dd8195def77c35c463370 Author: Ankur Goenka <[email protected]> AuthorDate: Fri Mar 6 14:14:43 2020 -0800 [BEAM-9465] Fire repeatedly in reshuffle --- sdks/python/apache_beam/transforms/util.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index 361edbe..8fd3f72 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -64,6 +64,7 @@ from apache_beam.transforms.ptransform import ptransform_fn from apache_beam.transforms.timeutil import TimeDomain from apache_beam.transforms.trigger import AccumulationMode from apache_beam.transforms.trigger import AfterCount +from apache_beam.transforms.trigger import Repeatedly from apache_beam.transforms.userstate import BagStateSpec from apache_beam.transforms.userstate import CombiningValueStateSpec from apache_beam.transforms.userstate import TimerSpec @@ -678,7 +679,7 @@ class ReshufflePerKey(PTransform): # accept only standard coders. ungrouped._windowing = Windowing( window.GlobalWindows(), - triggerfn=AfterCount(1), + triggerfn=Repeatedly(AfterCount(1)), accumulation_mode=AccumulationMode.DISCARDING, timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST) result = (
