This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 18a3fcfc06c Add timer tests to make sure event-time timer firing at the right time. (#35109)
18a3fcfc06c is described below

commit 18a3fcfc06cc989c1aa0ae6fc26dd46a9059d204
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Jun 6 07:55:23 2025 -0400

    Add timer tests to make sure event-time timer firing at the right time. (#35109)
    
    * Add timer tests to make sure event-time timer firing at the right time.
    
    * Add more tests.
    
    * Disable the failed event-time timer tests for FnApiRunner.
    
    * Fix lints and reformat.
    
    * Disable another new test in FnApiRunnerTest and PortableRunnerTest due to flakiness.
    
    * Disable a new test in FlinkRunnerTest
    
    * Take out the early firing test case because it depends on bundle size.
---
 .../portability/fn_api_runner/fn_runner_test.py    | 92 ++++++++++++++++++++++
 .../runners/portability/portable_runner_test.py    | 20 +++++
 2 files changed, 112 insertions(+)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 114fe078343..aafa088ceb1 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -73,6 +73,7 @@ from apache_beam.tools import utils
 from apache_beam.transforms import environments
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
 from apache_beam.utils import timestamp
 from apache_beam.utils import windowed_value
 
@@ -768,6 +769,97 @@ class FnApiRunnerTest(unittest.TestCase):
       expected = [('fired', ts) for ts in (20, 200)]
       assert_that(actual, equal_to(expected))
 
+  def _run_pardo_et_timer_test(
+      self, n, timer_delay, reset_count=True, clear_timer=True, expected=None):
+    class EventTimeTimerDoFn(beam.DoFn):
+      COUNT = userstate.ReadModifyWriteStateSpec(
+          'count', coders.VarInt32Coder())
+      # event-time timer
+      TIMER = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+
+      def __init__(self):
+        self._n = n
+        self._timer_delay = timer_delay
+        self._reset_count = reset_count
+        self._clear_timer = clear_timer
+
+      def process(
+          self,
+          element_pair,
+          t=beam.DoFn.TimestampParam,
+          count=beam.DoFn.StateParam(COUNT),
+          timer=beam.DoFn.TimerParam(TIMER)):
+        local_count = count.read() or 0
+        local_count += 1
+
+        _LOGGER.info(
+            "get element %s, count=%d", str(element_pair[1]), local_count)
+        if local_count == 1:
+          _LOGGER.info("set timer to %s", str(t + self._timer_delay))
+          timer.set(t + self._timer_delay)
+
+        if local_count == self._n:
+          if self._reset_count:
+            _LOGGER.info("reset count")
+            local_count = 0
+
+          # don't need the timer now
+          if self._clear_timer:
+            _LOGGER.info("clear timer")
+            timer.clear()
+
+        count.write(local_count)
+
+      @userstate.on_timer(TIMER)
+      def timer_callback(self, t=beam.DoFn.TimestampParam):
+        _LOGGER.error("Timer should not fire here")
+        _LOGGER.info("timer callback start (timestamp=%s)", str(t))
+        yield "fired"
+
+    with self.create_pipeline() as p:
+      actual = (
+          p | PeriodicImpulse(
+              start_timestamp=timestamp.Timestamp.now(),
+              stop_timestamp=timestamp.Timestamp.now() + 14,
+              fire_interval=1)
+          | beam.WithKeys(0)
+          | beam.ParDo(EventTimeTimerDoFn()))
+      assert_that(actual, equal_to(expected))
+
+  def test_pardo_et_timer_with_no_firing(self):
+    if type(self) in [FnApiRunnerTest,
+                      FnApiRunnerTestWithGrpc,
+                      FnApiRunnerTestWithGrpcAndMultiWorkers,
+                      FnApiRunnerTestWithDisabledCaching,
+                      FnApiRunnerTestWithMultiWorkers,
+                      FnApiRunnerTestWithBundleRepeat,
+                      FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
+      raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+
+    # The timer will not fire. It is initially set to T + 10, but then it is
+    # cleared at T + 4 (count == 5), and reset to T + 5 + 10
+    # (count is reset every 5 seconds).
+    self._run_pardo_et_timer_test(5, 10, True, True, [])
+
+  def test_pardo_et_timer_with_no_reset(self):
+    if type(self) in [FnApiRunnerTest,
+                      FnApiRunnerTestWithGrpc,
+                      FnApiRunnerTestWithGrpcAndMultiWorkers,
+                      FnApiRunnerTestWithDisabledCaching,
+                      FnApiRunnerTestWithMultiWorkers,
+                      FnApiRunnerTestWithBundleRepeat,
+                      FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
+      raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+
+    # The timer will not fire. It is initially set to T + 10, and then it is
+    # cleared at T + 4 and never set again (count is not reset).
+    self._run_pardo_et_timer_test(5, 10, False, True, [])
+
+  def test_pardo_et_timer_with_no_reset_and_no_clear(self):
+    # The timer will fire at T + 10. After the timer is set, it is never
+    # cleared or set again.
+    self._run_pardo_et_timer_test(5, 10, False, False, ["fired"])
+
   def test_pardo_state_timers(self):
     self._run_pardo_state_timers(windowed=False)
 
diff --git 
a/sdks/python/apache_beam/runners/portability/portable_runner_test.py 
b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 85d1607e9fa..e128b6a73e4 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -222,6 +222,26 @@ class PortableRunnerTest(fn_runner_test.FnApiRunnerTest):
   def test_draining_sdf_with_sdf_initiated_checkpointing(self):
     raise unittest.SkipTest("Portable runners don't support drain yet.")
 
+  def test_pardo_et_timer_with_no_firing(self):
+    if type(self) in [PortableRunnerTest,
+                      PortableRunnerTestWithSubprocesses,
+                      PortableRunnerTestWithSubprocessesAndMultiWorkers,
+                      PortableRunnerTestWithExternalEnv,
+                      PortableRunnerTestWithLocalDocker,
+                      PortableRunnerOptimizedWithoutFusion]:
+      raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+    super().test_pardo_et_timer_with_no_firing()
+
+  def test_pardo_et_timer_with_no_reset(self):
+    if type(self) in [PortableRunnerTest,
+                      PortableRunnerTestWithSubprocesses,
+                      PortableRunnerTestWithSubprocessesAndMultiWorkers,
+                      PortableRunnerTestWithExternalEnv,
+                      PortableRunnerTestWithLocalDocker,
+                      PortableRunnerOptimizedWithoutFusion]:
+      raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+    super().test_pardo_et_timer_with_no_reset()
+
 
 @unittest.skip("https://github.com/apache/beam/issues/19422")
 class PortableRunnerOptimized(PortableRunnerTest):

Reply via email to