This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new f2ee530 Merge pull request #11362: [BEAM-9733] Improve watermark and
timer handling
new 36f19f0 Merge pull request #11518 from mxm/release-2.21.0
f2ee530 is described below
commit f2ee530ad52e4e79b0f76b1abb25a5ebb98d5a39
Author: Maximilian Michels <[email protected]>
AuthorDate: Fri Apr 24 12:45:33 2020 +0200
Merge pull request #11362: [BEAM-9733] Improve watermark and timer handling
Backport of #11362 / 643945a8e4
---
runners/flink/flink_runner.gradle | 5 +-
.../functions/ImpulseSourceFunction.java | 12 +
.../wrappers/streaming/DoFnOperator.java | 213 ++++++++++------
.../streaming/ExecutableStageDoFnOperator.java | 204 +++++++++++----
.../wrappers/streaming/SplittableDoFnOperator.java | 4 +
.../beam/runners/flink/FlinkSavepointTest.java | 34 +--
.../runners/flink/PortableTimersExecutionTest.java | 35 ++-
.../functions/ImpulseSourceFunctionTest.java | 17 +-
.../wrappers/streaming/DoFnOperatorTest.java | 5 +-
.../streaming/ExecutableStageDoFnOperatorTest.java | 273 +++++++++++++++++----
.../wrappers/streaming/WindowDoFnOperatorTest.java | 4 +-
11 files changed, 587 insertions(+), 219 deletions(-)
diff --git a/runners/flink/flink_runner.gradle
b/runners/flink/flink_runner.gradle
index 460099a..f5ecc69 100644
--- a/runners/flink/flink_runner.gradle
+++ b/runners/flink/flink_runner.gradle
@@ -111,7 +111,10 @@ spotless {
test {
systemProperty "log4j.configuration", "log4j-test.properties"
- //systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
+ // Change log level to debug:
+ // systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
+ // Change log level to debug only for the package and nested packages:
+ // systemProperty
"org.slf4j.simpleLogger.log.org.apache.beam.runners.flink.translation.wrappers.streaming",
"debug"
jvmArgs "-XX:-UseGCOverheadLimit"
if (System.getProperty("beamSurefireArgline")) {
jvmArgs System.getProperty("beamSurefireArgline")
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
index 4e50ce1..6c610de 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunction.java
@@ -26,6 +26,7 @@ import
org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
/**
* Source function which sends a single global impulse to a downstream
operator. It may keep the
@@ -58,6 +59,17 @@ public class ImpulseSourceFunction
impulseEmitted.add(true);
}
}
+ // Always emit a final watermark.
+ // (1) In case we didn't restore the pipeline, this is important to close
the global window,
+ // if no operator holds back this watermark.
+ // (2) In case we are restoring the pipeline, this is needed to initialize
the operators with
+ // the current watermark and trigger execution of any pending timers.
+ sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
+ // Wait to allow checkpoints of the pipeline
+ waitToEnsureCheckpointingWorksCorrectly();
+ }
+
+ private void waitToEnsureCheckpointingWorksCorrectly() {
// Do nothing, but still look busy ...
// we can't return here since Flink requires that all operators stay up,
// otherwise checkpointing would not work correctly anymore
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a6d6f1b..5dcd994 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -116,6 +116,8 @@ import
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.OutputTag;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Flink operator for executing {@link DoFn DoFns}.
@@ -123,11 +125,15 @@ import org.joda.time.Instant;
* @param <InputT> the input type of the {@link DoFn}
* @param <OutputT> the output type of the {@link DoFn}
*/
+// We use Flink's lifecycle methods to initialize transient fields
+@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<WindowedValue<OutputT>>
implements OneInputStreamOperator<WindowedValue<InputT>,
WindowedValue<OutputT>>,
TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue,
WindowedValue<OutputT>>,
Triggerable<ByteBuffer, TimerData> {
+ private static final Logger LOG =
LoggerFactory.getLogger(DoFnOperator.class);
+
protected DoFn<InputT, OutputT> doFn;
protected final SerializablePipelineOptions serializedOptions;
@@ -206,14 +212,16 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
private transient volatile long elementCount;
/** Time that the last bundle was finished (to set the timer). */
private transient volatile long lastFinishBundleTime;
+ /** Callback to be executed before the current bundle is started. */
+ private transient volatile Runnable preBundleCallback;
/** Callback to be executed after the current bundle was finished. */
private transient volatile Runnable bundleFinishedCallback;
// Watermark state.
// Volatile because these can be set in two mutually exclusive threads (see
above).
- protected transient volatile long currentInputWatermark;
- protected transient volatile long currentSideInputWatermark;
- protected transient volatile long currentOutputWatermark;
+ private transient volatile long currentInputWatermark;
+ private transient volatile long currentSideInputWatermark;
+ private transient volatile long currentOutputWatermark;
private transient volatile long pushedBackWatermark;
/** Constructor for DoFnOperator. */
@@ -353,9 +361,9 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
pushedBackElementsHandler =
NonKeyedPushedBackElementsHandler.create(listState);
}
- setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
-
setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
- setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ currentSideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
+ currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
sideInputReader = NullSideInputReader.of(sideInputs);
@@ -371,9 +379,9 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
Stream<WindowedValue<InputT>> pushedBack =
pushedBackElementsHandler.getElements();
long min =
pushedBack.map(v ->
v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min);
- setPushedBackWatermark(min);
+ pushedBackWatermark = min;
} else {
- setPushedBackWatermark(Long.MAX_VALUE);
+ pushedBackWatermark = Long.MAX_VALUE;
}
// StatefulPardo or WindowDoFn
@@ -549,15 +557,20 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
}
}
- protected long getPushbackWatermarkHold() {
- return pushedBackWatermark;
+ public long getEffectiveInputWatermark() {
+ // hold back by the pushed back values waiting for side inputs
+ return Math.min(pushedBackWatermark, currentInputWatermark);
+ }
+
+ public long getCurrentOutputWatermark() {
+ return currentOutputWatermark;
}
- protected void setPushedBackWatermark(long watermark) {
- pushedBackWatermark = watermark;
+ protected final void setPreBundleCallback(Runnable callback) {
+ this.preBundleCallback = callback;
}
- protected void setBundleFinishedCallback(Runnable callback) {
+ protected final void setBundleFinishedCallback(Runnable callback) {
this.bundleFinishedCallback = callback;
}
@@ -581,7 +594,7 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
pushedBackElementsHandler.pushBack(pushedBackValue);
}
- setPushedBackWatermark(min);
+ pushedBackWatermark = min;
checkInvokeFinishBundleByCount();
}
@@ -635,7 +648,7 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
pushedBackElementsHandler.pushBack(pushedBackValue);
}
- setPushedBackWatermark(min);
+ pushedBackWatermark = min;
checkInvokeFinishBundleByCount();
@@ -644,12 +657,12 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
+ public final void processWatermark(Watermark mark) throws Exception {
processWatermark1(mark);
}
@Override
- public void processWatermark1(Watermark mark) throws Exception {
+ public final void processWatermark1(Watermark mark) throws Exception {
// We do the check here because we are guaranteed to at least get the +Inf
watermark on the
// main input when the job finishes.
if (currentSideInputWatermark >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
@@ -659,46 +672,69 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
emitAllPushedBackData();
}
- setCurrentInputWatermark(mark.getTimestamp());
+ currentInputWatermark = mark.getTimestamp();
- if (keyCoder == null) {
- long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(),
currentInputWatermark);
- if (potentialOutputWatermark > currentOutputWatermark) {
- setCurrentOutputWatermark(potentialOutputWatermark);
- emitWatermark(currentOutputWatermark);
- }
- } else {
- // hold back by the pushed back values waiting for side inputs
- long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(),
mark.getTimestamp());
+ long inputWatermarkHold =
applyInputWatermarkHold(getEffectiveInputWatermark());
+ if (keyCoder != null) {
+ timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+ }
- timeServiceManager.advanceWatermark(new
Watermark(pushedBackInputWatermark));
+ long potentialOutputWatermark =
+ applyOutputWatermarkHold(
+ currentOutputWatermark,
computeOutputWatermark(inputWatermarkHold));
+ maybeEmitWatermark(potentialOutputWatermark);
+ }
- Instant watermarkHold = keyedStateInternals.watermarkHold();
+ /**
+ * Allows to apply a hold to the input watermark. By default, just passes
the input watermark
+ * through.
+ */
+ public long applyInputWatermarkHold(long inputWatermark) {
+ return inputWatermark;
+ }
- long combinedWatermarkHold = Math.min(watermarkHold.getMillis(),
getPushbackWatermarkHold());
- combinedWatermarkHold =
- Math.min(combinedWatermarkHold,
timerInternals.getMinOutputTimestampMs());
- long potentialOutputWatermark = Math.min(pushedBackInputWatermark,
combinedWatermarkHold);
+ /**
+ * Allows to apply a hold to the output watermark before it is sent out. By
default, just passes
+ * the potential output watermark through which will make it the new output
watermark.
+ *
+ * @param currentOutputWatermark the current output watermark
+ * @param potentialOutputWatermark The potential new output watermark which
can be adjusted, if
+ * needed. The input watermark hold has already been applied.
+ * @return The new output watermark which will be emitted.
+ */
+ public long applyOutputWatermarkHold(long currentOutputWatermark, long
potentialOutputWatermark) {
+ return potentialOutputWatermark;
+ }
- if (potentialOutputWatermark > currentOutputWatermark) {
- setCurrentOutputWatermark(potentialOutputWatermark);
- emitWatermark(currentOutputWatermark);
- }
+ private long computeOutputWatermark(long inputWatermarkHold) {
+ final long potentialOutputWatermark;
+ if (keyCoder == null) {
+ potentialOutputWatermark = inputWatermarkHold;
+ } else {
+ Instant watermarkHold = keyedStateInternals.watermarkHold();
+ long combinedWatermarkHold = Math.min(watermarkHold.getMillis(),
inputWatermarkHold);
+ potentialOutputWatermark =
+ Math.min(combinedWatermarkHold,
timerInternals.getMinOutputTimestampMs());
}
+ return potentialOutputWatermark;
}
- private void emitWatermark(long watermark) {
- // Must invoke finishBatch before emit the +Inf watermark otherwise there
are some late events.
- if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- invokeFinishBundle();
+ private void maybeEmitWatermark(long watermark) {
+ if (watermark > currentOutputWatermark) {
+ // Must invoke finishBundle before emitting the +Inf watermark, otherwise
there are some late
+ // events.
+ if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ invokeFinishBundle();
+ }
+ LOG.debug("Emitting watermark {}", watermark);
+ currentOutputWatermark = watermark;
+ output.emitWatermark(new Watermark(watermark));
}
- output.emitWatermark(new Watermark(watermark));
}
@Override
- public void processWatermark2(Watermark mark) throws Exception {
-
- setCurrentSideInputWatermark(mark.getTimestamp());
+ public final void processWatermark2(Watermark mark) throws Exception {
+ currentSideInputWatermark = mark.getTimestamp();
if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
// this means we will never see any more side input
emitAllPushedBackData();
@@ -727,8 +763,7 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
}
pushedBackElementsHandler.clear();
-
- setPushedBackWatermark(Long.MAX_VALUE);
+ pushedBackWatermark = Long.MAX_VALUE;
}
/**
@@ -743,7 +778,11 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
*/
private void checkInvokeStartBundle() {
if (!bundleStarted) {
+ LOG.debug("Starting bundle.");
outputManager.flushBuffer();
+ if (preBundleCallback != null) {
+ preBundleCallback.run();
+ }
pushbackDoFnRunner.startBundle();
bundleStarted = true;
}
@@ -773,15 +812,17 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
protected final void invokeFinishBundle() {
if (bundleStarted) {
+ LOG.debug("Finishing bundle.");
pushbackDoFnRunner.finishBundle();
+ LOG.debug("Finished bundle. Element count: {}", elementCount);
elementCount = 0L;
lastFinishBundleTime =
getProcessingTimeService().getCurrentProcessingTime();
bundleStarted = false;
// callback only after current bundle was fully finalized
// it could start a new bundle, for example resulting from timer
processing
if (bundleFinishedCallback != null) {
+ LOG.debug("Invoking bundle finish callback.");
bundleFinishedCallback.run();
- bundleFinishedCallback = null;
}
}
}
@@ -843,6 +884,11 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
// allow overriding this in WindowDoFnOperator
protected void fireTimer(TimerData timerData) {
+ LOG.debug(
+ "Firing timer: {} at {} with output time {}",
+ timerData.getTimerId(),
+ timerData.getTimestamp().getMillis(),
+ timerData.getOutputTimestamp().getMillis());
StateNamespace namespace = timerData.getNamespace();
// This is a user timer, so namespace must be WindowNamespace
checkArgument(namespace instanceof WindowNamespace);
@@ -857,18 +903,6 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
timerData.getDomain());
}
- private void setCurrentInputWatermark(long currentInputWatermark) {
- this.currentInputWatermark = currentInputWatermark;
- }
-
- private void setCurrentSideInputWatermark(long currentInputWatermark) {
- this.currentSideInputWatermark = currentInputWatermark;
- }
-
- private void setCurrentOutputWatermark(long currentOutputWatermark) {
- this.currentOutputWatermark = currentOutputWatermark;
- }
-
@SuppressWarnings("unchecked")
Coder<InputT> getInputCoder() {
return (Coder<InputT>)
Iterables.getOnlyElement(windowedInputCoder.getCoderArguments());
@@ -1259,6 +1293,11 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
@Override
public void setTimer(TimerData timer) {
try {
+ LOG.debug(
+ "Setting timer: {} at {} with output time {}",
+ timer.getTimerId(),
+ timer.getTimestamp().getMillis(),
+ timer.getOutputTimestamp().getMillis());
String contextTimerId = getContextTimerId(timer.getTimerId(),
timer.getNamespace());
// Only one timer can exist at a time for a given timer id and context.
// If a timer gets set twice in the same context, the second must
@@ -1367,7 +1406,7 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
@Override
public Instant currentInputWatermarkTime() {
- return new Instant(Math.min(currentInputWatermark,
getPushbackWatermarkHold()));
+ return new Instant(getEffectiveInputWatermark());
}
@Nullable
@@ -1376,30 +1415,44 @@ public class DoFnOperator<InputT, OutputT> extends
AbstractStreamOperator<Window
return new Instant(currentOutputWatermark);
}
+ /**
+ * Check whether event time timers lower or equal to the given timestamp
exist. Caution: This is
+ * scoped by the current key.
+ */
+ public boolean hasPendingEventTimeTimers(long maxTimestamp) throws
Exception {
+ for (TimerData timer : pendingTimersById.values()) {
+ if (timer.getDomain() == TimeDomain.EVENT_TIME
+ && timer.getTimestamp().getMillis() <= maxTimestamp) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/** Unique contextual id of a timer. Used to look up any existing timers
in a context. */
private String getContextTimerId(String timerId, StateNamespace namespace)
{
return timerId + namespace.stringKey();
}
+ }
- /**
- * In Beam, a timer with timestamp {@code T} is only illegible for firing
when the time has
- * moved past this time stamp, i.e. {@code T < current_time}. In the case
of event time,
- * current_time is the watermark, in the case of processing time it is the
system time.
- *
- * <p>Flink's TimerService has different semantics because it only ensures
{@code T <=
- * current_time}.
- *
- * <p>To make up for this, we need to add one millisecond to Flink's
internal timer timestamp.
- * Note that we do not modify Beam's timestamp and we are not exposing
Flink's timestamp.
- *
- * <p>See also https://jira.apache.org/jira/browse/BEAM-3863
- */
- private long adjustTimestampForFlink(long beamTimerTimestamp) {
- if (beamTimerTimestamp == Long.MAX_VALUE) {
- // We would overflow, do not adjust timestamp
- return Long.MAX_VALUE;
- }
- return beamTimerTimestamp + 1;
+ /**
+ * In Beam, a timer with timestamp {@code T} is only eligible for firing
when the time has moved
+ * past this time stamp, i.e. {@code T < current_time}. In the case of event
time, current_time is
+ * the watermark, in the case of processing time it is the system time.
+ *
+ * <p>Flink's TimerService has different semantics because it only ensures
{@code T <=
+ * current_time}.
+ *
+ * <p>To make up for this, we need to add one millisecond to Flink's
internal timer timestamp.
+ * Note that we do not modify Beam's timestamp and we are not exposing
Flink's timestamp.
+ *
+ * <p>See also https://jira.apache.org/jira/browse/BEAM-3863
+ */
+ static long adjustTimestampForFlink(long beamTimerTimestamp) {
+ if (beamTimerTimestamp == Long.MAX_VALUE) {
+ // We would overflow, do not adjust timestamp
+ return Long.MAX_VALUE;
}
+ return beamTimerTimestamp + 1;
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 594ec59..1bbd1e5 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -17,9 +17,11 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming;
+import static
org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_TIMER_ID;
import static
org.apache.beam.runners.flink.translation.utils.FlinkPortableRunnerUtils.requiresTimeSortedInput;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
@@ -75,6 +77,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.IdGenerator;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
@@ -102,6 +105,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,6 +118,8 @@ import org.slf4j.LoggerFactory;
*
* <p>TODO Integrate support for progress updates and metrics
*/
+// We use Flink's lifecycle methods to initialize transient fields
+@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED")
public class ExecutableStageDoFnOperator<InputT, OutputT> extends
DoFnOperator<InputT, OutputT> {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutableStageDoFnOperator.class);
@@ -135,11 +141,17 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
private transient ExecutableStage executableStage;
private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
- /**
- * Watermark held back due to async processing. Volatile due to multiple
mutually exclusive
- * threads. Please see the description in DoFnOperator.
- */
- private transient volatile long backupWatermarkHold = Long.MIN_VALUE;
+ /** The minimum event time timer timestamp observed during the last bundle.
*/
+ private transient long minEventTimeTimerTimestampInLastBundle;
+
+ /** The minimum event time timer timestamp observed in the current bundle. */
+ private transient long minEventTimeTimerTimestampInCurrentBundle;
+
+ /** The input watermark before the current bundle started. */
+ private transient long inputWatermarkBeforeBundleStart;
+
+ /** Flag indicating whether the operator has been closed. */
+ private transient boolean closed;
/** Constructor. */
public ExecutableStageDoFnOperator(
@@ -220,6 +232,11 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
}
};
+ minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
+ minEventTimeTimerTimestampInLastBundle = Long.MAX_VALUE;
+ super.setPreBundleCallback(this::preBundleStartCallback);
+ super.setBundleFinishedCallback(this::finishBundleCallback);
+
// This will call {@code createWrappingDoFnRunner} which needs the above
dependencies.
super.open();
}
@@ -436,8 +453,10 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
return this.<ByteBuffer>getKeyedStateBackend().getCurrentKey();
}
- private void setTimer(Timer<?> timerElement, TimerInternals.TimerData
timerData) {
+ void setTimer(Timer<?> timerElement, TimerInternals.TimerData timerData) {
try {
+ Preconditions.checkState(
+ sdkHarnessRunner.isBundleInProgress(), "Bundle was expected to be in
progress!!");
LOG.debug("Setting timer: {} {}", timerElement, timerData);
// KvToByteBufferKeySelector returns the key encoded, it doesn't care
about the
// window, timestamp or pane information.
@@ -455,6 +474,12 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
timerData.getNamespace(), timerData.getTimerId(),
timerData.getDomain());
} else {
timerInternals.setTimer(timerData);
+ if (!timerData.getTimerId().equals(GC_TIMER_ID)) {
+ minEventTimeTimerTimestampInCurrentBundle =
+ Math.min(
+ minEventTimeTimerTimestampInCurrentBundle,
+
adjustTimestampForFlink(timerData.getTimestamp().getMillis()));
+ }
}
}
} catch (Exception e) {
@@ -473,6 +498,18 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
}
@Override
+ public void close() throws Exception {
+ closed = true;
+ // We might still be holding back the watermark and Flink does not trigger
the timer
+ // callback for watermark advancement anymore.
+ processWatermark1(Watermark.MAX_WATERMARK);
+ while (getCurrentOutputWatermark() <
Watermark.MAX_WATERMARK.getTimestamp()) {
+ invokeFinishBundle();
+ }
+ super.close();
+ }
+
+ @Override
public void dispose() throws Exception {
// may be called multiple times when an exception is thrown
if (stageContext != null) {
@@ -517,7 +554,27 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
+ public long applyInputWatermarkHold(long inputWatermark) {
+ // We must wait until all elements/timers have been processed (happens
async!) before the
+ // watermark can be progressed. We can't just advance the input watermark
until at least one
+ // bundle has been completed since the watermark has been received.
Otherwise we potentially
+ // violate the Beam timer contract which allows for already set timer to
be modified by
+ // successive elements.
+ //
+ // For example, we set a timer at t1, then finish the bundle (e.g. due to
the bundle timeout),
+ // then receive an element which updates the timer to fire at t2, and then
receive a watermark
+ // w1, where w1 > t2 > t1. If we do not hold back the input watermark
here, w1 would fire the
+ // initial timer at t1, but we want to make sure to fire the updated
version of the timer at
+ // t2.
+ if (sdkHarnessRunner.isBundleInProgress()) {
+ return inputWatermarkBeforeBundleStart;
+ } else {
+ return inputWatermark;
+ }
+ }
+
+ @Override
+ public long applyOutputWatermarkHold(long currentOutputWatermark, long
potentialOutputWatermark) {
// Due to the asynchronous communication with the SDK harness,
// a bundle might still be in progress and not all items have
// yet been received from the SDK harness. If we just set this
@@ -544,30 +601,47 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
// every watermark. So we have implemented 2) below.
//
if (sdkHarnessRunner.isBundleInProgress()) {
- if (mark.getTimestamp() >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- invokeFinishBundle();
- setPushedBackWatermark(Long.MAX_VALUE);
+ if (minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE) {
+ // We can safely advance the watermark to before the last bundle's
minimum event timer
+ // but not past the potential output watermark which includes holds to
the input watermark.
+ return Math.min(minEventTimeTimerTimestampInLastBundle - 1,
potentialOutputWatermark);
} else {
- // It is not safe to advance the output watermark yet, so add a hold
on the current
- // output watermark.
- backupWatermarkHold = Math.max(backupWatermarkHold,
getPushbackWatermarkHold());
- setPushedBackWatermark(Math.min(currentOutputWatermark,
backupWatermarkHold));
- super.setBundleFinishedCallback(
- () -> {
- try {
- LOG.debug("processing pushed back watermark: {}", mark);
- // at this point the bundle is finished, allow the watermark
to pass
- // we are restoring the previous hold in case it was already
set for side inputs
- setPushedBackWatermark(backupWatermarkHold);
- super.processWatermark(mark);
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to process pushed back watermark after finished
bundle.", e);
- }
- });
+ // We don't have any information yet, use the current output watermark
for now.
+ return currentOutputWatermark;
+ }
+ } else {
+ // No bundle was started when we advanced the input watermark.
+ // Thus, we can safely set a new output watermark.
+ return potentialOutputWatermark;
+ }
+ }
+
+ private void preBundleStartCallback() {
+ inputWatermarkBeforeBundleStart = getEffectiveInputWatermark();
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void finishBundleCallback() {
+ minEventTimeTimerTimestampInLastBundle =
minEventTimeTimerTimestampInCurrentBundle;
+ minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
+ try {
+ if (!closed
+ && minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
+ && minEventTimeTimerTimestampInLastBundle <=
getEffectiveInputWatermark()) {
+ ProcessingTimeService processingTimeService =
getProcessingTimeService();
+ // We are scheduling a timer for advancing the watermark, to not delay
finishing the bundle
+ // and temporarily release the checkpoint lock. Otherwise, we could
potentially loop when a
+ // timer keeps scheduling a timer for the same timestamp.
+ processingTimeService.registerTimer(
+ processingTimeService.getCurrentProcessingTime(),
+ ts -> processWatermark1(new
Watermark(getEffectiveInputWatermark())));
+ } else {
+ processWatermark1(new Watermark(getEffectiveInputWatermark()));
}
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to progress watermark to " + getEffectiveInputWatermark(),
e);
}
- super.processWatermark(mark);
}
private static class SdkHarnessDoFnRunner<InputT, OutputT>
@@ -647,7 +721,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
@Override
public void processElement(WindowedValue<InputT> element) {
try {
- LOG.debug("Sending value: {}", element);
+ LOG.debug("Processing value: {}", element);
mainInputReceiver.accept(element);
} catch (Exception e) {
throw new RuntimeException("Failed to process element with SDK
harness.", e);
@@ -769,8 +843,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
.collect(Collectors.toList());
KeyedStateBackend<ByteBuffer> stateBackend = getKeyedStateBackend();
+
StateCleaner stateCleaner =
- new StateCleaner(userStates, windowCoder, () ->
stateBackend.getCurrentKey());
+ new StateCleaner(
+ userStates,
+ windowCoder,
+ stateBackend::getCurrentKey,
+ timerInternals::hasPendingEventTimeTimers,
+ cleanupTimer);
return new StatefulDoFnRunner<InputT, OutputT, BoundedWindow>(
sdkHarnessRunner,
@@ -800,6 +880,8 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
if (!stateCleaner.cleanupQueue.isEmpty()) {
try (Locker locker = Locker.locked(stateBackendLock)) {
stateCleaner.cleanupState(keyedStateInternals,
stateBackend::setCurrentKey);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to cleanup state.", e);
}
}
}
@@ -834,23 +916,27 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
@Override
public void setForWindow(InputT input, BoundedWindow window) {
Preconditions.checkNotNull(input, "Null input passed to CleanupTimer");
- // make sure this fires after any window.maxTimestamp() timers
- Instant gcTime = LateDataUtils.garbageCollectionTime(window,
windowingStrategy).plus(1);
// needs to match the encoding in prepareStateBackend for state request
handler
final ByteBuffer key = FlinkKeyUtils.encodeKey(((KV) input).getKey(),
keyCoder);
// Ensure the state backend is not concurrently accessed by the state
requests
try (Locker locker = Locker.locked(stateBackendLock)) {
keyedStateBackend.setCurrentKey(key);
- timerInternals.setTimer(
- StateNamespaces.window(windowCoder, window),
- GC_TIMER_ID,
- "",
- gcTime,
- window.maxTimestamp(),
- TimeDomain.EVENT_TIME);
+ setCleanupTimer(window);
}
}
+ void setCleanupTimer(BoundedWindow window) {
+ // make sure this fires after any window.maxTimestamp() timers
+ Instant gcTime = LateDataUtils.garbageCollectionTime(window,
windowingStrategy).plus(1);
+ timerInternals.setTimer(
+ StateNamespaces.window(windowCoder, window),
+ GC_TIMER_ID,
+ "",
+ gcTime,
+ window.maxTimestamp(),
+ TimeDomain.EVENT_TIME);
+ }
+
@Override
public boolean isForWindow(
String timerId, BoundedWindow window, Instant timestamp, TimeDomain
timeDomain) {
@@ -866,34 +952,52 @@ public class ExecutableStageDoFnOperator<InputT, OutputT>
extends DoFnOperator<I
private final Coder windowCoder;
private final ArrayDeque<KV<ByteBuffer, BoundedWindow>> cleanupQueue;
private final Supplier<ByteBuffer> currentKeySupplier;
+ private final ThrowingFunction<Long, Boolean> hasPendingEventTimeTimers;
+ private final CleanupTimer cleanupTimer;
StateCleaner(
- List<String> userStateNames, Coder windowCoder, Supplier<ByteBuffer>
currentKeySupplier) {
+ List<String> userStateNames,
+ Coder windowCoder,
+ Supplier<ByteBuffer> currentKeySupplier,
+ ThrowingFunction<Long, Boolean> hasPendingEventTimeTimers,
+ CleanupTimer cleanupTimer) {
this.userStateNames = userStateNames;
this.windowCoder = windowCoder;
this.currentKeySupplier = currentKeySupplier;
+ this.hasPendingEventTimeTimers = hasPendingEventTimeTimers;
+ this.cleanupTimer = cleanupTimer;
this.cleanupQueue = new ArrayDeque<>();
}
@Override
public void clearForWindow(BoundedWindow window) {
+ // Delay cleanup until the end of the bundle to allow stateful
processing and new timers.
// Executed in the context of onTimer(..) where the correct key will be
set
cleanupQueue.add(KV.of(currentKeySupplier.get(), window));
}
@SuppressWarnings("ByteBufferBackingArray")
- void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer>
keyContextConsumer) {
+ void cleanupState(StateInternals stateInternals, Consumer<ByteBuffer>
keyContextConsumer)
+ throws Exception {
while (!cleanupQueue.isEmpty()) {
- KV<ByteBuffer, BoundedWindow> kv = cleanupQueue.remove();
- if (LOG.isDebugEnabled()) {
- LOG.debug("State cleanup for {} {}",
Arrays.toString(kv.getKey().array()), kv.getValue());
- }
+ KV<ByteBuffer, BoundedWindow> kv =
Preconditions.checkNotNull(cleanupQueue.remove());
+ BoundedWindow window = Preconditions.checkNotNull(kv.getValue());
keyContextConsumer.accept(kv.getKey());
- for (String userState : userStateNames) {
- StateNamespace namespace = StateNamespaces.window(windowCoder,
kv.getValue());
- StateTag<BagState<Void>> bagStateStateTag = StateTags.bag(userState,
VoidCoder.of());
- BagState<?> state = stateInternals.state(namespace,
bagStateStateTag);
- state.clear();
+ // Check whether we have pending timers which were set during the
bundle.
+ if
(hasPendingEventTimeTimers.apply(window.maxTimestamp().getMillis())) {
+ // Re-add GC timer and let remaining timers fire. Don't cleanup
state yet.
+ cleanupTimer.setCleanupTimer(window);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("State cleanup for {} {}",
Arrays.toString(kv.getKey().array()), window);
+ }
+ // No more timers (finally!). Time to clean up.
+ for (String userState : userStateNames) {
+ StateNamespace namespace = StateNamespaces.window(windowCoder,
window);
+ StateTag<BagState<Void>> bagStateStateTag =
StateTags.bag(userState, VoidCoder.of());
+ BagState<?> state = stateInternals.state(namespace,
bagStateStateTag);
+ state.clear();
+ }
}
}
}
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
index b3c905c..c93a04f 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
@@ -53,6 +53,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Flink operator for executing splittable {@link DoFn DoFns}. Specifically,
for executing the
@@ -61,6 +63,8 @@ import org.joda.time.Instant;
public class SplittableDoFnOperator<InputT, OutputT, RestrictionT>
extends DoFnOperator<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>,
OutputT> {
+ private static final Logger LOG =
LoggerFactory.getLogger(SplittableDoFnOperator.class);
+
private transient ScheduledExecutorService executorService;
public SplittableDoFnOperator(
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 6d37fbc..c1ef358 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -66,7 +66,7 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsIterableContaining;
-import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -291,8 +291,9 @@ public class FlinkSavepointTest implements Serializable {
if (isPortablePipeline) {
key =
pipeline
- .apply(Impulse.create())
+ .apply("ImpulseStage", Impulse.create())
.apply(
+ "KvMapperStage",
MapElements.via(
new InferableFunction<byte[], KV<String, Void>>() {
@Override
@@ -304,6 +305,7 @@ public class FlinkSavepointTest implements Serializable {
}
}))
.apply(
+ "TimerStage",
ParDo.of(
new DoFn<KV<String, Void>, KV<String, Long>>() {
@StateId("nextInteger")
@@ -311,14 +313,12 @@ public class FlinkSavepointTest implements Serializable {
StateSpecs.value();
@TimerId("timer")
- private final TimerSpec timer =
- TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+ private final TimerSpec timer =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void processElement(
ProcessContext context, @TimerId("timer") Timer
timer) {
-
- timer.offset(Duration.ZERO).setRelative();
+ timer.set(new Instant(0));
}
@OnTimer("timer")
@@ -327,20 +327,20 @@ public class FlinkSavepointTest implements Serializable {
@StateId("nextInteger") ValueState<Long>
nextInteger,
@TimerId("timer") Timer timer) {
Long current = nextInteger.read();
- if (current == null) {
- current = -1L;
- }
- long next = current + 1;
- nextInteger.write(next);
- context.output(KV.of("key", next));
- timer.offset(Duration.millis(100)).setRelative();
+ current = current != null ? current : 0L;
+ context.output(KV.of("key", current));
+ LOG.debug("triggering timer {}", current);
+ nextInteger.write(current + 1);
+ // Trigger timer again and continue to hold back the
watermark
+ timer.withOutputTimestamp(new
Instant(0)).set(context.fireTimestamp());
}
}));
} else {
key =
pipeline
- .apply(GenerateSequence.from(0))
+ .apply("IdGeneratorStage", GenerateSequence.from(0))
.apply(
+ "KvMapperStage",
ParDo.of(
new DoFn<Long, KV<String, Long>>() {
@ProcessElement
@@ -351,6 +351,7 @@ public class FlinkSavepointTest implements Serializable {
}
if (restored) {
return key.apply(
+ "VerificationStage",
ParDo.of(
new DoFn<KV<String, Long>, String>() {
@@ -372,6 +373,7 @@ public class FlinkSavepointTest implements Serializable {
}));
} else {
return key.apply(
+ "VerificationStage",
ParDo.of(
new DoFn<KV<String, Long>, String>() {
@@ -386,12 +388,14 @@ public class FlinkSavepointTest implements Serializable {
ProcessContext context,
@StateId("valueState") ValueState<Integer> intValueState,
@StateId("bagState") BagState<Integer> intBagState) {
- Long value =
Objects.requireNonNull(context.element().getValue());
+ long value =
Objects.requireNonNull(context.element().getValue());
+ LOG.debug("value: {} timestamp: {}", value,
context.timestamp().getMillis());
if (value == 0L) {
intValueState.write(42);
intBagState.add(40);
intBagState.add(1);
intBagState.add(1);
+ } else if (value >= 1) {
oneShotLatch.countDown();
}
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index b761c01..1030632 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -17,13 +17,14 @@
*/
package org.apache.beam.runners.flink;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
@@ -33,7 +34,6 @@ import
org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
@@ -66,6 +66,9 @@ import org.slf4j.LoggerFactory;
/**
* Tests the state and timer integration of {@link
*
org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator}.
+ *
+ * <p>The test sets the same timers multiple times per key. This tests that
only the latest version
+ * of a given timer is run.
*/
@RunWith(Parameterized.class)
public class PortableTimersExecutionTest implements Serializable {
@@ -100,12 +103,13 @@ public class PortableTimersExecutionTest implements
Serializable {
@Test(timeout = 120_000)
public void testTimerExecution() throws Exception {
- PipelineOptions options =
PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").create();
+ FlinkPipelineOptions options =
+
PipelineOptionsFactory.fromArgs("--experiments=beam_fn_api").as(FlinkPipelineOptions.class);
options.setRunner(CrashingRunner.class);
- options.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
- options.as(FlinkPipelineOptions.class).setStreaming(isStreaming);
- options.as(FlinkPipelineOptions.class).setParallelism(2);
-
options.as(FlinkPipelineOptions.class).setShutdownSourcesOnFinalWatermark(true);
+ options.setFlinkMaster("[local]");
+ options.setStreaming(isStreaming);
+ options.setParallelism(2);
+ options.setShutdownSourcesOnFinalWatermark(true);
options
.as(PortablePipelineOptions.class)
.setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
@@ -116,6 +120,7 @@ public class PortableTimersExecutionTest implements
Serializable {
final int timerOutput = 4093;
// Enough keys that we exercise interesting code paths
int numKeys = 50;
+ int numDuplicateTimers = 15;
List<KV<String, Integer>> input = new ArrayList<>();
List<KV<String, Integer>> expectedOutput = new ArrayList<>();
@@ -123,7 +128,7 @@ public class PortableTimersExecutionTest implements
Serializable {
// Each key should have just one final output at GC time
expectedOutput.add(KV.of(key.toString(), timerOutput));
- for (int i = 0; i < 15; ++i) {
+ for (int i = 0; i < numDuplicateTimers; ++i) {
// Each input should be output with the offset added
input.add(KV.of(key.toString(), i));
expectedOutput.add(KV.of(key.toString(), i + offset));
@@ -167,13 +172,18 @@ public class PortableTimersExecutionTest implements
Serializable {
@OnTimer(timerId)
public void onTimer(
@StateId(stateId) ValueState<String> state,
OutputReceiver<KV<String, Integer>> r) {
- r.output(KV.of(state.read(), timerOutput));
+ String read = Objects.requireNonNull(state.read(), "State must not
be null");
+ KV<String, Integer> of = KV.of(read, timerOutput);
+ r.output(of);
}
};
final Pipeline pipeline = Pipeline.create(options);
PCollection<KV<String, Integer>> output =
-
pipeline.apply(Impulse.create()).apply(ParDo.of(inputFn)).apply(ParDo.of(testFn));
+ pipeline
+ .apply("Impulse", Impulse.create())
+ .apply("Input", ParDo.of(inputFn))
+ .apply("Timers", ParDo.of(testFn));
PAssert.that(output).containsInAnyOrder(expectedOutput);
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
@@ -185,9 +195,8 @@ public class PortableTimersExecutionTest implements
Serializable {
"none",
flinkJobExecutor,
pipelineProto,
- options.as(FlinkPipelineOptions.class),
- new FlinkPipelineRunner(
- options.as(FlinkPipelineOptions.class), null,
Collections.emptyList()));
+ options,
+ new FlinkPipelineRunner(options, null,
Collections.emptyList()));
jobInvocation.start();
while (jobInvocation.getState() != Enum.DONE) {
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
index 85d3cfa..e45cc1f 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ImpulseSourceFunctionTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -75,8 +76,9 @@ public class ImpulseSourceFunctionTest {
source.run(sourceContext);
// 2) Should use checkpoint lock
verify(sourceContext).getCheckpointLock();
- // 3) Should emit impulse element
+ // 3) Should emit impulse element and the final watermark
verify(sourceContext).collect(argThat(elementMatcher));
+ verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
verifyNoMoreInteractions(sourceContext);
// 4) Should modify checkpoint state
verify(mockListState).get();
@@ -93,11 +95,13 @@ public class ImpulseSourceFunctionTest {
// 1) Should finish
source.run(sourceContext);
- // 2) Should _not_ emit impulse element
- verifyNoMoreInteractions(sourceContext);
- // 3) Should keep checkpoint state
+ // 2) Should keep checkpoint state
verify(mockListState).get();
verifyNoMoreInteractions(mockListState);
+ // 3) Should always emit the final watermark
+ verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
+ // 4) Should _not_ emit impulse element
+ verifyNoMoreInteractions(sourceContext);
}
@Test(timeout = 10_000)
@@ -128,6 +132,7 @@ public class ImpulseSourceFunctionTest {
sourceThread.join();
}
verify(sourceContext).collect(argThat(elementMatcher));
+ verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
verify(mockListState).add(true);
verify(mockListState).get();
verifyNoMoreInteractions(mockListState);
@@ -162,7 +167,9 @@ public class ImpulseSourceFunctionTest {
sourceThread.interrupt();
sourceThread.join();
- // nothing should have been emitted because the impulse was emitted before
restore
+ // Should always emit the final watermark
+ verify(sourceContext).emitWatermark(Watermark.MAX_WATERMARK);
+ // no element should have been emitted because the impulse was emitted
before restore
verifyNoMoreInteractions(sourceContext);
}
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 75fc6f9..c569045 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -250,6 +250,7 @@ public class DoFnOperatorTest {
public void testWatermarkContract() throws Exception {
final Instant timerTimestamp = new Instant(1000);
+ final Instant timerOutputTimestamp = timerTimestamp.minus(1);
final String eventTimeMessage = "Event timer fired: ";
final String processingTimeMessage = "Processing timer fired";
@@ -279,7 +280,7 @@ public class DoFnOperatorTest {
@TimerId(processingTimerId) Timer processingTimer) {
eventTimer.set(timerTimestamp);
eventTimerWithOutputTimestamp
- .withOutputTimestamp(timerTimestamp.minus(1))
+ .withOutputTimestamp(timerOutputTimestamp)
.set(timerTimestamp);
processingTimer.offset(Duration.millis(timerTimestamp.getMillis())).setRelative();
}
@@ -365,7 +366,7 @@ public class DoFnOperatorTest {
assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()),
emptyIterable());
assertThat(
doFnOperator.timerInternals.getMinOutputTimestampMs(),
- is(timerTimestamp.minus(1).getMillis()));
+ is(timerOutputTimestamp.getMillis()));
// this must fire the event timers
testHarness.processWatermark(timerTimestamp.getMillis() + 1);
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
index 0a59091..02229dc 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperatorTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.flink.translation.wrappers.streaming;
import static
org.apache.beam.runners.core.construction.PTransformTranslation.PAR_DO_TRANSFORM_URN;
+import static
org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
@@ -25,10 +26,8 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
@@ -47,6 +46,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
+import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -88,6 +88,7 @@ import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -377,10 +378,7 @@ public class ExecutableStageDoFnOperatorTest {
testHarness.close(); // triggers finish bundle
- assertThat(
- testHarness.getOutput(),
- contains(
- new StreamRecord<>(three), new Watermark(watermark), new
Watermark(Long.MAX_VALUE)));
+ assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()),
contains(three));
assertThat(
testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),
@@ -392,6 +390,139 @@ public class ExecutableStageDoFnOperatorTest {
}
@Test
+ public void testWatermarkHandling() throws Exception {
+ TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
+ DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
+ new DoFnOperator.MultiOutputOutputManagerFactory(mainOutput,
VoidCoder.of());
+ ExecutableStageDoFnOperator<KV<String, Integer>, Integer> operator =
+ getOperator(
+ mainOutput,
+ Collections.emptyList(),
+ outputManagerFactory,
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10))),
+ StringUtf8Coder.of(),
+ WindowedValue.getFullCoder(
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
IntervalWindow.getCoder()));
+
+ KeyedOneInputStreamOperatorTestHarness<
+ String, WindowedValue<KV<String, Integer>>, WindowedValue<Integer>>
+ testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ operator,
+ val -> val.getValue().getKey(),
+ new CoderTypeInformation<>(StringUtf8Coder.of()));
+ RemoteBundle bundle = Mockito.mock(RemoteBundle.class);
+ when(bundle.getInputReceivers())
+ .thenReturn(
+ ImmutableMap.<String, FnDataReceiver<WindowedValue>>builder()
+ .put("input", Mockito.mock(FnDataReceiver.class))
+ .build());
+ when(bundle.getTimerReceivers())
+ .thenReturn(
+ ImmutableMap.<KV<String, String>,
FnDataReceiver<WindowedValue>>builder()
+ .put(KV.of("transform", "timer"),
Mockito.mock(FnDataReceiver.class))
+ .put(KV.of("transform", "timer2"),
Mockito.mock(FnDataReceiver.class))
+ .put(KV.of("transform", "timer3"),
Mockito.mock(FnDataReceiver.class))
+ .build());
+ when(stageBundleFactory.getBundle(any(), any(), any(),
any())).thenReturn(bundle);
+
+ testHarness.open();
+ assertThat(
+ operator.getCurrentOutputWatermark(),
is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+
+ // No bundle has been started, watermark can be freely advanced
+ testHarness.processWatermark(0);
+ assertThat(operator.getCurrentOutputWatermark(), is(0L));
+
+ // Trigger a new bundle
+ IntervalWindow intervalWindow = new IntervalWindow(new Instant(0), new
Instant(9));
+ WindowedValue<KV<String, Integer>> windowedValue =
+ WindowedValue.of(KV.of("one", 1), Instant.now(), intervalWindow,
PaneInfo.NO_FIRING);
+ testHarness.processElement(new StreamRecord<>(windowedValue));
+
+ // The output watermark should be held back during the bundle
+ testHarness.processWatermark(1);
+ assertThat(operator.getEffectiveInputWatermark(), is(1L));
+ assertThat(operator.getCurrentOutputWatermark(), is(0L));
+
+ // After the bundle has been finished, the watermark should be advanced
+ operator.invokeFinishBundle();
+ assertThat(operator.getCurrentOutputWatermark(), is(1L));
+
+ // Bundle finished, watermark can be freely advanced
+ testHarness.processWatermark(2);
+ assertThat(operator.getEffectiveInputWatermark(), is(2L));
+ assertThat(operator.getCurrentOutputWatermark(), is(2L));
+
+ // Trigger a new bundle
+ testHarness.processElement(new StreamRecord<>(windowedValue));
+ assertThat(testHarness.numEventTimeTimers(), is(1)); // cleanup timer
+
+ // Set at timer
+ Instant timerTarget = new Instant(5);
+ Instant timerTarget2 = new Instant(6);
+ operator.getLockToAcquireForStateAccessDuringBundles().lock();
+
+ BiConsumer<String, Instant> timerConsumer =
+ (timerId, timestamp) ->
+ operator.setTimer(
+ Timer.of(
+ windowedValue.getValue().getKey(),
+ "",
+ windowedValue.getWindows(),
+ timestamp,
+ timestamp,
+ PaneInfo.NO_FIRING),
+ TimerInternals.TimerData.of(
+ TimerReceiverFactory.encodeToTimerDataTimerId("transform",
timerId),
+ "",
+ StateNamespaces.window(IntervalWindow.getCoder(),
intervalWindow),
+ timestamp,
+ timestamp,
+ TimeDomain.EVENT_TIME));
+
+ timerConsumer.accept("timer", timerTarget);
+ timerConsumer.accept("timer2", timerTarget2);
+ assertThat(testHarness.numEventTimeTimers(), is(3));
+
+ // Advance input watermark past the timer
+ // Check the output watermark is held back
+ long targetWatermark = timerTarget.getMillis() + 100;
+ testHarness.processWatermark(targetWatermark);
+ // Do not yet advance the output watermark because we are still processing
a bundle
+ assertThat(testHarness.numEventTimeTimers(), is(3));
+ assertThat(operator.getCurrentOutputWatermark(), is(2L));
+
+ // Check that the timers are fired but the output watermark is advanced no
further than
+ // the minimum timer timestamp of the previous bundle because we are still
processing a
+ // bundle which might contain more timers.
+ // Timers can create loops if they keep rescheduling themselves when firing
+ // Thus, we advance the watermark asynchronously to allow for
checkpointing to run
+ operator.invokeFinishBundle();
+ assertThat(testHarness.numEventTimeTimers(), is(3));
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+ assertThat(testHarness.numEventTimeTimers(), is(0));
+ assertThat(operator.getCurrentOutputWatermark(), is(5L));
+
+ // Output watermark is advanced synchronously when the bundle finishes,
+ // no more timers are scheduled
+ operator.invokeFinishBundle();
+ assertThat(operator.getCurrentOutputWatermark(), is(targetWatermark));
+ assertThat(testHarness.numEventTimeTimers(), is(0));
+
+ // Watermark is advanced in a blocking fashion on close, not via a timers
+ // Create a bundle with a pending timer to simulate that
+ testHarness.processElement(new StreamRecord<>(windowedValue));
+ timerConsumer.accept("timer3", new Instant(targetWatermark));
+ assertThat(testHarness.numEventTimeTimers(), is(1));
+
+ // This should be blocking until the watermark reaches Long.MAX_VALUE.
+ testHarness.close();
+ assertThat(testHarness.numEventTimeTimers(), is(0));
+ assertThat(operator.getCurrentOutputWatermark(), is(Long.MAX_VALUE));
+ }
+
+ @Test
public void testStageBundleClosed() throws Exception {
TupleTag<Integer> mainOutput = new TupleTag<>("main-output");
DoFnOperator.MultiOutputOutputManagerFactory<Integer> outputManagerFactory
=
@@ -492,7 +623,7 @@ public class ExecutableStageDoFnOperatorTest {
}
@Test
- public void testEnsureStateCleanupWithKeyedInputStateCleaner() {
+ public void testEnsureStateCleanupWithKeyedInputStateCleaner() throws
Exception {
GlobalWindow.Coder windowCoder = GlobalWindow.Coder.INSTANCE;
InMemoryStateInternals<String> stateInternals =
InMemoryStateInternals.forKey("key");
List<String> userStateNames = ImmutableList.of("state1", "state2");
@@ -514,13 +645,13 @@ public class ExecutableStageDoFnOperatorTest {
// Test that state is cleaned up correctly
ExecutableStageDoFnOperator.StateCleaner stateCleaner =
new ExecutableStageDoFnOperator.StateCleaner(
- userStateNames, windowCoder, () -> key.getValue());
+ userStateNames, windowCoder, key::getValue, ts -> false, null);
for (BagState<String> bagState : bagStates) {
assertThat(Iterables.size(bagState.read()), is(1));
}
stateCleaner.clearForWindow(GlobalWindow.INSTANCE);
- stateCleaner.cleanupState(stateInternals, (k) -> key.setValue(k));
+ stateCleaner.cleanupState(stateInternals, key::setValue);
for (BagState<String> bagState : bagStates) {
assertThat(Iterables.size(bagState.read()), is(0));
@@ -530,6 +661,10 @@ public class ExecutableStageDoFnOperatorTest {
@Test
public void testEnsureDeferredStateCleanupTimerFiring() throws Exception {
testEnsureDeferredStateCleanupTimerFiring(false);
+ }
+
+ @Test
+ public void testEnsureDeferredStateCleanupTimerFiringWithCheckpointing()
throws Exception {
testEnsureDeferredStateCleanupTimerFiring(true);
}
@@ -561,7 +696,7 @@ public class ExecutableStageDoFnOperatorTest {
AtomicBoolean timerInputReceived = new AtomicBoolean();
IntervalWindow window = new IntervalWindow(new Instant(0), new
Instant(1000));
IntervalWindow.IntervalWindowCoder windowCoder =
IntervalWindow.IntervalWindowCoder.of();
- WindowedValue<KV<String, Integer>> one =
+ WindowedValue<KV<String, Integer>> windowedValue =
WindowedValue.of(
KV.of("one", 1), window.maxTimestamp(), ImmutableList.of(window),
PaneInfo.NO_FIRING);
@@ -579,36 +714,37 @@ public class ExecutableStageDoFnOperatorTest {
when(bundle.getTimerReceivers()).thenReturn(ImmutableMap.of(timerInputKey,
timerReceiver));
KeyedOneInputStreamOperatorTestHarness<
- String, WindowedValue<KV<String, Integer>>,
WindowedValue<KV<String, Integer>>>
+ ByteBuffer, WindowedValue<KV<String, Integer>>,
WindowedValue<Integer>>
testHarness =
new KeyedOneInputStreamOperatorTestHarness(
- operator, val -> val, new CoderTypeInformation<>(keyCoder));
+ operator,
+ operator.keySelector,
+ new
CoderTypeInformation<>(FlinkKeyUtils.ByteBufferCoder.of()));
testHarness.open();
- Lock stateBackendLock = (Lock) Whitebox.getInternalState(operator,
"stateBackendLock");
+ Lock stateBackendLock = Whitebox.getInternalState(operator,
"stateBackendLock");
stateBackendLock.lock();
KeyedStateBackend<ByteBuffer> keyedStateBackend =
operator.getKeyedStateBackend();
- ByteBuffer key = FlinkKeyUtils.encodeKey(one.getValue().getKey(),
keyCoder);
+ ByteBuffer key =
FlinkKeyUtils.encodeKey(windowedValue.getValue().getKey(), keyCoder);
keyedStateBackend.setCurrentKey(key);
DoFnOperator.FlinkTimerInternals timerInternals =
- (DoFnOperator.FlinkTimerInternals) Whitebox.getInternalState(operator,
"timerInternals");
+ Whitebox.getInternalState(operator, "timerInternals");
Object doFnRunner = Whitebox.getInternalState(operator, "doFnRunner");
Object delegate = Whitebox.getInternalState(doFnRunner, "delegate");
Object stateCleaner = Whitebox.getInternalState(delegate, "stateCleaner");
- Collection<?> cleanupTimers =
- (Collection) Whitebox.getInternalState(stateCleaner, "cleanupQueue");
+ Collection<?> cleanupQueue = Whitebox.getInternalState(stateCleaner,
"cleanupQueue");
// create some state which can be cleaned up
assertThat(testHarness.numKeyedStateEntries(), is(0));
StateNamespace stateNamespace = StateNamespaces.window(windowCoder,
window);
- BagState<String> state =
+ BagState<ByteString> state = // State from the SDK Harness is stored as
ByteStrings
operator.keyedStateInternals.state(
- stateNamespace, StateTags.bag(stateId, StringUtf8Coder.of()));
- state.add("testUserState");
+ stateNamespace, StateTags.bag(stateId, ByteStringCoder.of()));
+ state.add(ByteString.copyFrom("userstate".getBytes(Charsets.UTF_8)));
assertThat(testHarness.numKeyedStateEntries(), is(1));
// user timer that fires after the end of the window and after state
cleanup
@@ -617,51 +753,84 @@ public class ExecutableStageDoFnOperatorTest {
TimerReceiverFactory.encodeToTimerDataTimerId(
timerInputKey.getKey(), timerInputKey.getValue()),
stateNamespace,
- window.maxTimestamp().plus(1),
+ window.maxTimestamp(),
TimeDomain.EVENT_TIME);
timerInternals.setTimer(userTimer);
// start of bundle
- testHarness.processElement(new StreamRecord<>(one));
- verify(receiver).accept(one);
+ testHarness.processElement(new StreamRecord<>(windowedValue));
+ verify(receiver).accept(windowedValue);
- // move watermark past cleanup and user timer while bundle in progress
- operator.processWatermark(new
Watermark(window.maxTimestamp().plus(2).getMillis()));
+ // move watermark past user timer while bundle is in progress
+ testHarness.processWatermark(new
Watermark(window.maxTimestamp().plus(1).getMillis()));
- // due to watermark hold the timers won't fire at this point
- assertFalse("Watermark must be held back until bundle is complete.",
timerInputReceived.get());
- assertThat(cleanupTimers, hasSize(0));
+ // Output watermark is held back and timers do not yet fire (they can
still be changed!)
+ assertThat(timerInputReceived.get(), is(false));
+ assertThat(
+ operator.getCurrentOutputWatermark(),
is(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+ // The timer fires on bundle finish
+ operator.invokeFinishBundle();
+ assertThat(timerInputReceived.getAndSet(false), is(true));
- if (withCheckpointing) {
- // Upon checkpointing, the bundle is finished and the watermark advances;
- // timers can fire. Note: The bundle is ensured to be finished.
- testHarness.snapshot(0, 0);
+ // Move watermark past the cleanup timer
+ testHarness.processWatermark(new
Watermark(window.maxTimestamp().plus(2).getMillis()));
+ operator.invokeFinishBundle();
- // The user timer was scheduled to fire after cleanup, but executes first
- assertTrue("Timer should have been triggered.",
timerInputReceived.get());
- // Cleanup will be executed after the bundle is complete
- assertThat(cleanupTimers, hasSize(0));
- verifyNoMoreInteractions(receiver);
- } else {
- // Upon finishing a bundle, the watermark advances; timers can fire.
- // Note that this will finish the current bundle, but will also start a
new one
- // when timers fire as part of advancing the watermark
- operator.invokeFinishBundle();
+ // Cleanup timer has fired and cleanup queue is prepared for bundle finish
+ assertThat(testHarness.numEventTimeTimers(), is(0));
+ assertThat(testHarness.numKeyedStateEntries(), is(1));
+ assertThat(cleanupQueue, hasSize(1));
- // The user timer was scheduled to fire after cleanup, but executes first
- assertTrue("Timer should have been triggered.",
timerInputReceived.get());
- // Cleanup will be executed after the bundle is complete
- assertThat(cleanupTimers, hasSize(1));
- verifyNoMoreInteractions(receiver);
+ // Cleanup timer are rescheduled if a new timer is created during the
bundle
+ TimerInternals.TimerData userTimer2 =
+ TimerInternals.TimerData.of(
+ TimerReceiverFactory.encodeToTimerDataTimerId(
+ timerInputKey.getKey(), timerInputKey.getValue()),
+ stateNamespace,
+ window.maxTimestamp(),
+ TimeDomain.EVENT_TIME);
+ operator.setTimer(
+ Timer.of(
+ windowedValue.getValue().getKey(),
+ "",
+ windowedValue.getWindows(),
+ window.maxTimestamp(),
+ window.maxTimestamp(),
+ PaneInfo.NO_FIRING),
+ userTimer2);
+ assertThat(testHarness.numEventTimeTimers(), is(1));
- // Finish bundle which has been started by finishing the bundle
+ if (withCheckpointing) {
+ // Upon checkpointing, the bundle will be finished.
+ testHarness.snapshot(0, 0);
+ } else {
operator.invokeFinishBundle();
- assertThat(cleanupTimers, hasSize(0));
}
+ // Cleanup queue has been processed and cleanup timer has been re-added
due to pending timers
+ // for the window.
+ assertThat(cleanupQueue, hasSize(0));
+ verifyNoMoreInteractions(receiver);
+ assertThat(testHarness.numKeyedStateEntries(), is(2));
+ assertThat(testHarness.numEventTimeTimers(), is(2));
+
+ // No timer has been fired but bundle should be ended
+ assertThat(timerInputReceived.get(), is(false));
+ assertThat(Whitebox.getInternalState(operator, "bundleStarted"),
is(false));
+
+ // Allow user timer and cleanup timer to fire by triggering watermark
advancement
+ testHarness.setProcessingTime(testHarness.getProcessingTime() + 1);
+ assertThat(timerInputReceived.getAndSet(false), is(true));
+ assertThat(cleanupQueue, hasSize(1));
+
+ // Cleanup will be executed after the bundle is complete because there are
no more pending
+ // timers for the window
+ operator.invokeFinishBundle();
+ assertThat(cleanupQueue, hasSize(0));
assertThat(testHarness.numKeyedStateEntries(), is(0));
testHarness.close();
+ verifyNoMoreInteractions(receiver);
}
@Test
@@ -851,7 +1020,8 @@ public class ExecutableStageDoFnOperatorTest {
* #runtimeContext}. The context factory is mocked to return {@link
#stageContext} every time. The
* behavior of the stage context itself is unchanged.
*/
- private ExecutableStageDoFnOperator<Integer, Integer> getOperator(
+ @SuppressWarnings("rawtypes")
+ private ExecutableStageDoFnOperator getOperator(
TupleTag<Integer> mainOutput,
List<TupleTag<?>> additionalOutputs,
DoFnOperator.MultiOutputOutputManagerFactory<Integer>
outputManagerFactory) {
@@ -864,7 +1034,8 @@ public class ExecutableStageDoFnOperatorTest {
WindowedValue.getFullCoder(StringUtf8Coder.of(),
GlobalWindow.Coder.INSTANCE));
}
- private ExecutableStageDoFnOperator<Integer, Integer> getOperator(
+ @SuppressWarnings("rawtypes")
+ private ExecutableStageDoFnOperator getOperator(
TupleTag<Integer> mainOutput,
List<TupleTag<?>> additionalOutputs,
DoFnOperator.MultiOutputOutputManagerFactory<Integer>
outputManagerFactory,
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index fdb84c4..d552f5e 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -152,7 +152,7 @@ public class WindowDoFnOperatorTest {
assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(1));
assertThat(testHarness.numKeyedStateEntries(), is(6));
- assertThat(windowDoFnOperator.currentOutputWatermark, is(1L));
+ assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(1L));
assertThat(timerInternals.getMinOutputTimestampMs(), is(Long.MAX_VALUE));
// close window
@@ -162,7 +162,7 @@ public class WindowDoFnOperatorTest {
assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
assertThat(testHarness.numKeyedStateEntries(), is(3));
- assertThat(windowDoFnOperator.currentOutputWatermark, is(100L));
+ assertThat(windowDoFnOperator.getCurrentOutputWatermark(), is(100L));
assertThat(timerInternals.getMinOutputTimestampMs(), is(Long.MAX_VALUE));
testHarness.processWatermark(200L);