This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a6f763da54 [Drain] OnTimer - propagate caused by drain bit up to 
DoFnRunner (#37012)
5a6f763da54 is described below

commit 5a6f763da545a72416a4809d7d53e0927526233e
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Fri Feb 27 17:09:24 2026 +0100

    [Drain] OnTimer - propagate caused by drain bit up to DoFnRunner (#37012)
    
    * add causedByDrain to DoFnRunner.onTimer interface and all 
implementations. Mostly passthrough.
---
 .../org/apache/beam/runners/core/DoFnRunner.java   |  4 ++-
 .../runners/core/LateDataDroppingDoFnRunner.java   |  7 ++++--
 ...TimeBoundedSplittableProcessElementInvoker.java |  6 +++++
 .../apache/beam/runners/core/ProcessFnRunner.java  |  4 ++-
 .../runners/core/PushbackSideInputDoFnRunner.java  |  4 ++-
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 25 ++++++++++++++++---
 .../core/SimplePushbackSideInputDoFnRunner.java    |  7 ++++--
 .../core/SplittableParDoViaKeyedWorkItems.java     |  2 +-
 .../beam/runners/core/StatefulDoFnRunner.java      | 16 +++++++++---
 .../beam/runners/core/SimpleDoFnRunnerTest.java    | 17 ++++++++-----
 .../SimplePushbackSideInputDoFnRunnerTest.java     | 20 +++++++++++----
 .../beam/runners/core/StatefulDoFnRunnerTest.java  |  3 ++-
 .../apache/beam/runners/direct/ParDoEvaluator.java |  3 ++-
 .../wrappers/streaming/DoFnOperator.java           |  3 ++-
 .../functions/FlinkStatefulDoFnFunction.java       |  3 ++-
 .../wrappers/streaming/DoFnOperator.java           |  3 ++-
 .../flink/metrics/DoFnRunnerWithMetricsUpdate.java | 14 +++++++++--
 .../functions/FlinkStatefulDoFnFunction.java       |  3 ++-
 .../wrappers/streaming/DoFnOperator.java           | 12 +++++----
 .../streaming/ExecutableStageDoFnOperator.java     |  7 ++++--
 .../streaming/stableinput/BufferedElements.java    | 10 +++++++-
 .../streaming/stableinput/BufferingDoFnRunner.java |  4 ++-
 .../wrappers/streaming/DoFnOperatorTest.java       |  4 ++-
 .../streaming/ExecutableStageDoFnOperatorTest.java | 13 +++++++---
 .../dataflow/worker/DataflowProcessFnRunner.java   |  4 ++-
 .../dataflow/worker/GroupAlsoByWindowFnRunner.java |  4 ++-
 .../runners/dataflow/worker/SimpleParDoFn.java     |  3 ++-
 .../StreamingKeyedWorkItemSideInputDoFnRunner.java |  4 ++-
 .../worker/StreamingSideInputDoFnRunner.java       |  4 ++-
 .../fnexecution/control/TimerReceiverFactory.java  |  3 ++-
 .../translation/PipelineTranslatorUtils.java       |  3 ++-
 .../fnexecution/control/RemoteExecutionTest.java   |  4 ++-
 .../runners/fnexecution/wire/CommonCoderTest.java  |  4 ++-
 .../runners/jet/processors/StatefulParDoP.java     |  3 ++-
 .../samza/metrics/DoFnRunnerWithMetrics.java       | 13 ++++++++--
 .../runners/samza/runtime/AsyncDoFnRunner.java     |  7 ++++--
 .../apache/beam/runners/samza/runtime/DoFnOp.java  |  3 ++-
 .../runtime/DoFnRunnerWithKeyedInternals.java      | 13 ++++++++--
 .../beam/runners/samza/runtime/PortableDoFnOp.java |  3 ++-
 .../runners/samza/runtime/SamzaDoFnRunners.java    |  7 ++++--
 .../translation/batch/DoFnRunnerFactory.java       |  4 ++-
 .../translation/batch/DoFnRunnerWithMetrics.java   | 14 +++++++++--
 .../spark/translation/AbstractInOutIterator.java   |  3 ++-
 .../spark/translation/DoFnRunnerWithMetrics.java   | 14 +++++++++--
 .../translation/AbstractInOutIteratorTest.java     |  9 +++++--
 .../translation/SparkInputDataProcessorTest.java   |  4 ++-
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |  7 ++++++
 .../org/apache/beam/sdk/transforms/DoFnTester.java |  6 +++++
 .../construction/SplittableParDoNaiveBounded.java  |  6 +++++
 .../apache/beam/sdk/util/construction/Timer.java   | 29 +++++++++++++++++++---
 .../beam/sdk/util/construction/TimerTest.java      | 28 ++++++++++++++-------
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 29 +++++++++++++++++++++-
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |  4 ++-
 ...plittablePairWithRestrictionDoFnRunnerTest.java |  4 ++-
 .../harness/control/ProcessBundleHandlerTest.java  | 13 +++++++---
 55 files changed, 353 insertions(+), 97 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index dce4fcaa68b..b07b7eca7d8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
@@ -46,7 +47,8 @@ public interface DoFnRunner<InputT extends @Nullable Object, 
OutputT extends @Nu
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain);
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain);
 
   /**
    * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} 
method and performs
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index f89c3f36db3..f5587b46598 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
@@ -89,8 +90,10 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, 
W extends BoundedWin
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
-    doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp, 
outputTimestamp, timeDomain);
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
+    doFnRunner.onTimer(
+        timerId, timerFamilyId, key, window, timestamp, outputTimestamp, 
timeDomain, causedByDrain);
   }
 
   @Override
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 6f9f15b1358..5f37067233a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.OutputBuilderSuppliers;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.Row;
@@ -396,6 +397,11 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
       return element.getRecordOffset();
     }
 
+    @Override
+    public CausedByDrain causedByDrain() {
+      return element.causedByDrain();
+    }
+
     @Override
     public PipelineOptions getPipelineOptions() {
       return pipelineOptions;
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index b8a4dcf7cb7..e678ad81cf1 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowedValue;
@@ -90,7 +91,8 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     throw new UnsupportedOperationException("User timers unsupported in 
ProcessFn");
   }
 
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 61feaffad91..fc78585422c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -50,7 +51,8 @@ public interface PushbackSideInputDoFnRunner<InputT, OutputT> 
{
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain);
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain);
 
   /** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
   <KeyT> void onWindowExpiration(BoundedWindow window, Instant 
outputTimestamp, KeyT key);
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0fd63556b9c..05186ba5adb 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.util.OutputBuilderSuppliers;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.TupleTag;
@@ -200,11 +201,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");
 
     OnTimerArgumentProvider<KeyT> argumentProvider =
-        new OnTimerArgumentProvider<>(timerId, key, window, timestamp, 
outputTimestamp, timeDomain);
+        new OnTimerArgumentProvider<>(
+            timerId, key, window, timestamp, outputTimestamp, timeDomain, 
causedByDrain);
     invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
   }
 
@@ -399,6 +402,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       return elem.getValue();
     }
 
+    @Override
+    public CausedByDrain causedByDrain() {
+      return elem.causedByDrain();
+    }
+
     @Override
     public <T> T sideInput(PCollectionView<T> view) {
       checkNotNull(view, "View passed to sideInput cannot be null");
@@ -702,6 +710,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     private final TimeDomain timeDomain;
     private final String timerId;
     private final KeyT key;
+    private final CausedByDrain causedByDrain;
     private final OutputBuilderSupplier builderSupplier;
 
     /** Lazily initialized; should only be accessed via {@link 
#getNamespace()}. */
@@ -727,7 +736,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         BoundedWindow window,
         Instant fireTimestamp,
         Instant timestamp,
-        TimeDomain timeDomain) {
+        TimeDomain timeDomain,
+        CausedByDrain causedByDrain) {
       fn.super();
       this.timerId = timerId;
       this.window = window;
@@ -735,13 +745,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       this.timestamp = timestamp;
       this.timeDomain = timeDomain;
       this.key = key;
+      this.causedByDrain = causedByDrain;
       this.builderSupplier =
           OutputBuilderSuppliers.supplierForElement(
               WindowedValues.builder()
                   .setValue(null)
                   .setTimestamp(timestamp)
                   .setWindow(window)
-                  .setPaneInfo(PaneInfo.NO_FIRING));
+                  .setPaneInfo(PaneInfo.NO_FIRING)
+                  .setCausedByDrain(causedByDrain));
     }
 
     @Override
@@ -749,6 +761,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
       return timestamp;
     }
 
+    @Override
+    public CausedByDrain causedByDrain() {
+      return causedByDrain;
+    }
+
     @Override
     public Instant fireTimestamp() {
       return fireTimestamp;
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 2be8071f983..fe1c0fd45c9 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.WindowedValue;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -115,8 +116,10 @@ public class SimplePushbackSideInputDoFnRunner<InputT, 
OutputT>
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
-    underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, 
outputTimestamp, timeDomain);
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
+    underlying.onTimer(
+        timerId, timerFamilyId, key, window, timestamp, outputTimestamp, 
timeDomain, causedByDrain);
   }
 
   @Override
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 8f8c69b5e97..72219083c06 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -605,7 +605,7 @@ public class SplittableParDoViaKeyedWorkItems {
               wakeupTime,
               wakeupTime,
               TimeDomain.PROCESSING_TIME,
-              CausedByDrain.NORMAL));
+              timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
     }
 
     private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 52bbb3a306a..e562a4067d2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -48,7 +49,8 @@ import org.joda.time.Instant;
 /**
  * A customized {@link DoFnRunner} that handles late data dropping and garbage 
collection for
  * stateful {@link DoFn DoFns}. It registers a GC timer in {@link 
#processElement(WindowedValue)}
- * and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant, 
Instant, TimeDomain)}
+ * and does cleanup in {@link #onTimer(String, String, Object, BoundedWindow, Instant, Instant,
+ * TimeDomain, CausedByDrain)}
  *
  * @param <InputT> the type of the {@link DoFn} (main) input elements
  * @param <OutputT> the type of the {@link DoFn} (main) output elements
@@ -208,7 +210,8 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
 
     if (timerId.equals(SORT_FLUSH_TIMER)) {
       onSortFlushTimer(window, 
stepContext.timerInternals().currentInputWatermarkTime());
@@ -232,7 +235,14 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
             stepContext.timerInternals().currentInputWatermarkTime());
       } else {
         doFnRunner.onTimer(
-            timerId, timerFamilyId, key, window, timestamp, outputTimestamp, 
timeDomain);
+            timerId,
+            timerFamilyId,
+            key,
+            window,
+            timestamp,
+            outputTimestamp,
+            timeDomain,
+            causedByDrain);
       }
     }
   }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 92385b1c822..49c9277314b 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -141,7 +141,8 @@ public class SimpleDoFnRunnerTest {
         GlobalWindow.INSTANCE,
         new Instant(0),
         new Instant(0),
-        TimeDomain.EVENT_TIME);
+        TimeDomain.EVENT_TIME,
+        CausedByDrain.NORMAL);
   }
 
   /**
@@ -266,7 +267,8 @@ public class SimpleDoFnRunnerTest {
         GlobalWindow.INSTANCE,
         currentTime.plus(offset),
         currentTime.plus(offset),
-        TimeDomain.EVENT_TIME);
+        TimeDomain.EVENT_TIME,
+        CausedByDrain.CAUSED_BY_DRAIN);
 
     assertThat(
         fn.onTimerInvocations,
@@ -277,7 +279,8 @@ public class SimpleDoFnRunnerTest {
                 StateNamespaces.window(windowFn.windowCoder(), 
GlobalWindow.INSTANCE),
                 currentTime.plus(offset),
                 currentTime.plus(offset),
-                TimeDomain.EVENT_TIME)));
+                TimeDomain.EVENT_TIME,
+                CausedByDrain.CAUSED_BY_DRAIN)));
   }
 
   /**
@@ -593,7 +596,8 @@ public class SimpleDoFnRunnerTest {
         GlobalWindow.INSTANCE,
         new Instant(0),
         new Instant(0),
-        TimeDomain.EVENT_TIME);
+        TimeDomain.EVENT_TIME,
+        CausedByDrain.NORMAL);
   }
 
   @Test
@@ -625,7 +629,8 @@ public class SimpleDoFnRunnerTest {
                   GlobalWindow.INSTANCE,
                   new Instant(0),
                   new Instant(0),
-                  TimeDomain.EVENT_TIME);
+                  TimeDomain.EVENT_TIME,
+                  CausedByDrain.NORMAL);
             });
 
     assertThat(exception.getCause(), isA(IllegalArgumentException.class));
@@ -703,7 +708,7 @@ public class SimpleDoFnRunnerTest {
               context.fireTimestamp(),
               context.timestamp(),
               context.timeDomain(),
-              CausedByDrain.NORMAL));
+              context.causedByDrain()));
     }
   }
 
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index e32065f6ef5..9703c3c80bd 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -308,7 +308,15 @@ public class SimplePushbackSideInputDoFnRunnerTest {
 
     // Mocking is not easily compatible with annotation analysis, so we 
manually record
     // the method call.
-    runner.onTimer(timerId, "", null, window, timestamp, timestamp, 
TimeDomain.EVENT_TIME);
+    runner.onTimer(
+        timerId,
+        "",
+        null,
+        window,
+        timestamp,
+        timestamp,
+        TimeDomain.EVENT_TIME,
+        CausedByDrain.CAUSED_BY_DRAIN);
 
     assertThat(
         underlying.firedTimers,
@@ -319,7 +327,7 @@ public class SimplePushbackSideInputDoFnRunnerTest {
                 timestamp,
                 timestamp,
                 TimeDomain.EVENT_TIME,
-                CausedByDrain.NORMAL)));
+                CausedByDrain.CAUSED_BY_DRAIN)));
   }
 
   private static class TestDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT> {
@@ -353,7 +361,8 @@ public class SimplePushbackSideInputDoFnRunnerTest {
         BoundedWindow window,
         Instant timestamp,
         Instant outputTimestamp,
-        TimeDomain timeDomain) {
+        TimeDomain timeDomain,
+        CausedByDrain causedByDrain) {
       firedTimers.add(
           TimerData.of(
               timerId,
@@ -362,7 +371,7 @@ public class SimplePushbackSideInputDoFnRunnerTest {
               timestamp,
               outputTimestamp,
               timeDomain,
-              CausedByDrain.NORMAL));
+              causedByDrain));
     }
 
     @Override
@@ -510,7 +519,8 @@ public class SimplePushbackSideInputDoFnRunnerTest {
           window,
           timer.getTimestamp(),
           timer.getOutputTimestamp(),
-          timer.getDomain());
+          timer.getDomain(),
+          timer.causedByDrain());
     }
   }
 
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 4ed4f09fe62..df3aceff4a9 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -468,7 +468,8 @@ public class StatefulDoFnRunnerTest {
           window,
           timer.getTimestamp(),
           timer.getOutputTimestamp(),
-          timer.getDomain());
+          timer.getDomain(),
+          timer.causedByDrain());
     }
   }
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 6e7763e4b79..e42ae91d097 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -257,7 +257,8 @@ class ParDoEvaluator<InputT> implements 
TransformEvaluator<InputT> {
           window,
           timer.getTimestamp(),
           timer.getOutputTimestamp(),
-          timer.getDomain());
+          timer.getDomain(),
+          timer.causedByDrain());
     } catch (Exception e) {
       throw UserCodeException.wrap(e);
     }
diff --git 
a/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 43668e0298e..dca2f3075aa 100644
--- 
a/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1200,7 +1200,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
         window,
         timerData.getTimestamp(),
         timerData.getOutputTimestamp(),
-        timerData.getDomain());
+        timerData.getDomain(),
+        timerData.causedByDrain());
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
 
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 2a208d30a87..7e0b0be28d4 100644
--- 
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ 
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -250,7 +250,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         window,
         timer.getTimestamp(),
         timer.getOutputTimestamp(),
-        timer.getDomain());
+        timer.getDomain(),
+        timer.causedByDrain());
   }
 
   @Override
diff --git 
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index f5ce658de4f..f83e719ed0b 100644
--- 
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1200,7 +1200,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
         window,
         timerData.getTimestamp(),
         timerData.getOutputTimestamp(),
-        timerData.getDomain());
+        timerData.getDomain(),
+        timerData.causedByDrain());
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index f1ec36564a4..56e077253ae 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -73,10 +74,19 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT> 
implements DoFnRunner<
       final BoundedWindow window,
       final Instant timestamp,
       final Instant outputTimestamp,
-      final TimeDomain timeDomain) {
+      final TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     try (Closeable ignored =
         
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
 {
-      delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, 
outputTimestamp, timeDomain);
+      delegate.onTimer(
+          timerId,
+          timerFamilyId,
+          key,
+          window,
+          timestamp,
+          outputTimestamp,
+          timeDomain,
+          causedByDrain);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 3aa5a0802b3..12c59569f43 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -250,7 +250,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
         window,
         timer.getTimestamp(),
         timer.getOutputTimestamp(),
-        timer.getDomain());
+        timer.getDomain(),
+        timer.causedByDrain());
   }
 
   @Override
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 13414682f8e..2bf0d40cd5f 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -974,10 +974,10 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
    * of finishing a bundle in snapshot() first.
    *
    * <p>In order to avoid having {@link 
DoFnRunner#processElement(WindowedValue)} or {@link
-   * DoFnRunner#onTimer(String, String, Object, BoundedWindow, Instant, 
Instant, TimeDomain)} not
-   * between StartBundle and FinishBundle, this method needs to be called in 
each processElement and
-   * each processWatermark and onProcessingTime. Do not need to call in 
onEventTime, because it has
-   * been guaranteed in the processWatermark.
+   * DoFnRunner#onTimer(String, String, Object, BoundedWindow, Instant, 
Instant, TimeDomain,
+   * CausedByDrain)} not between StartBundle and FinishBundle, this method 
needs to be called in
+   * each processElement and each processWatermark and onProcessingTime. Do 
not need to call in
+   * onEventTime, because it has been guaranteed in the processWatermark.
    */
   private void checkInvokeStartBundle() {
     if (!bundleStarted) {
@@ -1195,6 +1195,7 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
     timerInternals.onFiredOrDeletedTimer(timerData);
 
+    // Fire the timer, forwarding the drain bit recorded on the timer data.
     pushbackDoFnRunner.onTimer(
         timerData.getTimerId(),
         timerData.getTimerFamilyId(),
@@ -1202,7 +1203,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
         window,
         timerData.getTimestamp(),
         timerData.getOutputTimestamp(),
-        timerData.getDomain());
+        timerData.getDomain(),
+        timerData.causedByDrain());
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d5a7ff035ef..2134fa869b4 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -101,6 +101,7 @@ import 
org.apache.beam.sdk.util.construction.PTransformTranslation;
 import org.apache.beam.sdk.util.construction.Timer;
 import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
 import org.apache.beam.sdk.util.construction.graph.UserStateReference;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -1001,7 +1002,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
         BoundedWindow window,
         Instant timestamp,
         Instant outputTimestamp,
-        TimeDomain timeDomain) {
+        TimeDomain timeDomain,
+        CausedByDrain causedByDrain) {
       Object timerKey = keyForTimer.get();
       Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer");
       Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle");
@@ -1034,7 +1036,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
                 timestamp,
                 outputTimestamp,
                 // TODO: Support propagating the PaneInfo through.
-                PaneInfo.NO_FIRING);
+                PaneInfo.NO_FIRING,
+                causedByDrain);
         try {
           timerReceiver.accept(timerValue);
         } catch (Exception e) {
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
index 0c2ba87c4ba..d0a1b12e01e 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -104,7 +105,14 @@ class BufferedElements {
     @Override
     public void processWith(DoFnRunner doFnRunner) {
       doFnRunner.onTimer(
-          timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
+          timerId,
+          timerFamilyId,
+          key,
+          window,
+          timestamp,
+          outputTimestamp,
+          timeDomain,
+          CausedByDrain.NORMAL);
     }
 
     @Override
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 10a3182f90f..73b20238ef0 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.flink.translation.utils.Locker;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -234,7 +235,8 @@ public class BufferingDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT,
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
 
     minBufferedElementTimestamp =
         Math.min(outputTimestamp.getMillis(), minBufferedElementTimestamp);
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index c2556d7229b..5c4975ffab0 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -81,6 +81,7 @@ import 
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -507,7 +508,8 @@ public class DoFnOperatorTest {
                   BoundedWindow window,
                   Instant timestamp,
                   Instant outputTimestamp,
-                  TimeDomain timeDomain) {
+                  TimeDomain timeDomain,
+                  CausedByDrain causedByDrain) {
 
                 if ("cleanup".equals(timerId)) {
                   holdState.clear();
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 982871e59f1..01c9d25f1bf 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -98,6 +98,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.NoopLock;
 import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowedValue;
@@ -534,14 +535,16 @@ public class ExecutableStageDoFnOperatorTest {
                     windowedValue.getWindows(),
                     timestamp,
                     timestamp,
-                    PaneInfo.NO_FIRING),
+                    PaneInfo.NO_FIRING,
+                    CausedByDrain.NORMAL),
                 TimerInternals.TimerData.of(
                     "",
                     TimerReceiverFactory.encodeToTimerDataTimerId("transform", timerId),
                     StateNamespaces.window(IntervalWindow.getCoder(), intervalWindow),
                     timestamp,
                     timestamp,
-                    TimeDomain.EVENT_TIME));
+                    TimeDomain.EVENT_TIME,
+                    CausedByDrain.NORMAL));
 
     timerConsumer.accept("timer", timerTarget);
     timerConsumer.accept("timer2", timerTarget2);
@@ -867,7 +870,8 @@ public class ExecutableStageDoFnOperatorTest {
             stateNamespace,
             window.maxTimestamp(),
             window.maxTimestamp(),
-            TimeDomain.EVENT_TIME);
+            TimeDomain.EVENT_TIME,
+            CausedByDrain.NORMAL);
     operator.setTimer(
         Timer.of(
             windowedValue.getValue().getKey(),
@@ -875,7 +879,8 @@ public class ExecutableStageDoFnOperatorTest {
             windowedValue.getWindows(),
             window.maxTimestamp(),
             window.maxTimestamp(),
-            PaneInfo.NO_FIRING),
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL),
         userTimer2);
     assertThat(testHarness.numEventTimeTimers(), is(1));
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
index 741716b8284..ec1fcd6c843 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
@@ -117,7 +118,8 @@ class DataflowProcessFnRunner<InputT, OutputT, RestrictionT>
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     throw new UnsupportedOperationException("Unsupported for ProcessFn");
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index 1909a73dc8b..37e04316e23 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValueReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -81,7 +82,8 @@ public class GroupAlsoByWindowFnRunner<InputT, OutputT> 
implements DoFnRunner<In
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     throw new UnsupportedOperationException(
         String.format("Timers are not supported by %s", GroupAlsoByWindowFn.class.getSimpleName()));
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 00c8192b1e4..434d46c20a5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -374,7 +374,8 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
           window,
           timer.getTimestamp(),
           timer.getOutputTimestamp(),
-          timer.getDomain());
+          timer.getDomain(),
+          timer.causedByDrain());
     }
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
index e5d8a18be76..0b9ccd1f37c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -142,7 +143,8 @@ public class StreamingKeyedWorkItemSideInputDoFnRunner<K, 
InputT, OutputT, W ext
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     throw new UnsupportedOperationException(
         "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.");
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index b5b723adb2b..3b7891c5378 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -84,7 +85,8 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W 
extends BoundedWind
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     throw new UnsupportedOperationException(
         "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.");
   }
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
index d99a733a4eb..952431411b8 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
@@ -94,7 +94,8 @@ public class TimerReceiverFactory {
                 namespace,
                 timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getFireTimestamp(),
                 timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE : timer.getHoldTimestamp(),
-                timerSpec.getTimerSpec().getTimeDomain());
+                timerSpec.getTimerSpec().getTimeDomain(),
+                timer.causedByDrain());
         timerDataConsumer.accept(timer, timerData);
       }
     };
diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 5b7880ad56e..d7d4c8cac44 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -156,7 +156,8 @@ public final class PipelineTranslatorUtils {
             Collections.singletonList(window),
             timestamp,
             outputTimestamp,
-            PaneInfo.NO_FIRING);
+            PaneInfo.NO_FIRING,
+            timer.causedByDrain());
     KV<String, String> transformAndTimerId =
         TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerFamilyId());
     FnDataReceiver<Timer> fnTimerReceiver = timerReceivers.get(transformAndTimerId);
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 556cc7993ae..996c206a7e9 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -137,6 +137,7 @@ import 
org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.sdk.util.construction.graph.PipelineNode.PTransformNode;
 import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
 import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -2254,6 +2255,7 @@ public class RemoteExecutionTest implements Serializable {
         Collections.singletonList(GlobalWindow.INSTANCE),
         BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(fireTimestamp)),
         BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(holdTimestamp)),
-        PaneInfo.NO_FIRING);
+        PaneInfo.NO_FIRING,
+        CausedByDrain.NORMAL);
   }
 }
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
index eccf1e66434..a79b0996cbb 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
@@ -84,6 +84,7 @@ import 
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext
 import org.apache.beam.sdk.util.construction.CoderTranslator;
 import org.apache.beam.sdk.util.construction.ModelCoderRegistrar;
 import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.sdk.values.WindowedValues;
@@ -326,7 +327,8 @@ public class CommonCoderTest {
           windows,
           new Instant(((Number) kvMap.get("fireTimestamp")).longValue()),
           new Instant(((Number) kvMap.get("holdTimestamp")).longValue()),
-          paneInfo);
+          paneInfo,
+          CausedByDrain.NORMAL); // todo - add tests once causedByDrain is added to proto
     } else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
       Map<String, Object> kvMap = (Map<String, Object>) value;
       Instant end = new Instant(((Number) kvMap.get("end")).longValue());
diff --git 
a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
 
b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
index f696873ec5c..ed085e5e56b 100644
--- 
a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
+++ 
b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
@@ -105,7 +105,8 @@ public class StatefulParDoP<OutputT>
         window,
         timer.getTimestamp(),
         timer.getOutputTimestamp(),
-        timer.getDomain());
+        timer.getDomain(),
+        timer.causedByDrain());
   }
 
   @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
index b815649a765..7bec91abb34 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
@@ -21,6 +21,7 @@ import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -61,11 +62,19 @@ public class DoFnRunnerWithMetrics<InT, OutT> implements 
DoFnRunner<InT, OutT> {
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     withMetrics(
         () ->
             underlying.onTimer(
-                timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain),
+                timerId,
+                timerFamilyId,
+                key,
+                window,
+                timestamp,
+                outputTimestamp,
+                timeDomain,
+                causedByDrain),
         false);
   }
 
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
index d07a9bda78c..2485ac2d552 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -154,8 +155,10 @@ public class AsyncDoFnRunner<InT, OutT> implements 
DoFnRunner<InT, OutT> {
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
-    underlying.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
+    underlying.onTimer(
+        timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain, causedByDrain);
   }
 
   @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 2f27e31d05d..bc87e2460ec 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -437,7 +437,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, 
OutT, Void> {
         window,
         timer.getTimestamp(),
         timer.getOutputTimestamp(),
-        timer.getDomain());
+        timer.getDomain(),
+        timer.causedByDrain());
   }
 
   // todo: should this go through bundle manager to start and finish the bundle?
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
index 34e3405660c..84cf5b26c50 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
@@ -66,14 +67,22 @@ public class DoFnRunnerWithKeyedInternals<InputT, OutputT> 
implements DoFnRunner
       BoundedWindow window,
       Instant timestamp,
       Instant outputTimestamp,
-      TimeDomain timeDomain) {
+      TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     // Note: wrap with KV.of(key, null) as a special use case of setKeyedInternals() to set key
     // directly.
     setKeyedInternals(KV.of(key, null));
 
     try {
       underlying.onTimer(
-          timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
+          timerId,
+          timerFamilyId,
+          key,
+          window,
+          timestamp,
+          outputTimestamp,
+          timeDomain,
+          causedByDrain);
     } finally {
       clearKeyedInternals();
     }
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
index 743a42d1479..468e4b9aa8d 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
@@ -425,7 +425,8 @@ public class PortableDoFnOp<InT, FnOutT, OutT> implements 
Op<InT, OutT, Void> {
         window,
         timer.getTimestamp(),
         timer.getOutputTimestamp(),
-        timer.getDomain());
+        timer.getDomain(),
+        timer.causedByDrain());
   }
 
   // todo: should this go through bundle manager to start and finish the bundle?
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 7129ba9145e..a2ec88a4341 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -61,6 +61,7 @@ import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
 import org.apache.beam.sdk.util.construction.Timer;
 import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
 import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -430,7 +431,8 @@ public class SamzaDoFnRunners {
         BoundedWindow window,
         Instant timestamp,
         Instant outputTimestamp,
-        TimeDomain timeDomain) {
+        TimeDomain timeDomain,
+        CausedByDrain causedByDrain) {
       final KV<String, String> timerReceiverKey =
           TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
       final FnDataReceiver<Timer> timerReceiver =
@@ -443,7 +445,8 @@ public class SamzaDoFnRunners {
               timestamp,
               outputTimestamp,
               // TODO: Support propagating the PaneInfo through.
-              PaneInfo.NO_FIRING);
+              PaneInfo.NO_FIRING,
+              causedByDrain);
       try {
         timerReceiver.accept(timerValue);
       } catch (Exception e) {
diff --git 
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
 
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
index 350f7daa56c..15ec818dba7 100644
--- 
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
+++ 
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
 import org.apache.beam.sdk.util.WindowedValueReceiver;
 import org.apache.beam.sdk.util.construction.ParDoTranslation;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -266,7 +267,8 @@ abstract class DoFnRunnerFactory<InT, T> implements 
Serializable {
           BoundedWindow window,
           Instant timestamp,
           Instant outputTimestamp,
-          TimeDomain timeDomain) {
+          TimeDomain timeDomain,
+          CausedByDrain causedByDrain) {
         throw new UnsupportedOperationException();
       }
 
diff --git 
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
 
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
index db4ee5be578..28dbf44cb8f 100644
--- 
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -77,9 +78,18 @@ class DoFnRunnerWithMetrics<InT, OutT> implements 
DoFnRunnerWithTeardown<InT, Ou
       final BoundedWindow window,
       final Instant timestamp,
       final Instant outputTimestamp,
-      final TimeDomain timeDomain) {
+      final TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metrics)) {
-      delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, outputTimestamp, timeDomain);
+      delegate.onTimer(
+          timerId,
+          timerFamilyId,
+          key,
+          window,
+          timestamp,
+          outputTimestamp,
+          timeDomain,
+          causedByDrain);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
index 9d73a605b3b..4ac56c2550b 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
@@ -75,7 +75,8 @@ public abstract class AbstractInOutIterator<K, InputT, 
OutputT>
               window,
               timer.getTimestamp(),
               timer.getOutputTimestamp(),
-              timer.getDomain());
+              timer.getDomain(),
+              timer.causedByDrain());
     } finally {
       if (this.ctx.getTimerDataIterator()
           instanceof ParDoStateUpdateFn.SparkTimerInternalsIterator) {
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index a6b1f65571d..c8cd7eb5f26 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
 
@@ -76,9 +77,18 @@ public class DoFnRunnerWithMetrics<InputT, OutputT> 
implements DoFnRunner<InputT
       final BoundedWindow window,
       final Instant timestamp,
       final Instant outputTimestamp,
-      final TimeDomain timeDomain) {
+      final TimeDomain timeDomain,
+      CausedByDrain causedByDrain) {
     try (Closeable ignored = 
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
-      delegate.onTimer(timerId, timerFamilyId, key, window, timestamp, 
outputTimestamp, timeDomain);
+      delegate.onTimer(
+          timerId,
+          timerFamilyId,
+          key,
+          window,
+          timestamp,
+          outputTimestamp,
+          timeDomain,
+          causedByDrain);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
index b1e0e3a3980..cfda84c1981 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.joda.time.Instant;
@@ -56,6 +57,7 @@ public class AbstractInOutIteratorTest {
   private static final Instant TEST_TIMESTAMP = new Instant(42L);
   private static final Instant TEST_OUTPUT_TIMESTAMP = new Instant(84L);
   private static final TimeDomain TEST_TIME_DOMAIN = TimeDomain.EVENT_TIME;
+  private static final CausedByDrain TEST_CAUSED_BY_DRAIN = 
CausedByDrain.CAUSED_BY_DRAIN;
 
   private StateNamespace testNamespace;
 
@@ -88,6 +90,7 @@ public class AbstractInOutIteratorTest {
     when(mockTimer.getTimestamp()).thenReturn(TEST_TIMESTAMP);
     when(mockTimer.getOutputTimestamp()).thenReturn(TEST_OUTPUT_TIMESTAMP);
     when(mockTimer.getDomain()).thenReturn(TEST_TIME_DOMAIN);
+    when(mockTimer.causedByDrain()).thenReturn(TEST_CAUSED_BY_DRAIN);
   }
 
   @Test
@@ -107,7 +110,8 @@ public class AbstractInOutIteratorTest {
             mockWindow,
             TEST_TIMESTAMP,
             TEST_OUTPUT_TIMESTAMP,
-            TEST_TIME_DOMAIN);
+            TEST_TIME_DOMAIN,
+            CausedByDrain.CAUSED_BY_DRAIN);
 
     // Verify that timer data iterator deletion was not called (no timer 
iterator was set in this
     // test)
@@ -133,7 +137,8 @@ public class AbstractInOutIteratorTest {
             mockWindow,
             TEST_TIMESTAMP,
             TEST_OUTPUT_TIMESTAMP,
-            TEST_TIME_DOMAIN);
+            TEST_TIME_DOMAIN,
+            CausedByDrain.CAUSED_BY_DRAIN);
 
     // Verify that the timer data iterator's deleteTimer method was called
     verify(mockTimerDataIterator).deleteTimer(mockTimer);
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
index 2ff06b59f8e..2dc428d5a6b 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
@@ -252,7 +253,8 @@ public class SparkInputDataProcessorTest {
         BoundedWindow window,
         Instant timestamp,
         Instant outputTimestamp,
-        TimeDomain timeDomain) {}
+        TimeDomain timeDomain,
+        CausedByDrain causedByDrain) {}
 
     @Override
     public void finishBundle() {}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 125408108c0..c1b4f2ec7d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -45,6 +45,7 @@ import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.OutputBuilder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -330,6 +331,9 @@ public abstract class DoFn<InputT extends @Nullable Object, 
OutputT extends @Nul
 
     @Pure
     public abstract @Nullable Long currentRecordOffset();
+
+    @Pure
+    public abstract CausedByDrain causedByDrain();
   }
 
   /** Information accessible when running a {@link DoFn.OnTimer} method. */
@@ -346,6 +350,9 @@ public abstract class DoFn<InputT extends @Nullable Object, 
OutputT extends @Nul
 
     /** Returns the time domain of the current timer. */
     public abstract TimeDomain timeDomain();
+
+    @Pure
+    public abstract CausedByDrain causedByDrain();
   }
 
   public abstract class OnWindowExpirationContext extends WindowedContext {
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 3bdeb57ed88..1558bb74e0d 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.OutputBuilderSupplier;
 import org.apache.beam.sdk.util.OutputBuilderSuppliers;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -589,6 +590,11 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
       return element.getCurrentRecordOffset();
     }
 
+    @Override
+    public CausedByDrain causedByDrain() {
+      return CausedByDrain.NORMAL;
+    }
+
     @Override
     public PipelineOptions getPipelineOptions() {
       return options;
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
index 9f5322fb511..a22d3378cfd 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
@@ -47,6 +47,7 @@ import 
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.OutputBuilderSupplier;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.OutputBuilder;
 import org.apache.beam.sdk.values.PCollection;
@@ -513,6 +514,11 @@ public class SplittableParDoNaiveBounded {
         throw new UnsupportedOperationException();
       }
 
+      @Override
+      public CausedByDrain causedByDrain() {
+        return outerContext.causedByDrain();
+      }
+
       @Override
       public Object sideInput(String tagId) {
         PCollectionView<?> view = sideInputMapping.get(tagId);
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
index d443f008b7d..6e5bb3303c3 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
@@ -65,9 +66,17 @@ public abstract class Timer<K> {
       Collection<? extends BoundedWindow> windows,
       Instant fireTimestamp,
       Instant holdTimestamp,
-      PaneInfo paneInfo) {
+      PaneInfo paneInfo,
+      CausedByDrain causedByDrain) {
     return new AutoValue_Timer(
-        userKey, dynamicTimerTag, windows, false, fireTimestamp, 
holdTimestamp, paneInfo);
+        userKey,
+        dynamicTimerTag,
+        windows,
+        false,
+        fireTimestamp,
+        holdTimestamp,
+        paneInfo,
+        causedByDrain);
   }
 
   /**
@@ -76,7 +85,8 @@ public abstract class Timer<K> {
    */
   public static <K> Timer<K> cleared(
       K userKey, String dynamicTimerTag, Collection<? extends BoundedWindow> 
windows) {
-    return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null, 
null, null);
+    return new AutoValue_Timer(
+        userKey, dynamicTimerTag, windows, true, null, null, null, 
CausedByDrain.NORMAL);
   }
 
   /** Returns the key that the timer is set on. */
@@ -116,6 +126,8 @@ public abstract class Timer<K> {
    */
   public abstract @Nullable PaneInfo getPaneInfo();
 
+  public abstract @Nullable CausedByDrain causedByDrain();
+
   @Override
   public final boolean equals(@Nullable Object other) {
     if (!(other instanceof Timer)) {
@@ -186,6 +198,7 @@ public abstract class Timer<K> {
         InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
         InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
         PaneInfoCoder.INSTANCE.encode(timer.getPaneInfo(), outStream);
+        // TODO: Should metadata be propagated via the PaneInfo bit, similarly to WindowedValue?
       }
     }
 
@@ -201,7 +214,15 @@ public abstract class Timer<K> {
       Instant fireTimestamp = InstantCoder.of().decode(inStream);
       Instant holdTimestamp = InstantCoder.of().decode(inStream);
       PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream);
-      return Timer.of(userKey, dynamicTimerTag, windows, fireTimestamp, 
holdTimestamp, paneInfo);
+      // TODO: Should metadata be propagated via the PaneInfo bit, similarly to WindowedValue?
+      return Timer.of(
+          userKey,
+          dynamicTimerTag,
+          windows,
+          fireTimestamp,
+          holdTimestamp,
+          paneInfo,
+          CausedByDrain.NORMAL);
     }
 
     @Override
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
index e96bf27c6ed..06f43350f5a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -57,7 +58,8 @@ public class TimerTest {
             Collections.singleton(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING);
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL);
     assertEquals("key", timer.getUserKey());
     assertEquals("tag", timer.getDynamicTimerTag());
     assertEquals(FIRE_TIME, timer.getFireTimestamp());
@@ -79,7 +81,8 @@ public class TimerTest {
             Collections.singleton(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING));
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL));
     CoderProperties.structuralValueDecodeEncodeEqual(
         coder, Timer.cleared("key", "tag", 
Collections.singleton(GlobalWindow.INSTANCE)));
     CoderProperties.structuralValueConsistentWithEquals(
@@ -90,14 +93,16 @@ public class TimerTest {
             Collections.singleton(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING),
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL),
         Timer.of(
             "key",
             "tag",
             Collections.singleton(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING));
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL));
     CoderProperties.structuralValueConsistentWithEquals(
         coder,
         Timer.cleared("key", "tag", 
Collections.singleton(GlobalWindow.INSTANCE)),
@@ -115,7 +120,8 @@ public class TimerTest {
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING));
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL));
     CoderProperties.coderDecodeEncodeEqual(
         coder, Timer.cleared("key", "tag", 
Collections.singletonList(GlobalWindow.INSTANCE)));
     CoderProperties.coderConsistentWithEquals(
@@ -126,14 +132,16 @@ public class TimerTest {
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING),
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL),
         Timer.of(
             "key",
             "tag",
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING));
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL));
     CoderProperties.coderConsistentWithEquals(
         coder,
         Timer.cleared("key", "tag", 
Collections.singletonList(GlobalWindow.INSTANCE)),
@@ -146,14 +154,16 @@ public class TimerTest {
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING),
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL),
         Timer.of(
             "key",
             "tag",
             Collections.singletonList(GlobalWindow.INSTANCE),
             FIRE_TIME,
             HOLD_TIME,
-            PaneInfo.NO_FIRING));
+            PaneInfo.NO_FIRING,
+            CausedByDrain.NORMAL));
     CoderProperties.coderDeterministic(
         coder,
         Timer.cleared("key", "tag", 
Collections.singletonList(GlobalWindow.INSTANCE)),
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 1b7d75f6ec3..78aceeaab19 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -102,6 +102,7 @@ import 
org.apache.beam.sdk.util.construction.PTransformTranslation;
 import org.apache.beam.sdk.util.construction.ParDoTranslation;
 import org.apache.beam.sdk.util.construction.RehydratedComponents;
 import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.OutputBuilder;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -247,6 +248,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
   private WindowedValue<InputT> currentElement;
 
   private Object currentKey;
+  private CausedByDrain causedByDrain;
 
   /**
    * Only valid during {@link
@@ -1200,6 +1202,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     checkNotNull(timerBundleTracker);
     try {
       currentKey = timer.getUserKey();
+      causedByDrain = timer.causedByDrain();
+      // Keep the timer's drain cause so the timer contexts and their outputs can report it.
       Iterator<BoundedWindow> windowIterator =
           (Iterator<BoundedWindow>) timer.getWindows().iterator();
       while (windowIterator.hasNext()) {
@@ -1531,7 +1535,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
           Collections.singletonList(boundedWindow),
           scheduledTime,
           outputTimestamp,
-          paneInfo);
+          paneInfo,
+          causedByDrain);
     }
   }
 
@@ -1848,6 +1853,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
           currentElement.getTimestamp(),
           currentElement.getPaneInfo());
     }
+
+    @Override
+    public CausedByDrain causedByDrain() {
+      return currentElement.causedByDrain();
+    }
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for a non-window observing 
method. */
@@ -1929,6 +1939,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       }
       outputTo(consumer, WindowedValues.of(output, timestamp, windows, 
paneInfo));
     }
+
+    @Override
+    public CausedByDrain causedByDrain() {
+      return currentElement.causedByDrain();
+    }
   }
 
   /** Provides base arguments for a {@link DoFnInvoker} for a non-window 
observing method. */
@@ -2278,6 +2293,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             .setWindow(currentWindow)
             .setTimestamp(currentTimer.getHoldTimestamp())
             .setPaneInfo(currentTimer.getPaneInfo())
+            .setCausedByDrain(causedByDrain)
             .setReceiver(
                 windowedValue -> {
                   
checkOnWindowExpirationTimestamp(windowedValue.getTimestamp());
@@ -2395,6 +2411,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                     .setValue(value)
                     .setTimestamp(currentTimer.getHoldTimestamp())
                     .setWindow(currentWindow)
+                    .setCausedByDrain(causedByDrain)
                     .setReceiver(
                         windowedValue ->
                             context.outputWindowedValue(
@@ -2432,6 +2449,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                     .setValue(value)
                     .setTimestamp(currentTimer.getHoldTimestamp())
                     .setWindow(currentWindow)
+                    .setCausedByDrain(causedByDrain)
                     .setReceiver(
                         windowedValue ->
                             context.outputWindowedValue(
@@ -2469,6 +2487,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                     .setValue(value)
                     .setTimestamp(currentTimer.getHoldTimestamp())
                     .setWindow(currentWindow)
+                    .setCausedByDrain(causedByDrain)
                     .setReceiver(
                         windowedValue ->
                             context.outputWindowedValue(
@@ -2546,6 +2565,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
         return currentWindow;
       }
 
+      @Override
+      public CausedByDrain causedByDrain() {
+        return causedByDrain;
+      }
+
       @Override
       public OutputBuilder<OutputT> builder(OutputT value) {
         return WindowedValues.<OutputT>builder()
@@ -2553,6 +2577,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
             .setTimestamp(currentTimer.getHoldTimestamp())
             .setWindow(currentWindow)
             .setPaneInfo(currentTimer.getPaneInfo())
+            .setCausedByDrain(causedByDrain)
             .setReceiver(
                 windowedValue -> {
                   checkTimerTimestamp(windowedValue.getTimestamp());
@@ -2734,6 +2759,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                     .setValue(value)
                     .setTimestamp(currentTimer.getHoldTimestamp())
                     .setWindow(currentWindow)
+                    .setCausedByDrain(causedByDrain)
                     .setPaneInfo(currentTimer.getPaneInfo())
                     .setReceiver(
                         windowedValue ->
@@ -2772,6 +2798,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
                     .setTimestamp(currentTimer.getHoldTimestamp())
                     .setWindow(currentWindow)
                     .setPaneInfo(currentTimer.getPaneInfo())
+                    .setCausedByDrain(causedByDrain)
                     .setReceiver(
                         windowedValue ->
                             context.outputWindowedValue(
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index ef19b7c1880..50a2fec0b5a 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -123,6 +123,7 @@ import 
org.apache.beam.sdk.util.construction.RehydratedComponents;
 import org.apache.beam.sdk.util.construction.SdkComponents;
 import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
 import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -1161,7 +1162,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
           Collections.singletonList(GlobalWindow.INSTANCE),
           fireTimestamp,
           holdTimestamp,
-          PaneInfo.NO_FIRING);
+          PaneInfo.NO_FIRING,
+          CausedByDrain.NORMAL);
     }
 
     private <T> WindowedValue<T> valueInWindows(
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
index 13a0b105ec3..756f17fdfa3 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
@@ -64,6 +64,7 @@ import 
org.apache.beam.sdk.util.construction.PipelineTranslation;
 import org.apache.beam.sdk.util.construction.SdkComponents;
 import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
 import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -123,7 +124,8 @@ public class SplittablePairWithRestrictionDoFnRunnerTest 
implements Serializable
           Collections.singletonList(GlobalWindow.INSTANCE),
           fireTimestamp,
           holdTimestamp,
-          PaneInfo.NO_FIRING);
+          PaneInfo.NO_FIRING,
+          CausedByDrain.NORMAL);
     }
 
     private <T> WindowedValue<T> valueInWindows(
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 52b6c87a5c0..d86ef653dca 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -145,6 +145,7 @@ import org.apache.beam.sdk.util.construction.ModelCoders;
 import org.apache.beam.sdk.util.construction.PTransformTranslation;
 import org.apache.beam.sdk.util.construction.ParDoTranslation;
 import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
@@ -1129,7 +1130,8 @@ public class ProcessBundleHandlerTest {
                 Collections.singletonList(GlobalWindow.INSTANCE),
                 Instant.ofEpochMilli(1L),
                 Instant.ofEpochMilli(1L),
-                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                CausedByDrain.NORMAL),
             encodedTimer);
     Elements elements =
         Elements.newBuilder()
@@ -1248,7 +1250,8 @@ public class ProcessBundleHandlerTest {
                 Collections.singletonList(GlobalWindow.INSTANCE),
                 Instant.ofEpochMilli(1L),
                 Instant.ofEpochMilli(1L),
-                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                CausedByDrain.NORMAL),
             encodedTimer);
 
     assertThrows(
@@ -1342,7 +1345,8 @@ public class ProcessBundleHandlerTest {
                 Collections.singletonList(GlobalWindow.INSTANCE),
                 Instant.ofEpochMilli(1L),
                 Instant.ofEpochMilli(1L),
-                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                CausedByDrain.NORMAL),
             encodedTimer);
 
     InstructionResponse.Builder builder =
@@ -1961,7 +1965,8 @@ public class ProcessBundleHandlerTest {
                 Collections.singletonList(GlobalWindow.INSTANCE),
                 Instant.ofEpochMilli(1L),
                 Instant.ofEpochMilli(1L),
-                PaneInfo.ON_TIME_AND_ONLY_FIRING),
+                PaneInfo.ON_TIME_AND_ONLY_FIRING,
+                CausedByDrain.NORMAL),
             encodedTimer);
     Elements elements =
         Elements.newBuilder()


Reply via email to