This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new b76ea30 [BEAM-9735] Adding Always trigger and using it in Reshuffle
new 5ab1ba2 Merge pull request #11395 from angoenka/cherrypick-11365
b76ea30 is described below
commit b76ea30e46fe6c9926a91ea30c480024870b1043
Author: Ankur Goenka <[email protected]>
AuthorDate: Thu Apr 9 15:06:49 2020 -0700
[BEAM-9735] Adding Always trigger and using it in Reshuffle
---
sdks/python/apache_beam/transforms/trigger.py | 45 +++++++++++++++++++++-
sdks/python/apache_beam/transforms/trigger_test.py | 34 ++++++++++++++++
sdks/python/apache_beam/transforms/util.py | 5 +--
3 files changed, 79 insertions(+), 5 deletions(-)
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 3c4a24d..6ce9ba1 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -253,7 +253,7 @@ class TriggerFn(with_metaclass(ABCMeta, object)): # type: ignore[misc]
'after_end_of_window': AfterWatermark,
'after_processing_time': AfterProcessingTime,
# after_processing_time, after_synchronized_processing_time
- # always
+ 'always': Always,
'default': DefaultTrigger,
'element_count': AfterCount,
# never
@@ -367,6 +367,47 @@ class AfterProcessingTime(TriggerFn):
return False
+class Always(TriggerFn):
+  """Trigger that is ready to fire on every check and never finishes."""
+  def __init__(self):
+    """Create the trigger; it takes no configuration."""
+
+  def __repr__(self):
+    return 'Always'
+
+  def __eq__(self, other):
+    return type(self) == type(other)  # stateless: equal iff same class
+
+  def __hash__(self):
+    return 1  # constant, so instances that compare equal hash alike
+
+  def on_element(self, element, window, context):
+    """Do nothing: no per-element state is kept."""
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    """Do nothing: there is no state to merge."""
+
+  def has_ontime_pane(self):
+    return False  # explicit return; a bare `False` would yield None
+
+  def reset(self, window, context):
+    """Do nothing: there is no state to clear."""
+
+  def should_fire(self, time_domain, watermark, window, context):
+    return True  # unconditionally ready, whatever the arguments
+
+  def on_fire(self, watermark, window, context):
+    return False  # presumably "not finished" -- confirm against TriggerFn
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    return Always()  # proto and context are not consulted
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        always=beam_runner_api_pb2.Trigger.Always())
+
+
class AfterWatermark(TriggerFn):
"""Fire exactly once when the watermark passes the end of the window.
@@ -985,7 +1026,7 @@ def create_trigger_driver(
if windowing.is_default() and is_batch:
driver = BatchGlobalTriggerDriver()
elif (windowing.windowfn == GlobalWindows() and
- windowing.triggerfn == AfterCount(1) and is_batch):
+ (windowing.triggerfn in [AfterCount(1), Always()]) and is_batch):
# Here we also just pass through all the values exactly once.
driver = BatchGlobalTriggerDriver()
else:
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 24a2867..202bdbd 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -55,6 +55,7 @@ from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AfterEach
from apache_beam.transforms.trigger import AfterProcessingTime
from apache_beam.transforms.trigger import AfterWatermark
+from apache_beam.transforms.trigger import Always
from apache_beam.transforms.trigger import DefaultTrigger
from apache_beam.transforms.trigger import GeneralTriggerDriver
from apache_beam.transforms.trigger import InMemoryUnmergedState
@@ -484,6 +485,39 @@ class TriggerPipelineTest(unittest.TestCase):
'B-3': {10, 15, 16},
}.items())))
+ def test_always(self):  # Always trigger, FixedWindows(10), DISCARDING.
+ with TestPipeline() as p:
+
+ def construct_timestamped(k_t):
+ return TimestampedValue((k_t[0], k_t[1]), k_t[1])
+
+ def format_result(k_v):
+ return ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))
+
+ result = (
+ p
+ | beam.Create([1, 1, 2, 3, 4, 5, 10, 11])
+ | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
+ | beam.Map(construct_timestamped)
+ | beam.WindowInto(
+ FixedWindows(10),
+ trigger=Always(),
+ accumulation_mode=AccumulationMode.DISCARDING)
+ | beam.GroupByKey()
+ | beam.Map(format_result))
+ assert_that(
+ result,
+ equal_to(
+ list({
+ 'A-2': {10, 11},
+ # Count is 6 but the set has 5 values: 1 occurs twice in the input.
+ 'A-6': {1, 2, 3, 4, 5},
+ # Count is 5 but the set has 4 values: 6 (= 1 + 5) occurs twice.
+ 'B-5': {6, 7, 8, 9},
+ # 10, 15 and 16 come from inputs 5, 10 and 11 shifted by 5.
+ 'B-3': {10, 15, 16},
+ }.items())))
+
def test_multiple_accumulating_firings(self):
# PCollection will contain elements from 1 to 10.
elements = [i for i in range(1, 11)]
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 369e884..5602766 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -63,8 +63,7 @@ from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.ptransform import ptransform_fn
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.trigger import AccumulationMode
-from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.trigger import Repeatedly
+from apache_beam.transforms.trigger import Always
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.transforms.userstate import CombiningValueStateSpec
from apache_beam.transforms.userstate import TimerSpec
@@ -679,7 +678,7 @@ class ReshufflePerKey(PTransform):
# accept only standard coders.
ungrouped._windowing = Windowing(
window.GlobalWindows(),
- triggerfn=Repeatedly(AfterCount(1)),
+ triggerfn=Always(),
accumulation_mode=AccumulationMode.DISCARDING,
timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
result = (