This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch release-2.20.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8414c302f97e93fb0f5dd8195def77c35c463370
Author: Ankur Goenka <[email protected]>
AuthorDate: Fri Mar 6 14:14:43 2020 -0800

    [BEAM-9465] Fire repeatedly in reshuffle
---
 sdks/python/apache_beam/transforms/util.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/transforms/util.py 
b/sdks/python/apache_beam/transforms/util.py
index 361edbe..8fd3f72 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -64,6 +64,7 @@ from apache_beam.transforms.ptransform import ptransform_fn
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.trigger import AccumulationMode
 from apache_beam.transforms.trigger import AfterCount
+from apache_beam.transforms.trigger import Repeatedly
 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.transforms.userstate import TimerSpec
@@ -678,7 +679,7 @@ class ReshufflePerKey(PTransform):
     # accept only standard coders.
     ungrouped._windowing = Windowing(
         window.GlobalWindows(),
-        triggerfn=AfterCount(1),
+        triggerfn=Repeatedly(AfterCount(1)),
         accumulation_mode=AccumulationMode.DISCARDING,
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
     result = (

Reply via email to