This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5a6f763da54 [Drain] OnTimer - propagate caused by drain bit up to
DoFnRunner (#37012)
5a6f763da54 is described below
commit 5a6f763da545a72416a4809d7d53e0927526233e
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Fri Feb 27 17:09:24 2026 +0100
[Drain] OnTimer - propagate caused by drain bit up to DoFnRunner (#37012)
* add causedByDrain to DoFnRunner.onTimer interface and all
implementations. Mostly passthrough.
---
.../org/apache/beam/runners/core/DoFnRunner.java | 4 ++-
.../runners/core/LateDataDroppingDoFnRunner.java | 7 ++++--
...TimeBoundedSplittableProcessElementInvoker.java | 6 +++++
.../apache/beam/runners/core/ProcessFnRunner.java | 4 ++-
.../runners/core/PushbackSideInputDoFnRunner.java | 4 ++-
.../apache/beam/runners/core/SimpleDoFnRunner.java | 25 ++++++++++++++++---
.../core/SimplePushbackSideInputDoFnRunner.java | 7 ++++--
.../core/SplittableParDoViaKeyedWorkItems.java | 2 +-
.../beam/runners/core/StatefulDoFnRunner.java | 16 +++++++++---
.../beam/runners/core/SimpleDoFnRunnerTest.java | 17 ++++++++-----
.../SimplePushbackSideInputDoFnRunnerTest.java | 20 +++++++++++----
.../beam/runners/core/StatefulDoFnRunnerTest.java | 3 ++-
.../apache/beam/runners/direct/ParDoEvaluator.java | 3 ++-
.../wrappers/streaming/DoFnOperator.java | 3 ++-
.../functions/FlinkStatefulDoFnFunction.java | 3 ++-
.../wrappers/streaming/DoFnOperator.java | 3 ++-
.../flink/metrics/DoFnRunnerWithMetricsUpdate.java | 14 +++++++++--
.../functions/FlinkStatefulDoFnFunction.java | 3 ++-
.../wrappers/streaming/DoFnOperator.java | 12 +++++----
.../streaming/ExecutableStageDoFnOperator.java | 7 ++++--
.../streaming/stableinput/BufferedElements.java | 10 +++++++-
.../streaming/stableinput/BufferingDoFnRunner.java | 4 ++-
.../wrappers/streaming/DoFnOperatorTest.java | 4 ++-
.../streaming/ExecutableStageDoFnOperatorTest.java | 13 +++++++---
.../dataflow/worker/DataflowProcessFnRunner.java | 4 ++-
.../dataflow/worker/GroupAlsoByWindowFnRunner.java | 4 ++-
.../runners/dataflow/worker/SimpleParDoFn.java | 3 ++-
.../StreamingKeyedWorkItemSideInputDoFnRunner.java | 4 ++-
.../worker/StreamingSideInputDoFnRunner.java | 4 ++-
.../fnexecution/control/TimerReceiverFactory.java | 3 ++-
.../translation/PipelineTranslatorUtils.java | 3 ++-
.../fnexecution/control/RemoteExecutionTest.java | 4 ++-
.../runners/fnexecution/wire/CommonCoderTest.java | 4 ++-
.../runners/jet/processors/StatefulParDoP.java | 3 ++-
.../samza/metrics/DoFnRunnerWithMetrics.java | 13 ++++++++--
.../runners/samza/runtime/AsyncDoFnRunner.java | 7 ++++--
.../apache/beam/runners/samza/runtime/DoFnOp.java | 3 ++-
.../runtime/DoFnRunnerWithKeyedInternals.java | 13 ++++++++--
.../beam/runners/samza/runtime/PortableDoFnOp.java | 3 ++-
.../runners/samza/runtime/SamzaDoFnRunners.java | 7 ++++--
.../translation/batch/DoFnRunnerFactory.java | 4 ++-
.../translation/batch/DoFnRunnerWithMetrics.java | 14 +++++++++--
.../spark/translation/AbstractInOutIterator.java | 3 ++-
.../spark/translation/DoFnRunnerWithMetrics.java | 14 +++++++++--
.../translation/AbstractInOutIteratorTest.java | 9 +++++--
.../translation/SparkInputDataProcessorTest.java | 4 ++-
.../java/org/apache/beam/sdk/transforms/DoFn.java | 7 ++++++
.../org/apache/beam/sdk/transforms/DoFnTester.java | 6 +++++
.../construction/SplittableParDoNaiveBounded.java | 6 +++++
.../apache/beam/sdk/util/construction/Timer.java | 29 +++++++++++++++++++---
.../beam/sdk/util/construction/TimerTest.java | 28 ++++++++++++++-------
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 29 +++++++++++++++++++++-
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 4 ++-
...plittablePairWithRestrictionDoFnRunnerTest.java | 4 ++-
.../harness/control/ProcessBundleHandlerTest.java | 13 +++++++---
55 files changed, 353 insertions(+), 97 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index dce4fcaa68b..b07b7eca7d8 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -46,7 +47,8 @@ public interface DoFnRunner<InputT extends @Nullable Object,
OutputT extends @Nu
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain);
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain);
/**
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle}
method and performs
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index f89c3f36db3..f5587b46598 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
@@ -89,8 +90,10 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT,
W extends BoundedWin
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
- doFnRunner.onTimer(timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain);
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
+ doFnRunner.onTimer(
+ timerId, timerFamilyId, key, window, timestamp, outputTimestamp,
timeDomain, causedByDrain);
}
@Override
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 6f9f15b1358..5f37067233a 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
@@ -396,6 +397,11 @@ public class
OutputAndTimeBoundedSplittableProcessElementInvoker<
return element.getRecordOffset();
}
+ @Override
+ public CausedByDrain causedByDrain() {
+ return element.causedByDrain();
+ }
+
@Override
public PipelineOptions getPipelineOptions() {
return pipelineOptions;
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index b8a4dcf7cb7..e678ad81cf1 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
@@ -90,7 +91,8 @@ public class ProcessFnRunner<InputT, OutputT, RestrictionT>
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
throw new UnsupportedOperationException("User timers unsupported in
ProcessFn");
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 61feaffad91..fc78585422c 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -50,7 +51,8 @@ public interface PushbackSideInputDoFnRunner<InputT, OutputT>
{
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain);
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain);
/** Calls the underlying {@link DoFn.OnWindowExpiration} method. */
<KeyT> void onWindowExpiration(BoundedWindow window, Instant
outputTimestamp, KeyT key);
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0fd63556b9c..05186ba5adb 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.util.OutputBuilderSuppliers;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
@@ -200,11 +201,13 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
Preconditions.checkNotNull(outputTimestamp, "outputTimestamp");
OnTimerArgumentProvider<KeyT> argumentProvider =
- new OnTimerArgumentProvider<>(timerId, key, window, timestamp,
outputTimestamp, timeDomain);
+ new OnTimerArgumentProvider<>(
+ timerId, key, window, timestamp, outputTimestamp, timeDomain,
causedByDrain);
invoker.invokeOnTimer(timerId, timerFamilyId, argumentProvider);
}
@@ -399,6 +402,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
return elem.getValue();
}
+ @Override
+ public CausedByDrain causedByDrain() {
+ return elem.causedByDrain();
+ }
+
@Override
public <T> T sideInput(PCollectionView<T> view) {
checkNotNull(view, "View passed to sideInput cannot be null");
@@ -702,6 +710,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
private final TimeDomain timeDomain;
private final String timerId;
private final KeyT key;
+ private final CausedByDrain causedByDrain;
private final OutputBuilderSupplier builderSupplier;
/** Lazily initialized; should only be accessed via {@link
#getNamespace()}. */
@@ -727,7 +736,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
BoundedWindow window,
Instant fireTimestamp,
Instant timestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
fn.super();
this.timerId = timerId;
this.window = window;
@@ -735,13 +745,15 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
this.timestamp = timestamp;
this.timeDomain = timeDomain;
this.key = key;
+ this.causedByDrain = causedByDrain;
this.builderSupplier =
OutputBuilderSuppliers.supplierForElement(
WindowedValues.builder()
.setValue(null)
.setTimestamp(timestamp)
.setWindow(window)
- .setPaneInfo(PaneInfo.NO_FIRING));
+ .setPaneInfo(PaneInfo.NO_FIRING)
+ .setCausedByDrain(causedByDrain));
}
@Override
@@ -749,6 +761,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
return timestamp;
}
+ @Override
+ public CausedByDrain causedByDrain() {
+ return causedByDrain;
+ }
+
@Override
public Instant fireTimestamp() {
return fireTimestamp;
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 2be8071f983..fe1c0fd45c9 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.WindowedValue;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -115,8 +116,10 @@ public class SimplePushbackSideInputDoFnRunner<InputT,
OutputT>
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
- underlying.onTimer(timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain);
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
+ underlying.onTimer(
+ timerId, timerFamilyId, key, window, timestamp, outputTimestamp,
timeDomain, causedByDrain);
}
@Override
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index 8f8c69b5e97..72219083c06 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -605,7 +605,7 @@ public class SplittableParDoViaKeyedWorkItems {
wakeupTime,
wakeupTime,
TimeDomain.PROCESSING_TIME,
- CausedByDrain.NORMAL));
+ timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
}
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 52bbb3a306a..e562a4067d2 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.sdk.values.WindowingStrategy;
@@ -48,7 +49,8 @@ import org.joda.time.Instant;
/**
* A customized {@link DoFnRunner} that handles late data dropping and garbage
collection for
* stateful {@link DoFn DoFns}. It registers a GC timer in {@link
#processElement(WindowedValue)}
- * and does cleanup in {@link #onTimer(String, String, BoundedWindow, Instant,
Instant, TimeDomain)}
+ * and does cleanup in {@link #onTimer(String, String, Object, BoundedWindow, Instant,
Instant, TimeDomain,
+ * CausedByDrain)}
*
* @param <InputT> the type of the {@link DoFn} (main) input elements
* @param <OutputT> the type of the {@link DoFn} (main) output elements
@@ -208,7 +210,8 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends
BoundedWindow>
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
if (timerId.equals(SORT_FLUSH_TIMER)) {
onSortFlushTimer(window,
stepContext.timerInternals().currentInputWatermarkTime());
@@ -232,7 +235,14 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends
BoundedWindow>
stepContext.timerInternals().currentInputWatermarkTime());
} else {
doFnRunner.onTimer(
- timerId, timerFamilyId, key, window, timestamp, outputTimestamp,
timeDomain);
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ causedByDrain);
}
}
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 92385b1c822..49c9277314b 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -141,7 +141,8 @@ public class SimpleDoFnRunnerTest {
GlobalWindow.INSTANCE,
new Instant(0),
new Instant(0),
- TimeDomain.EVENT_TIME);
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL);
}
/**
@@ -266,7 +267,8 @@ public class SimpleDoFnRunnerTest {
GlobalWindow.INSTANCE,
currentTime.plus(offset),
currentTime.plus(offset),
- TimeDomain.EVENT_TIME);
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.CAUSED_BY_DRAIN);
assertThat(
fn.onTimerInvocations,
@@ -277,7 +279,8 @@ public class SimpleDoFnRunnerTest {
StateNamespaces.window(windowFn.windowCoder(),
GlobalWindow.INSTANCE),
currentTime.plus(offset),
currentTime.plus(offset),
- TimeDomain.EVENT_TIME)));
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.CAUSED_BY_DRAIN)));
}
/**
@@ -593,7 +596,8 @@ public class SimpleDoFnRunnerTest {
GlobalWindow.INSTANCE,
new Instant(0),
new Instant(0),
- TimeDomain.EVENT_TIME);
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL);
}
@Test
@@ -625,7 +629,8 @@ public class SimpleDoFnRunnerTest {
GlobalWindow.INSTANCE,
new Instant(0),
new Instant(0),
- TimeDomain.EVENT_TIME);
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL);
});
assertThat(exception.getCause(), isA(IllegalArgumentException.class));
@@ -703,7 +708,7 @@ public class SimpleDoFnRunnerTest {
context.fireTimestamp(),
context.timestamp(),
context.timeDomain(),
- CausedByDrain.NORMAL));
+ context.causedByDrain()));
}
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index e32065f6ef5..9703c3c80bd 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -308,7 +308,15 @@ public class SimplePushbackSideInputDoFnRunnerTest {
// Mocking is not easily compatible with annotation analysis, so we
manually record
// the method call.
- runner.onTimer(timerId, "", null, window, timestamp, timestamp,
TimeDomain.EVENT_TIME);
+ runner.onTimer(
+ timerId,
+ "",
+ null,
+ window,
+ timestamp,
+ timestamp,
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.CAUSED_BY_DRAIN);
assertThat(
underlying.firedTimers,
@@ -319,7 +327,7 @@ public class SimplePushbackSideInputDoFnRunnerTest {
timestamp,
timestamp,
TimeDomain.EVENT_TIME,
- CausedByDrain.NORMAL)));
+ CausedByDrain.CAUSED_BY_DRAIN)));
}
private static class TestDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, OutputT> {
@@ -353,7 +361,8 @@ public class SimplePushbackSideInputDoFnRunnerTest {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
firedTimers.add(
TimerData.of(
timerId,
@@ -362,7 +371,7 @@ public class SimplePushbackSideInputDoFnRunnerTest {
timestamp,
outputTimestamp,
timeDomain,
- CausedByDrain.NORMAL));
+ causedByDrain));
}
@Override
@@ -510,7 +519,8 @@ public class SimplePushbackSideInputDoFnRunnerTest {
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 4ed4f09fe62..df3aceff4a9 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -468,7 +468,8 @@ public class StatefulDoFnRunnerTest {
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
}
diff --git
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 6e7763e4b79..e42ae91d097 100644
---
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -257,7 +257,8 @@ class ParDoEvaluator<InputT> implements
TransformEvaluator<InputT> {
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
diff --git
a/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 43668e0298e..dca2f3075aa 100644
---
a/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/1.20/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1200,7 +1200,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
- timerData.getDomain());
+ timerData.getDomain(),
+ timerData.causedByDrain());
}
@SuppressWarnings("unchecked")
diff --git
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 2a208d30a87..7e0b0be28d4 100644
---
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -250,7 +250,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
@Override
diff --git
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index f5ce658de4f..f83e719ed0b 100644
---
a/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/2.0/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1200,7 +1200,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
- timerData.getDomain());
+ timerData.getDomain(),
+ timerData.causedByDrain());
}
@SuppressWarnings("unchecked")
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index f1ec36564a4..56e077253ae 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -73,10 +74,19 @@ public class DoFnRunnerWithMetricsUpdate<InputT, OutputT>
implements DoFnRunner<
final BoundedWindow window,
final Instant timestamp,
final Instant outputTimestamp,
- final TimeDomain timeDomain) {
+ final TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName)))
{
- delegate.onTimer(timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain);
+ delegate.onTimer(
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ causedByDrain);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 3aa5a0802b3..12c59569f43 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -250,7 +250,8 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
@Override
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 13414682f8e..2bf0d40cd5f 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -974,10 +974,10 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
* of finishing a bundle in snapshot() first.
*
* <p>In order to avoid having {@link
DoFnRunner#processElement(WindowedValue)} or {@link
- * DoFnRunner#onTimer(String, String, Object, BoundedWindow, Instant,
Instant, TimeDomain)} not
- * between StartBundle and FinishBundle, this method needs to be called in
each processElement and
- * each processWatermark and onProcessingTime. Do not need to call in
onEventTime, because it has
- * been guaranteed in the processWatermark.
+ * DoFnRunner#onTimer(String, String, Object, BoundedWindow, Instant,
Instant, TimeDomain,
+ * CausedByDrain)} not between StartBundle and FinishBundle, this method
needs to be called in
+ * each processElement and each processWatermark and onProcessingTime. Do
not need to call in
+ * onEventTime, because it has been guaranteed in the processWatermark.
*/
private void checkInvokeStartBundle() {
if (!bundleStarted) {
@@ -1195,6 +1195,7 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.onFiredOrDeletedTimer(timerData);
+ // Fire the timer on the runner, passing along whether it was caused by drain.
pushbackDoFnRunner.onTimer(
timerData.getTimerId(),
timerData.getTimerFamilyId(),
@@ -1202,7 +1203,8 @@ public class DoFnOperator<PreInputT, InputT, OutputT>
window,
timerData.getTimestamp(),
timerData.getOutputTimestamp(),
- timerData.getDomain());
+ timerData.getDomain(),
+ timerData.causedByDrain());
}
@SuppressWarnings("unchecked")
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index d5a7ff035ef..2134fa869b4 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -101,6 +101,7 @@ import
org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.UserStateReference;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -1001,7 +1002,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
Object timerKey = keyForTimer.get();
Preconditions.checkNotNull(timerKey, "Key for timer needs to be set
before calling onTimer");
Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a
bundle");
@@ -1034,7 +1036,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
timestamp,
outputTimestamp,
// TODO: Support propagating the PaneInfo through.
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ causedByDrain);
try {
timerReceiver.accept(timerValue);
} catch (Exception e) {
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
index 0c2ba87c4ba..d0a1b12e01e 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -104,7 +105,14 @@ class BufferedElements {
@Override
public void processWith(DoFnRunner doFnRunner) {
doFnRunner.onTimer(
- timerId, timerFamilyId, key, window, timestamp, outputTimestamp,
timeDomain);
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ CausedByDrain.NORMAL);
}
@Override
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 10a3182f90f..73b20238ef0 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.flink.translation.utils.Locker;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -234,7 +235,8 @@ public class BufferingDoFnRunner<InputT, OutputT>
implements DoFnRunner<InputT,
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
minBufferedElementTimestamp =
Math.min(outputTimestamp.getMillis(), minBufferedElementTimestamp);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index c2556d7229b..5c4975ffab0 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -81,6 +81,7 @@ import
org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -507,7 +508,8 @@ public class DoFnOperatorTest {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
if ("cleanup".equals(timerId)) {
holdState.clear();
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 982871e59f1..01c9d25f1bf 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -98,6 +98,7 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.NoopLock;
import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
@@ -534,14 +535,16 @@ public class ExecutableStageDoFnOperatorTest {
windowedValue.getWindows(),
timestamp,
timestamp,
- PaneInfo.NO_FIRING),
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL),
TimerInternals.TimerData.of(
"",
TimerReceiverFactory.encodeToTimerDataTimerId("transform",
timerId),
StateNamespaces.window(IntervalWindow.getCoder(),
intervalWindow),
timestamp,
timestamp,
- TimeDomain.EVENT_TIME));
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL));
timerConsumer.accept("timer", timerTarget);
timerConsumer.accept("timer2", timerTarget2);
@@ -867,7 +870,8 @@ public class ExecutableStageDoFnOperatorTest {
stateNamespace,
window.maxTimestamp(),
window.maxTimestamp(),
- TimeDomain.EVENT_TIME);
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.NORMAL);
operator.setTimer(
Timer.of(
windowedValue.getValue().getKey(),
@@ -875,7 +879,8 @@ public class ExecutableStageDoFnOperatorTest {
windowedValue.getWindows(),
window.maxTimestamp(),
window.maxTimestamp(),
- PaneInfo.NO_FIRING),
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL),
userTimer2);
assertThat(testHarness.numEventTimeTimers(), is(1));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
index 741716b8284..ec1fcd6c843 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
@@ -117,7 +118,8 @@ class DataflowProcessFnRunner<InputT, OutputT, RestrictionT>
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
throw new UnsupportedOperationException("Unsupported for ProcessFn");
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index 1909a73dc8b..37e04316e23 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValueReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -81,7 +82,8 @@ public class GroupAlsoByWindowFnRunner<InputT, OutputT>
implements DoFnRunner<In
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
throw new UnsupportedOperationException(
String.format("Timers are not supported by %s",
GroupAlsoByWindowFn.class.getSimpleName()));
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 00c8192b1e4..434d46c20a5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -374,7 +374,8 @@ public class SimpleParDoFn<InputT, OutputT> implements
ParDoFn {
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
index e5d8a18be76..0b9ccd1f37c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
@@ -142,7 +143,8 @@ public class StreamingKeyedWorkItemSideInputDoFnRunner<K,
InputT, OutputT, W ext
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
throw new UnsupportedOperationException(
"Attempt to deliver a timer to a DoFn, but timers are not supported in
Dataflow.");
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index b5b723adb2b..3b7891c5378 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -84,7 +85,8 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W
extends BoundedWind
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
throw new UnsupportedOperationException(
"Attempt to deliver a timer to a DoFn, but timers are not supported in
Dataflow.");
}
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
index d99a733a4eb..952431411b8 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/TimerReceiverFactory.java
@@ -94,7 +94,8 @@ public class TimerReceiverFactory {
namespace,
timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE :
timer.getFireTimestamp(),
timer.getClearBit() ? BoundedWindow.TIMESTAMP_MAX_VALUE :
timer.getHoldTimestamp(),
- timerSpec.getTimerSpec().getTimeDomain());
+ timerSpec.getTimerSpec().getTimeDomain(),
+ timer.causedByDrain());
timerDataConsumer.accept(timer, timerData);
}
};
diff --git
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index 5b7880ad56e..d7d4c8cac44 100644
---
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
+++
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -156,7 +156,8 @@ public final class PipelineTranslatorUtils {
Collections.singletonList(window),
timestamp,
outputTimestamp,
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ timer.causedByDrain());
KV<String, String> transformAndTimerId =
TimerReceiverFactory.decodeTimerDataTimerId(timer.getTimerFamilyId());
FnDataReceiver<Timer> fnTimerReceiver =
timerReceivers.get(transformAndTimerId);
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 556cc7993ae..996c206a7e9 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -137,6 +137,7 @@ import
org.apache.beam.sdk.util.construction.graph.GreedyPipelineFuser;
import org.apache.beam.sdk.util.construction.graph.PipelineNode.PTransformNode;
import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -2254,6 +2255,7 @@ public class RemoteExecutionTest implements Serializable {
Collections.singletonList(GlobalWindow.INSTANCE),
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(fireTimestamp)),
BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(holdTimestamp)),
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL);
}
}
diff --git
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
index eccf1e66434..a79b0996cbb 100644
---
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
+++
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/wire/CommonCoderTest.java
@@ -84,6 +84,7 @@ import
org.apache.beam.sdk.util.construction.CoderTranslation.TranslationContext
import org.apache.beam.sdk.util.construction.CoderTranslator;
import org.apache.beam.sdk.util.construction.ModelCoderRegistrar;
import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowedValues;
@@ -326,7 +327,8 @@ public class CommonCoderTest {
windows,
new Instant(((Number) kvMap.get("fireTimestamp")).longValue()),
new Instant(((Number) kvMap.get("holdTimestamp")).longValue()),
- paneInfo);
+ paneInfo,
+ CausedByDrain.NORMAL); // todo - add tests once causedByDrain is
added to proto
} else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
Map<String, Object> kvMap = (Map<String, Object>) value;
Instant end = new Instant(((Number) kvMap.get("end")).longValue());
diff --git
a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
index f696873ec5c..ed085e5e56b 100644
---
a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
+++
b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
@@ -105,7 +105,8 @@ public class StatefulParDoP<OutputT>
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
@Override
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
index b815649a765..7bec91abb34 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
@@ -21,6 +21,7 @@ import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -61,11 +62,19 @@ public class DoFnRunnerWithMetrics<InT, OutT> implements
DoFnRunner<InT, OutT> {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
withMetrics(
() ->
underlying.onTimer(
- timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain),
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ causedByDrain),
false);
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
index d07a9bda78c..2485ac2d552 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/AsyncDoFnRunner.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.samza.SamzaPipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -154,8 +155,10 @@ public class AsyncDoFnRunner<InT, OutT> implements
DoFnRunner<InT, OutT> {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
- underlying.onTimer(timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain);
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
+ underlying.onTimer(
+ timerId, timerFamilyId, key, window, timestamp, outputTimestamp,
timeDomain, causedByDrain);
}
@Override
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 2f27e31d05d..bc87e2460ec 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -437,7 +437,8 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT,
OutT, Void> {
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
// todo: should this go through bundle manager to start and finish the
bundle?
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
index 34e3405660c..84cf5b26c50 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
@@ -22,6 +22,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -66,14 +67,22 @@ public class DoFnRunnerWithKeyedInternals<InputT, OutputT>
implements DoFnRunner
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
// Note: wrap with KV.of(key, null) as a special use case of
setKeyedInternals() to set key
// directly.
setKeyedInternals(KV.of(key, null));
try {
underlying.onTimer(
- timerId, timerFamilyId, key, window, timestamp, outputTimestamp,
timeDomain);
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ causedByDrain);
} finally {
clearKeyedInternals();
}
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
index 743a42d1479..468e4b9aa8d 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/PortableDoFnOp.java
@@ -425,7 +425,8 @@ public class PortableDoFnOp<InT, FnOutT, OutT> implements
Op<InT, OutT, Void> {
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
}
// todo: should this go through bundle manager to start and finish the
bundle?
diff --git
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 7129ba9145e..a2ec88a4341 100644
---
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -61,6 +61,7 @@ import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -430,7 +431,8 @@ public class SamzaDoFnRunners {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
final KV<String, String> timerReceiverKey =
TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
final FnDataReceiver<Timer> timerReceiver =
@@ -443,7 +445,8 @@ public class SamzaDoFnRunners {
timestamp,
outputTimestamp,
// TODO: Support propagating the PaneInfo through.
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ causedByDrain);
try {
timerReceiver.accept(timerValue);
} catch (Exception e) {
diff --git
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
index 350f7daa56c..15ec818dba7 100644
---
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
+++
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerFactory.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.util.WindowedValueReceiver;
import org.apache.beam.sdk.util.construction.ParDoTranslation;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -266,7 +267,8 @@ abstract class DoFnRunnerFactory<InT, T> implements
Serializable {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
throw new UnsupportedOperationException();
}
diff --git
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
index db4ee5be578..28dbf44cb8f 100644
---
a/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
+++
b/runners/spark/3/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -77,9 +78,18 @@ class DoFnRunnerWithMetrics<InT, OutT> implements
DoFnRunnerWithTeardown<InT, Ou
final BoundedWindow window,
final Instant timestamp,
final Instant outputTimestamp,
- final TimeDomain timeDomain) {
+ final TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(metrics)) {
- delegate.onTimer(timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain);
+ delegate.onTimer(
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ causedByDrain);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
index 9d73a605b3b..4ac56c2550b 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/AbstractInOutIterator.java
@@ -75,7 +75,8 @@ public abstract class AbstractInOutIterator<K, InputT,
OutputT>
window,
timer.getTimestamp(),
timer.getOutputTimestamp(),
- timer.getDomain());
+ timer.getDomain(),
+ timer.causedByDrain());
} finally {
if (this.ctx.getTimerDataIterator()
instanceof ParDoStateUpdateFn.SparkTimerInternalsIterator) {
diff --git
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index a6b1f65571d..c8cd7eb5f26 100644
---
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -76,9 +77,18 @@ public class DoFnRunnerWithMetrics<InputT, OutputT>
implements DoFnRunner<InputT
final BoundedWindow window,
final Instant timestamp,
final Instant outputTimestamp,
- final TimeDomain timeDomain) {
+ final TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
- delegate.onTimer(timerId, timerFamilyId, key, window, timestamp,
outputTimestamp, timeDomain);
+ delegate.onTimer(
+ timerId,
+ timerFamilyId,
+ key,
+ window,
+ timestamp,
+ outputTimestamp,
+ timeDomain,
+ causedByDrain);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
index b1e0e3a3980..cfda84c1981 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/AbstractInOutIteratorTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.joda.time.Instant;
@@ -56,6 +57,7 @@ public class AbstractInOutIteratorTest {
private static final Instant TEST_TIMESTAMP = new Instant(42L);
private static final Instant TEST_OUTPUT_TIMESTAMP = new Instant(84L);
private static final TimeDomain TEST_TIME_DOMAIN = TimeDomain.EVENT_TIME;
+ private static final CausedByDrain TEST_CAUSED_BY_DRAIN =
CausedByDrain.CAUSED_BY_DRAIN;
private StateNamespace testNamespace;
@@ -88,6 +90,7 @@ public class AbstractInOutIteratorTest {
when(mockTimer.getTimestamp()).thenReturn(TEST_TIMESTAMP);
when(mockTimer.getOutputTimestamp()).thenReturn(TEST_OUTPUT_TIMESTAMP);
when(mockTimer.getDomain()).thenReturn(TEST_TIME_DOMAIN);
+ when(mockTimer.causedByDrain()).thenReturn(TEST_CAUSED_BY_DRAIN);
}
@Test
@@ -107,7 +110,8 @@ public class AbstractInOutIteratorTest {
mockWindow,
TEST_TIMESTAMP,
TEST_OUTPUT_TIMESTAMP,
- TEST_TIME_DOMAIN);
+ TEST_TIME_DOMAIN,
+ CausedByDrain.CAUSED_BY_DRAIN);
// Verify that timer data iterator deletion was not called (no timer
iterator was set in this
// test)
@@ -133,7 +137,8 @@ public class AbstractInOutIteratorTest {
mockWindow,
TEST_TIMESTAMP,
TEST_OUTPUT_TIMESTAMP,
- TEST_TIME_DOMAIN);
+ TEST_TIME_DOMAIN,
+ CausedByDrain.CAUSED_BY_DRAIN);
// Verify that the timer data iterator's deleteTimer method was called
verify(mockTimerDataIterator).deleteTimer(mockTimer);
diff --git
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
index 2ff06b59f8e..2dc428d5a6b 100644
---
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
+++
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SparkInputDataProcessorTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
@@ -252,7 +253,8 @@ public class SparkInputDataProcessorTest {
BoundedWindow window,
Instant timestamp,
Instant outputTimestamp,
- TimeDomain timeDomain) {}
+ TimeDomain timeDomain,
+ CausedByDrain causedByDrain) {}
@Override
public void finishBundle() {}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 125408108c0..c1b4f2ec7d8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -45,6 +45,7 @@ import
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.OutputBuilder;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -330,6 +331,9 @@ public abstract class DoFn<InputT extends @Nullable Object,
OutputT extends @Nul
@Pure
public abstract @Nullable Long currentRecordOffset();
+
+ @Pure
+ public abstract CausedByDrain causedByDrain();
}
/** Information accessible when running a {@link DoFn.OnTimer} method. */
@@ -346,6 +350,9 @@ public abstract class DoFn<InputT extends @Nullable Object,
OutputT extends @Nul
/** Returns the time domain of the current timer. */
public abstract TimeDomain timeDomain();
+
+ @Pure
+ public abstract CausedByDrain causedByDrain();
}
public abstract class OnWindowExpirationContext extends WindowedContext {
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 3bdeb57ed88..1558bb74e0d 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.OutputBuilderSupplier;
import org.apache.beam.sdk.util.OutputBuilderSuppliers;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -589,6 +590,11 @@ public class DoFnTester<InputT, OutputT> implements
AutoCloseable {
return element.getCurrentRecordOffset();
}
+ @Override
+ public CausedByDrain causedByDrain() {
+ return CausedByDrain.NORMAL;
+ }
+
@Override
public PipelineOptions getPipelineOptions() {
return options;
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
index 9f5322fb511..a22d3378cfd 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/SplittableParDoNaiveBounded.java
@@ -47,6 +47,7 @@ import
org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.OutputBuilderSupplier;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.OutputBuilder;
import org.apache.beam.sdk.values.PCollection;
@@ -513,6 +514,11 @@ public class SplittableParDoNaiveBounded {
throw new UnsupportedOperationException();
}
+ @Override
+ public CausedByDrain causedByDrain() {
+ return outerContext.causedByDrain();
+ }
+
@Override
public Object sideInput(String tagId) {
PCollectionView<?> view = sideInputMapping.get(tagId);
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
index d443f008b7d..6e5bb3303c3 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/Timer.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
@@ -65,9 +66,17 @@ public abstract class Timer<K> {
Collection<? extends BoundedWindow> windows,
Instant fireTimestamp,
Instant holdTimestamp,
- PaneInfo paneInfo) {
+ PaneInfo paneInfo,
+ CausedByDrain causedByDrain) {
return new AutoValue_Timer(
- userKey, dynamicTimerTag, windows, false, fireTimestamp,
holdTimestamp, paneInfo);
+ userKey,
+ dynamicTimerTag,
+ windows,
+ false,
+ fireTimestamp,
+ holdTimestamp,
+ paneInfo,
+ causedByDrain);
}
/**
@@ -76,7 +85,8 @@ public abstract class Timer<K> {
*/
public static <K> Timer<K> cleared(
K userKey, String dynamicTimerTag, Collection<? extends BoundedWindow>
windows) {
- return new AutoValue_Timer(userKey, dynamicTimerTag, windows, true, null,
null, null);
+ return new AutoValue_Timer(
+ userKey, dynamicTimerTag, windows, true, null, null, null,
CausedByDrain.NORMAL);
}
/** Returns the key that the timer is set on. */
@@ -116,6 +126,8 @@ public abstract class Timer<K> {
*/
public abstract @Nullable PaneInfo getPaneInfo();
+ public abstract @Nullable CausedByDrain causedByDrain();
+
@Override
public final boolean equals(@Nullable Object other) {
if (!(other instanceof Timer)) {
@@ -186,6 +198,7 @@ public abstract class Timer<K> {
InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
PaneInfoCoder.INSTANCE.encode(timer.getPaneInfo(), outStream);
+ // TODO: maybe, similarly to WindowedValue, propagate metadata with the PaneInfo bit; causedByDrain is not encoded here.
}
}
@@ -201,7 +214,15 @@ public abstract class Timer<K> {
Instant fireTimestamp = InstantCoder.of().decode(inStream);
Instant holdTimestamp = InstantCoder.of().decode(inStream);
PaneInfo paneInfo = PaneInfoCoder.INSTANCE.decode(inStream);
- return Timer.of(userKey, dynamicTimerTag, windows, fireTimestamp,
holdTimestamp, paneInfo);
+ // TODO: maybe, similarly to WindowedValue, propagate metadata with the PaneInfo bit; until then decoded timers are always CausedByDrain.NORMAL.
+ return Timer.of(
+ userKey,
+ dynamicTimerTag,
+ windows,
+ fireTimestamp,
+ holdTimestamp,
+ paneInfo,
+ CausedByDrain.NORMAL);
}
@Override
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
index e96bf27c6ed..06f43350f5a 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/construction/TimerTest.java
@@ -27,6 +27,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -57,7 +58,8 @@ public class TimerTest {
Collections.singleton(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL);
assertEquals("key", timer.getUserKey());
assertEquals("tag", timer.getDynamicTimerTag());
assertEquals(FIRE_TIME, timer.getFireTimestamp());
@@ -79,7 +81,8 @@ public class TimerTest {
Collections.singleton(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING));
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL));
CoderProperties.structuralValueDecodeEncodeEqual(
coder, Timer.cleared("key", "tag",
Collections.singleton(GlobalWindow.INSTANCE)));
CoderProperties.structuralValueConsistentWithEquals(
@@ -90,14 +93,16 @@ public class TimerTest {
Collections.singleton(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING),
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL),
Timer.of(
"key",
"tag",
Collections.singleton(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING));
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL));
CoderProperties.structuralValueConsistentWithEquals(
coder,
Timer.cleared("key", "tag",
Collections.singleton(GlobalWindow.INSTANCE)),
@@ -115,7 +120,8 @@ public class TimerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING));
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL));
CoderProperties.coderDecodeEncodeEqual(
coder, Timer.cleared("key", "tag",
Collections.singletonList(GlobalWindow.INSTANCE)));
CoderProperties.coderConsistentWithEquals(
@@ -126,14 +132,16 @@ public class TimerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING),
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL),
Timer.of(
"key",
"tag",
Collections.singletonList(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING));
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL));
CoderProperties.coderConsistentWithEquals(
coder,
Timer.cleared("key", "tag",
Collections.singletonList(GlobalWindow.INSTANCE)),
@@ -146,14 +154,16 @@ public class TimerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING),
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL),
Timer.of(
"key",
"tag",
Collections.singletonList(GlobalWindow.INSTANCE),
FIRE_TIME,
HOLD_TIME,
- PaneInfo.NO_FIRING));
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL));
CoderProperties.coderDeterministic(
coder,
Timer.cleared("key", "tag",
Collections.singletonList(GlobalWindow.INSTANCE)),
diff --git
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 1b7d75f6ec3..78aceeaab19 100644
---
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -102,6 +102,7 @@ import
org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.ParDoTranslation;
import org.apache.beam.sdk.util.construction.RehydratedComponents;
import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.OutputBuilder;
import org.apache.beam.sdk.values.PCollectionView;
@@ -247,6 +248,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
private WindowedValue<InputT> currentElement;
private Object currentKey;
+ private CausedByDrain causedByDrain;
/**
* Only valid during {@link
@@ -1200,6 +1202,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
checkNotNull(timerBundleTracker);
try {
currentKey = timer.getUserKey();
+ causedByDrain = timer.causedByDrain();
+ // Remember the timer's drain status so the timer contexts and their outputs can report it.
Iterator<BoundedWindow> windowIterator =
(Iterator<BoundedWindow>) timer.getWindows().iterator();
while (windowIterator.hasNext()) {
@@ -1531,7 +1535,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
Collections.singletonList(boundedWindow),
scheduledTime,
outputTimestamp,
- paneInfo);
+ paneInfo,
+ causedByDrain);
}
}
@@ -1848,6 +1853,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
currentElement.getTimestamp(),
currentElement.getPaneInfo());
}
+
+ @Override
+ public CausedByDrain causedByDrain() {
+ return currentElement.causedByDrain();
+ }
}
/** Provides arguments for a {@link DoFnInvoker} for a non-window observing
method. */
@@ -1929,6 +1939,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
}
outputTo(consumer, WindowedValues.of(output, timestamp, windows,
paneInfo));
}
+
+ @Override
+ public CausedByDrain causedByDrain() {
+ return currentElement.causedByDrain();
+ }
}
/** Provides base arguments for a {@link DoFnInvoker} for a non-window
observing method. */
@@ -2278,6 +2293,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setWindow(currentWindow)
.setTimestamp(currentTimer.getHoldTimestamp())
.setPaneInfo(currentTimer.getPaneInfo())
+ .setCausedByDrain(causedByDrain)
.setReceiver(
windowedValue -> {
checkOnWindowExpirationTimestamp(windowedValue.getTimestamp());
@@ -2395,6 +2411,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setValue(value)
.setTimestamp(currentTimer.getHoldTimestamp())
.setWindow(currentWindow)
+ .setCausedByDrain(causedByDrain)
.setReceiver(
windowedValue ->
context.outputWindowedValue(
@@ -2432,6 +2449,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setValue(value)
.setTimestamp(currentTimer.getHoldTimestamp())
.setWindow(currentWindow)
+ .setCausedByDrain(causedByDrain)
.setReceiver(
windowedValue ->
context.outputWindowedValue(
@@ -2469,6 +2487,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setValue(value)
.setTimestamp(currentTimer.getHoldTimestamp())
.setWindow(currentWindow)
+ .setCausedByDrain(causedByDrain)
.setReceiver(
windowedValue ->
context.outputWindowedValue(
@@ -2546,6 +2565,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
return currentWindow;
}
+ @Override
+ public CausedByDrain causedByDrain() {
+ return causedByDrain;
+ }
+
@Override
public OutputBuilder<OutputT> builder(OutputT value) {
return WindowedValues.<OutputT>builder()
@@ -2553,6 +2577,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setTimestamp(currentTimer.getHoldTimestamp())
.setWindow(currentWindow)
.setPaneInfo(currentTimer.getPaneInfo())
+ .setCausedByDrain(causedByDrain)
.setReceiver(
windowedValue -> {
checkTimerTimestamp(windowedValue.getTimestamp());
@@ -2734,6 +2759,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setValue(value)
.setTimestamp(currentTimer.getHoldTimestamp())
.setWindow(currentWindow)
+ .setCausedByDrain(causedByDrain)
.setPaneInfo(currentTimer.getPaneInfo())
.setReceiver(
windowedValue ->
@@ -2772,6 +2798,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT,
PositionT, WatermarkEstimator
.setTimestamp(currentTimer.getHoldTimestamp())
.setWindow(currentWindow)
.setPaneInfo(currentTimer.getPaneInfo())
+ .setCausedByDrain(causedByDrain)
.setReceiver(
windowedValue ->
context.outputWindowedValue(
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index ef19b7c1880..50a2fec0b5a 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -123,6 +123,7 @@ import
org.apache.beam.sdk.util.construction.RehydratedComponents;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -1161,7 +1162,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
Collections.singletonList(GlobalWindow.INSTANCE),
fireTimestamp,
holdTimestamp,
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL);
}
private <T> WindowedValue<T> valueInWindows(
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
index 13a0b105ec3..756f17fdfa3 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/SplittablePairWithRestrictionDoFnRunnerTest.java
@@ -64,6 +64,7 @@ import
org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.graph.ProtoOverrides;
import org.apache.beam.sdk.util.construction.graph.SplittableParDoExpander;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -123,7 +124,8 @@ public class SplittablePairWithRestrictionDoFnRunnerTest
implements Serializable
Collections.singletonList(GlobalWindow.INSTANCE),
fireTimestamp,
holdTimestamp,
- PaneInfo.NO_FIRING);
+ PaneInfo.NO_FIRING,
+ CausedByDrain.NORMAL);
}
private <T> WindowedValue<T> valueInWindows(
diff --git
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index 52b6c87a5c0..d86ef653dca 100644
---
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -145,6 +145,7 @@ import org.apache.beam.sdk.util.construction.ModelCoders;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.ParDoTranslation;
import org.apache.beam.sdk.util.construction.Timer;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
@@ -1129,7 +1130,8 @@ public class ProcessBundleHandlerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
Instant.ofEpochMilli(1L),
Instant.ofEpochMilli(1L),
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ CausedByDrain.NORMAL),
encodedTimer);
Elements elements =
Elements.newBuilder()
@@ -1248,7 +1250,8 @@ public class ProcessBundleHandlerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
Instant.ofEpochMilli(1L),
Instant.ofEpochMilli(1L),
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ CausedByDrain.NORMAL),
encodedTimer);
assertThrows(
@@ -1342,7 +1345,8 @@ public class ProcessBundleHandlerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
Instant.ofEpochMilli(1L),
Instant.ofEpochMilli(1L),
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ CausedByDrain.NORMAL),
encodedTimer);
InstructionResponse.Builder builder =
@@ -1961,7 +1965,8 @@ public class ProcessBundleHandlerTest {
Collections.singletonList(GlobalWindow.INSTANCE),
Instant.ofEpochMilli(1L),
Instant.ofEpochMilli(1L),
- PaneInfo.ON_TIME_AND_ONLY_FIRING),
+ PaneInfo.ON_TIME_AND_ONLY_FIRING,
+ CausedByDrain.NORMAL),
encodedTimer);
Elements elements =
Elements.newBuilder()