This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 18a3fcfc06c Add timer tests to make sure event-time timer firing at
the right time. (#35109)
18a3fcfc06c is described below
commit 18a3fcfc06cc989c1aa0ae6fc26dd46a9059d204
Author: Shunping Huang <[email protected]>
AuthorDate: Fri Jun 6 07:55:23 2025 -0400
Add timer tests to make sure event-time timer firing at the right time.
(#35109)
* Add timer tests to make sure event-time timer firing at the right time.
* Add more tests.
* Disable the failed event-time timer tests for FnApiRunner.
* Fix lints and reformat.
* Disable another new test in FnApiRunnerTest and PortableRunnerTest due to
flakiness.
* Disable a new test in FlinkRunnerTest
* Take out the early firing test case because it depends on bundle size.
---
.../portability/fn_api_runner/fn_runner_test.py | 92 ++++++++++++++++++++++
.../runners/portability/portable_runner_test.py | 20 +++++
2 files changed, 112 insertions(+)
diff --git
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 114fe078343..aafa088ceb1 100644
---
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++
b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -73,6 +73,7 @@ from apache_beam.tools import utils
from apache_beam.transforms import environments
from apache_beam.transforms import userstate
from apache_beam.transforms import window
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils import timestamp
from apache_beam.utils import windowed_value
@@ -768,6 +769,97 @@ class FnApiRunnerTest(unittest.TestCase):
expected = [('fired', ts) for ts in (20, 200)]
assert_that(actual, equal_to(expected))
+ def _run_pardo_et_timer_test(
+ self, n, timer_delay, reset_count=True, clear_timer=True, expected=None):
+ class EventTimeTimerDoFn(beam.DoFn):
+ COUNT = userstate.ReadModifyWriteStateSpec(
+ 'count', coders.VarInt32Coder())
+ # event-time timer
+ TIMER = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+
+ def __init__(self):
+ self._n = n
+ self._timer_delay = timer_delay
+ self._reset_count = reset_count
+ self._clear_timer = clear_timer
+
+ def process(
+ self,
+ element_pair,
+ t=beam.DoFn.TimestampParam,
+ count=beam.DoFn.StateParam(COUNT),
+ timer=beam.DoFn.TimerParam(TIMER)):
+ local_count = count.read() or 0
+ local_count += 1
+
+ _LOGGER.info(
+ "get element %s, count=%d", str(element_pair[1]), local_count)
+ if local_count == 1:
+ _LOGGER.info("set timer to %s", str(t + self._timer_delay))
+ timer.set(t + self._timer_delay)
+
+ if local_count == self._n:
+ if self._reset_count:
+ _LOGGER.info("reset count")
+ local_count = 0
+
+ # don't need the timer now
+ if self._clear_timer:
+ _LOGGER.info("clear timer")
+ timer.clear()
+
+ count.write(local_count)
+
+ @userstate.on_timer(TIMER)
+ def timer_callback(self, t=beam.DoFn.TimestampParam):
+ _LOGGER.error("Timer should not fire here")
+ _LOGGER.info("timer callback start (timestamp=%s)", str(t))
+ yield "fired"
+
+ with self.create_pipeline() as p:
+ actual = (
+ p | PeriodicImpulse(
+ start_timestamp=timestamp.Timestamp.now(),
+ stop_timestamp=timestamp.Timestamp.now() + 14,
+ fire_interval=1)
+ | beam.WithKeys(0)
+ | beam.ParDo(EventTimeTimerDoFn()))
+ assert_that(actual, equal_to(expected))
+
+ def test_pardo_et_timer_with_no_firing(self):
+ if type(self) in [FnApiRunnerTest,
+ FnApiRunnerTestWithGrpc,
+ FnApiRunnerTestWithGrpcAndMultiWorkers,
+ FnApiRunnerTestWithDisabledCaching,
+ FnApiRunnerTestWithMultiWorkers,
+ FnApiRunnerTestWithBundleRepeat,
+ FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
+ raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+
+ # The timer will not fire. It is initially set to T + 10, but then it is
+ # cleared at T + 4 (count == 5), and reset to T + 5 + 10
+ # (count is reset every 5 seconds).
+ self._run_pardo_et_timer_test(5, 10, True, True, [])
+
+ def test_pardo_et_timer_with_no_reset(self):
+ if type(self) in [FnApiRunnerTest,
+ FnApiRunnerTestWithGrpc,
+ FnApiRunnerTestWithGrpcAndMultiWorkers,
+ FnApiRunnerTestWithDisabledCaching,
+ FnApiRunnerTestWithMultiWorkers,
+ FnApiRunnerTestWithBundleRepeat,
+ FnApiRunnerTestWithBundleRepeatAndMultiWorkers]:
+ raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+
+ # The timer will not fire. It is initially set to T + 10, and then it is
+ # cleared at T + 4 and never set again (count is not reset).
+ self._run_pardo_et_timer_test(5, 10, False, True, [])
+
+ def test_pardo_et_timer_with_no_reset_and_no_clear(self):
+ # The timer will fire at T + 10. After the timer is set, it is never
+ # cleared or set again.
+ self._run_pardo_et_timer_test(5, 10, False, False, ["fired"])
+
def test_pardo_state_timers(self):
self._run_pardo_state_timers(windowed=False)
diff --git
a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 85d1607e9fa..e128b6a73e4 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -222,6 +222,26 @@ class PortableRunnerTest(fn_runner_test.FnApiRunnerTest):
def test_draining_sdf_with_sdf_initiated_checkpointing(self):
raise unittest.SkipTest("Portable runners don't support drain yet.")
+ def test_pardo_et_timer_with_no_firing(self):
+ if type(self) in [PortableRunnerTest,
+ PortableRunnerTestWithSubprocesses,
+ PortableRunnerTestWithSubprocessesAndMultiWorkers,
+ PortableRunnerTestWithExternalEnv,
+ PortableRunnerTestWithLocalDocker,
+ PortableRunnerOptimizedWithoutFusion]:
+ raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+ super().test_pardo_et_timer_with_no_firing()
+
+ def test_pardo_et_timer_with_no_reset(self):
+ if type(self) in [PortableRunnerTest,
+ PortableRunnerTestWithSubprocesses,
+ PortableRunnerTestWithSubprocessesAndMultiWorkers,
+ PortableRunnerTestWithExternalEnv,
+ PortableRunnerTestWithLocalDocker,
+ PortableRunnerOptimizedWithoutFusion]:
+ raise unittest.SkipTest("https://github.com/apache/beam/issues/35168")
+ super().test_pardo_et_timer_with_no_reset()
+
@unittest.skip("https://github.com/apache/beam/issues/19422")
class PortableRunnerOptimized(PortableRunnerTest):