This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new b76ea30  [BEAM-9735] Adding Always trigger and using it in Reshuffle
     new 5ab1ba2  Merge pull request #11395 from angoenka/cherrypick-11365
b76ea30 is described below

commit b76ea30e46fe6c9926a91ea30c480024870b1043
Author: Ankur Goenka <[email protected]>
AuthorDate: Thu Apr 9 15:06:49 2020 -0700

    [BEAM-9735] Adding Always trigger and using it in Reshuffle
---
 sdks/python/apache_beam/transforms/trigger.py      | 45 +++++++++++++++++++++-
 sdks/python/apache_beam/transforms/trigger_test.py | 34 ++++++++++++++++
 sdks/python/apache_beam/transforms/util.py         |  5 +--
 3 files changed, 79 insertions(+), 5 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 3c4a24d..6ce9ba1 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -253,7 +253,7 @@ class TriggerFn(with_metaclass(ABCMeta, object)):  # type: ignore[misc]
         'after_end_of_window': AfterWatermark,
         'after_processing_time': AfterProcessingTime,
         # after_processing_time, after_synchronized_processing_time
-        # always
+        'always': Always,
         'default': DefaultTrigger,
         'element_count': AfterCount,
         # never
@@ -367,6 +367,47 @@ class AfterProcessingTime(TriggerFn):
     return False
 
 
+class Always(TriggerFn):
+  """Fire every time the trigger is evaluated; never finishes."""
+  def __init__(self):
+    pass  # Stateless: all instances are interchangeable.
+
+  def __repr__(self):
+    return 'Always'
+
+  def __eq__(self, other):
+    return type(self) == type(other)  # No state, so equal iff same type.
+
+  def __hash__(self):
+    return 1  # Constant hash, consistent with the type-only __eq__.
+
+  def on_element(self, element, window, context):
+    pass  # No per-element state to track.
+
+  def on_merge(self, to_be_merged, merge_result, context):
+    pass  # No state, so nothing to merge.
+
+  def has_ontime_pane(self):
+    return False
+
+  def reset(self, window, context):
+    pass
+
+  def should_fire(self, time_domain, watermark, window, context):
+    return True  # Unconditionally ready to fire.
+
+  def on_fire(self, watermark, window, context):
+    return False  # False: this trigger never finishes.
+
+  @staticmethod
+  def from_runner_api(proto, context):
+    return Always()
+
+  def to_runner_api(self, context):
+    return beam_runner_api_pb2.Trigger(
+        always=beam_runner_api_pb2.Trigger.Always())
+
+
 class AfterWatermark(TriggerFn):
   """Fire exactly once when the watermark passes the end of the window.
 
@@ -985,7 +1026,7 @@ def create_trigger_driver(
   if windowing.is_default() and is_batch:
     driver = BatchGlobalTriggerDriver()
   elif (windowing.windowfn == GlobalWindows() and
-        windowing.triggerfn == AfterCount(1) and is_batch):
+        (windowing.triggerfn in [AfterCount(1), Always()]) and is_batch):
     # Here we also just pass through all the values exactly once.
     driver = BatchGlobalTriggerDriver()
   else:
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 24a2867..202bdbd 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -55,6 +55,7 @@ from apache_beam.transforms.trigger import AfterCount
 from apache_beam.transforms.trigger import AfterEach
 from apache_beam.transforms.trigger import AfterProcessingTime
 from apache_beam.transforms.trigger import AfterWatermark
+from apache_beam.transforms.trigger import Always
 from apache_beam.transforms.trigger import DefaultTrigger
 from apache_beam.transforms.trigger import GeneralTriggerDriver
 from apache_beam.transforms.trigger import InMemoryUnmergedState
@@ -484,6 +485,39 @@ class TriggerPipelineTest(unittest.TestCase):
                   'B-3': {10, 15, 16},
               }.items())))
 
+  def test_always(self):
+    with TestPipeline() as p:
+      # Use each element's value as its event timestamp.
+      def construct_timestamped(k_t):
+        return TimestampedValue((k_t[0], k_t[1]), k_t[1])
+      # Key becomes "<key>-<element count>"; value is the distinct elements.
+      def format_result(k_v):
+        return ('%s-%s' % (k_v[0], len(k_v[1])), set(k_v[1]))
+
+      result = (
+          p
+          | beam.Create([1, 1, 2, 3, 4, 5, 10, 11])
+          | beam.FlatMap(lambda t: [('A', t), ('B', t + 5)])
+          | beam.Map(construct_timestamped)
+          | beam.WindowInto(
+              FixedWindows(10),
+              trigger=Always(),
+              accumulation_mode=AccumulationMode.DISCARDING)
+          | beam.GroupByKey()
+          | beam.Map(format_result))
+      assert_that(
+          result,
+          equal_to(
+              list({
+                  'A-2': {10, 11},
+                  # A in [0, 10): 1 appears twice in the input -> 6 elements.
+                  'A-6': {1, 2, 3, 4, 5},
+                  # B in [0, 10): 6 (=1+5) appears twice -> 5 elements.
+                  'B-5': {6, 7, 8, 9},
+                  # B in [10, 20): 10, 15, 16.
+                  'B-3': {10, 15, 16},
+              }.items())))
+
   def test_multiple_accumulating_firings(self):
     # PCollection will contain elements from 1 to 10.
     elements = [i for i in range(1, 11)]
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 369e884..5602766 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -63,8 +63,7 @@ from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import ptransform_fn
 from apache_beam.transforms.timeutil import TimeDomain
 from apache_beam.transforms.trigger import AccumulationMode
-from apache_beam.transforms.trigger import AfterCount
-from apache_beam.transforms.trigger import Repeatedly
+from apache_beam.transforms.trigger import Always
 from apache_beam.transforms.userstate import BagStateSpec
 from apache_beam.transforms.userstate import CombiningValueStateSpec
 from apache_beam.transforms.userstate import TimerSpec
@@ -679,7 +678,7 @@ class ReshufflePerKey(PTransform):
     # accept only standard coders.
     ungrouped._windowing = Windowing(
         window.GlobalWindows(),
-        triggerfn=Repeatedly(AfterCount(1)),
+        triggerfn=Always(),
         accumulation_mode=AccumulationMode.DISCARDING,
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
     result = (

Reply via email to