This is an automated email from the ASF dual-hosted git repository.
stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e7cb9f77973 [Website] add drain update to docs (#38450)
e7cb9f77973 is described below
commit e7cb9f779733a947ef2c8484c34e569b9fe63d39
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed May 13 09:26:25 2026 +0200
[Website] add drain update to docs (#38450)
* add drain docs
---
CHANGES.md | 1 +
website/www/site/content/en/blog/looping-timers.md | 11 ++-
.../content/en/documentation/programming-guide.md | 83 ++++++++++++++++++++++
website/www/site/data/capability_matrix.yaml | 2 +-
4 files changed, 94 insertions(+), 3 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 7c253e8cdee..058e19ea743 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
## New Features / Improvements
+* Added an indicator for aggregations and timers that fire during
a pipeline drain, allowing users and sinks to recognize and appropriately
handle potentially incomplete or partial data
([#36884](https://github.com/apache/beam/issues/36884)).
* Added support for setting disk provisioned IOPS and throughput in Dataflow
runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps`
pipeline options (Java/Go/Python)
([#38349](https://github.com/apache/beam/issues/38349)).
* TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
diff --git a/website/www/site/content/en/blog/looping-timers.md
b/website/www/site/content/en/blog/looping-timers.md
index 2690dc967ec..ad13aae7544 100644
--- a/website/www/site/content/en/blog/looping-timers.md
+++ b/website/www/site/content/en/blog/looping-timers.md
@@ -221,11 +221,18 @@ public static class LoopingStatefulTimer extends
DoFn<KV<String, Integer>, KV<St
public void onTimer(
OnTimerContext c,
@StateId("key") ValueState<String> key,
- @TimerId("loopingTimer") Timer loopingTimer) {
+ @TimerId("loopingTimer") Timer loopingTimer,
+ CausedByDrain drain) {
LOG.info("Timer @ {} fired", c.timestamp());
c.output(KV.of(key.read(), 0));
+ // Check if drain is in progress and avoid resetting the timer
+ if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+ LOG.info("Drain in progress, stopping looping timer.");
+ return;
+ }
+
// If we do not put in a “time to live” value, then the timer would loop
forever
Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
if (nextTimer.isBefore(stopTimerTime)) {
@@ -347,4 +354,4 @@ support for dealing with this use case in production.
Runner specific notes:
-Google Cloud Dataflow Runners Drain feature does not support looping timers
(Link to matrix)
+Support for cancelling looping timers on drain is currently limited to
Dataflow and is still being implemented (see [Issue
#36884](https://github.com/apache/beam/issues/36884)).
diff --git a/website/www/site/content/en/documentation/programming-guide.md
b/website/www/site/content/en/documentation/programming-guide.md
index 343fb128b3e..3f37f45bace 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -7501,6 +7501,89 @@ class BufferDoFn(DoFn):
{{< code_sample "sdks/go/examples/snippets/04transforms.go"
batching_dofn_example >}}
{{< /highlight >}}
+#### 11.5.3. Looping timers {#looping-timers}
+
+Looping timers are a pattern where a timer sets another timer for a future
time, creating a loop. This is useful for producing periodic outputs or
heartbeats in the absence of data for a specific key.
+
+When draining a pipeline, it is important to terminate these loops to allow
the pipeline to finish. In the Java SDK, you can use the `CausedByDrain`
parameter in the `@OnTimer` method to check whether the timer firing was caused by
a drain operation. **Note:** `CausedByDrain` is set only by certain
runners. Check the [capability
matrix](/documentation/runners/capability-matrix/) for more details.
+
+{{< highlight java >}}
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>,
KV<String, Integer>> {
+ @TimerId("loopingTimer") private final TimerSpec loopingTimer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(
+ @Element KV<String, Integer> element,
+ @TimerId("loopingTimer") Timer timer,
+ OutputReceiver<KV<String, Integer>> output) {
+
+ // Set initial timer
+ timer.offset(Duration.standardMinutes(1)).setRelative();
+ output.output(element);
+ }
+
+ @OnTimer("loopingTimer")
+ public void onTimer(
+ @Key String key,
+ @TimerId("loopingTimer") Timer timer,
+ OutputReceiver<KV<String, Integer>> output,
+ CausedByDrain drain) {
+
+ output.output(KV.of(key, 0));
+
+ // Cancel looping timer if drain is in progress
+ if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+ return;
+ }
+
+ // Set next timer
+ timer.offset(Duration.standardMinutes(1)).setRelative();
+ }
+}
+{{< /highlight >}}
+
+{{< highlight py >}}
+# Python does not currently support detecting drain in OnTimer.
+# The following example demonstrates a looping timer without drain support,
+# using event time.
+
+class LoopingTimerDoFn(DoFn):
+ TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+
+ def process(self, element, ts=DoFn.TimestampParam,
timer=DoFn.TimerParam(TIMER)):
+ timer.set(ts + Duration(seconds=60))
+ yield element
+
+ @on_timer(TIMER)
+ def on_timer(self, key=DoFn.KeyParam, timestamp=DoFn.TimestampParam,
timer=DoFn.TimerParam(TIMER)):
+ yield (key, 0)
+ # Loops forever: the timer is always re-set, so the loop is not terminated on drain.
+ timer.set(timestamp + Duration(seconds=60))
+{{< /highlight >}}
+
+{{< highlight go >}}
+// Go does not currently support detecting drain in OnTimer.
+// The following example demonstrates a looping timer without drain support,
+// using event time.
+
+type LoopingTimerFn struct {
+ Timer timers.EventTime
+}
+
+func (fn *LoopingTimerFn) ProcessElement(et beam.EventTime, sp state.Provider,
tp timers.Provider, key string, value int, emit func(string, int)) {
+ nextTime := et.ToTime().Add(60 * time.Second)
+ fn.Timer.Set(tp, nextTime)
+ emit(key, value)
+}
+
+func (fn *LoopingTimerFn) OnTimer(et beam.EventTime, sp state.Provider, tp
timers.Provider, key string, timer timers.Context, emit func(string, int)) {
+ emit(key, 0)
+ // Loops forever: the timer is always re-set, so the loop is not terminated on drain.
+ nextTime := et.ToTime().Add(60 * time.Second)
+ fn.Timer.Set(tp, nextTime)
+}
+{{< /highlight >}}
+
## 12. Splittable `DoFns` {#splittable-dofns}
diff --git a/website/www/site/data/capability_matrix.yaml
b/website/www/site/data/capability_matrix.yaml
index b7c236865ef..a1afdc6f8ab 100644
--- a/website/www/site/data/capability_matrix.yaml
+++ b/website/www/site/data/capability_matrix.yaml
@@ -1475,7 +1475,7 @@ capability-matrix:
- class: dataflow
l1: "Partially"
l2:
- l3: Dataflow has a native drain operation, but it does not work
in the presence of event time timer loops. Final implemention pending model
support.
+ l3: Dataflow has a native drain operation; draining event
time timer loops is supported only on the non-portable runner.
- class: prism
l1: "No"
l2: