This is an automated email from the ASF dual-hosted git repository.

stankiewicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e7cb9f77973 [Website] add drain update to docs (#38450)
e7cb9f77973 is described below

commit e7cb9f779733a947ef2c8484c34e569b9fe63d39
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Wed May 13 09:26:25 2026 +0200

    [Website] add drain update to docs (#38450)
    
    * add drain docs
---
 CHANGES.md                                         |  1 +
 website/www/site/content/en/blog/looping-timers.md | 11 ++-
 .../content/en/documentation/programming-guide.md  | 83 ++++++++++++++++++++++
 website/www/site/data/capability_matrix.yaml       |  2 +-
 4 files changed, 94 insertions(+), 3 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 7c253e8cdee..058e19ea743 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -69,6 +69,7 @@
 
 ## New Features / Improvements
 
+* Added an indicator for aggregations and timers that fire during 
a pipeline drain, allowing users and sinks to recognize and appropriately 
handle potentially incomplete or partial data 
([#36884](https://github.com/apache/beam/issues/36884)).
 * Added support for setting disk provisioned IOPS and throughput in Dataflow 
runner via `--diskProvisionedIops` and `--diskProvisionedThroughputMibps` 
pipeline options (Java/Go/Python) 
([#38349](https://github.com/apache/beam/issues/38349)).
 * TriggerStateMachineRunner changes from BitSetCoder to SentinelBitSetCoder to
   encode finished bitset. SentinelBitSetCoder and BitSetCoder are state
diff --git a/website/www/site/content/en/blog/looping-timers.md 
b/website/www/site/content/en/blog/looping-timers.md
index 2690dc967ec..ad13aae7544 100644
--- a/website/www/site/content/en/blog/looping-timers.md
+++ b/website/www/site/content/en/blog/looping-timers.md
@@ -221,11 +221,18 @@ public static class LoopingStatefulTimer extends 
DoFn<KV<String, Integer>, KV<St
     public void onTimer(
         OnTimerContext c,
         @StateId("key") ValueState<String> key,
-        @TimerId("loopingTimer") Timer loopingTimer) {
+        @TimerId("loopingTimer") Timer loopingTimer,
+        CausedByDrain drain) {
 
       LOG.info("Timer @ {} fired", c.timestamp());
       c.output(KV.of(key.read(), 0));
 
+      // Check if drain is in progress and avoid resetting the timer
+      if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+        LOG.info("Drain in progress, stopping looping timer.");
+        return;
+      }
+
       // If we do not put in a “time to live” value, then the timer would loop 
forever
       Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
       if (nextTimer.isBefore(stopTimerTime)) {
@@ -347,4 +354,4 @@ support for dealing with this use case in production.
 
 
 Runner specific notes:
-Google Cloud Dataflow Runners Drain feature does not support looping timers 
(Link to matrix)
+Cancelling looping timers on drain is currently supported only on 
Dataflow, and the implementation is still in progress (see [Issue 
#36884](https://github.com/apache/beam/issues/36884)).
diff --git a/website/www/site/content/en/documentation/programming-guide.md 
b/website/www/site/content/en/documentation/programming-guide.md
index 343fb128b3e..3f37f45bace 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -7501,6 +7501,89 @@ class BufferDoFn(DoFn):
 {{< code_sample "sdks/go/examples/snippets/04transforms.go" 
batching_dofn_example >}}
 {{< /highlight >}}
 
+#### 11.5.3. Looping timers {#looping-timers}
+
+Looping timers are a pattern where a timer sets another timer for a future 
time, creating a loop. This is useful for producing periodic outputs or 
heartbeats in the absence of data for a specific key.
+
+When draining a pipeline, it is important to terminate these loops to allow 
the pipeline to finish. In the Java SDK, you can use the `CausedByDrain` 
parameter in the `@OnTimer` method to check if the timer firing was induced by 
a drain operation. **Note:** `CausedByDrain` will be set only in certain 
runners. Check the [capability 
matrix](/documentation/runners/capability-matrix/) for more details.
+
+{{< highlight java >}}
+public static class LoopingStatefulTimer extends DoFn<KV<String, Integer>, 
KV<String, Integer>> {
+    @TimerId("loopingTimer") private final TimerSpec loopingTimer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(
+        @Element KV<String, Integer> element,
+        @TimerId("loopingTimer") Timer timer,
+        OutputReceiver<KV<String, Integer>> output) {
+
+      // Set initial timer
+      timer.offset(Duration.standardMinutes(1)).setRelative();
+      output.output(element);
+    }
+
+    @OnTimer("loopingTimer")
+    public void onTimer(
+        @Key String key,
+        @TimerId("loopingTimer") Timer timer,
+        OutputReceiver<KV<String, Integer>> output,
+        CausedByDrain drain) {
+
+      output.output(KV.of(key, 0));
+
+      // Stop the loop: do not set the next timer if a drain is in progress
+      if (drain == CausedByDrain.CAUSED_BY_DRAIN) {
+        return;
+      }
+
+      // Set next timer
+      timer.offset(Duration.standardMinutes(1)).setRelative();
+    }
+}
+{{< /highlight >}}
+
+{{< highlight py >}}
+# Python does not currently support detecting drain in OnTimer.
+# The following example demonstrates a looping timer without drain support,
+# using event time.
+
+class LoopingTimerDoFn(DoFn):
+  TIMER = TimerSpec('timer', TimeDomain.WATERMARK)
+
+  def process(self, element, ts=DoFn.TimestampParam, 
timer=DoFn.TimerParam(TIMER)):
+    timer.set(ts + Duration(seconds=60))
+    yield element
+
+  @on_timer(TIMER)
+  def on_timer(self, key=DoFn.KeyParam, timestamp=DoFn.TimestampParam, 
timer=DoFn.TimerParam(TIMER)):
+    yield (key, 0)
+    # Loops forever, cannot handle drain safely if it never stops.
+    timer.set(timestamp + Duration(seconds=60))
+{{< /highlight >}}
+
+{{< highlight go >}}
+// Go does not currently support detecting drain in OnTimer.
+// The following example demonstrates a looping timer without drain support,
+// using event time.
+
+type LoopingTimerFn struct {
+       Timer      timers.EventTime
+}
+
+func (fn *LoopingTimerFn) ProcessElement(et beam.EventTime, sp state.Provider, 
tp timers.Provider, key string, value int, emit func(string, int)) {
+       nextTime := et.ToTime().Add(60 * time.Second)
+       fn.Timer.Set(tp, nextTime)
+       emit(key, value)
+}
+
+func (fn *LoopingTimerFn) OnTimer(et beam.EventTime, sp state.Provider, tp 
timers.Provider, key string, timer timers.Context, emit func(string, int)) {
+       emit(key, 0)
+       // Loops forever, cannot handle drain safely if it never stops.
+       nextTime := et.ToTime().Add(60 * time.Second)
+       fn.Timer.Set(tp, nextTime)
+}
+{{< /highlight >}}
+
 
 ## 12. Splittable `DoFns` {#splittable-dofns}
 
diff --git a/website/www/site/data/capability_matrix.yaml 
b/website/www/site/data/capability_matrix.yaml
index b7c236865ef..a1afdc6f8ab 100644
--- a/website/www/site/data/capability_matrix.yaml
+++ b/website/www/site/data/capability_matrix.yaml
@@ -1475,7 +1475,7 @@ capability-matrix:
             - class: dataflow
               l1: "Partially"
               l2:
-              l3: Dataflow has a native drain operation, but it does not work 
in the presence of event time timer loops. Final implemention pending model 
support.
+              l3: Dataflow has a native drain operation; support for draining event 
time timer loops is limited to the non-portable runner.
             - class: prism
               l1: "No"
               l2:

Reply via email to