mxm commented on a change in pull request #11362:
URL: https://github.com/apache/beam/pull/11362#discussion_r413718706
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -658,46 +671,69 @@ public void processWatermark1(Watermark mark) throws
Exception {
emitAllPushedBackData();
}
- setCurrentInputWatermark(mark.getTimestamp());
+ currentInputWatermark = mark.getTimestamp();
- if (keyCoder == null) {
- long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(),
currentInputWatermark);
- if (potentialOutputWatermark > currentOutputWatermark) {
- setCurrentOutputWatermark(potentialOutputWatermark);
- emitWatermark(currentOutputWatermark);
- }
- } else {
- // hold back by the pushed back values waiting for side inputs
- long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(),
mark.getTimestamp());
+ long inputWatermarkHold =
applyInputWatermarkHold(getEffectiveInputWatermark());
+ if (keyCoder != null) {
+ timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+ }
- timeServiceManager.advanceWatermark(new
Watermark(pushedBackInputWatermark));
+ long potentialOutputWatermark =
+ applyOutputWatermarkHold(
+ currentOutputWatermark,
computeOutputWatermark(inputWatermarkHold));
+ maybeEmitWatermark(potentialOutputWatermark);
+ }
- Instant watermarkHold = keyedStateInternals.watermarkHold();
+ /**
+ * Allows to apply a hold to the input watermark. By default, just passes
the input watermark
+ * through.
+ */
+ public long applyInputWatermarkHold(long inputWatermark) {
+ return inputWatermark;
+ }
- long combinedWatermarkHold = Math.min(watermarkHold.getMillis(),
getPushbackWatermarkHold());
- combinedWatermarkHold =
- Math.min(combinedWatermarkHold,
timerInternals.getMinOutputTimestampMs());
- long potentialOutputWatermark = Math.min(pushedBackInputWatermark,
combinedWatermarkHold);
+ /**
+ * Allows to apply a hold to the output watermark before it is send out. By
default, just passes
+ * the potential output watermark through which will make it the new output
watermark.
+ *
+ * @param currentOutputWatermark the current output watermark
+ * @param potentialOutputWatermark The potential new output watermark which
can be adjusted, if
+ * needed. The input watermark hold has already been applied.
+ * @return The new output watermark which will be emitted.
+ */
+ public long applyOutputWatermarkHold(long currentOutputWatermark, long
potentialOutputWatermark) {
+ return potentialOutputWatermark;
+ }
- if (potentialOutputWatermark > currentOutputWatermark) {
- setCurrentOutputWatermark(potentialOutputWatermark);
- emitWatermark(currentOutputWatermark);
- }
+ private long computeOutputWatermark(long inputWatermarkHold) {
+ final long potentialOutputWatermark;
+ if (keyCoder == null) {
+ potentialOutputWatermark = inputWatermarkHold;
+ } else {
+ Instant watermarkHold = keyedStateInternals.watermarkHold();
+ long combinedWatermarkHold = Math.min(watermarkHold.getMillis(),
inputWatermarkHold);
+ potentialOutputWatermark =
+ Math.min(combinedWatermarkHold,
timerInternals.getMinOutputTimestampMs());
}
+ return potentialOutputWatermark;
}
- private void emitWatermark(long watermark) {
- // Must invoke finishBatch before emit the +Inf watermark otherwise there
are some late events.
- if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- invokeFinishBundle();
+ private void maybeEmitWatermark(long watermark) {
+ if (watermark > currentOutputWatermark) {
+ // Must invoke finishBatch before emit the +Inf watermark otherwise
there are some late
+ // events.
+ if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ invokeFinishBundle();
+ }
+ LOG.debug("Emitting watermark {}", watermark);
+ currentOutputWatermark = watermark;
+ output.emitWatermark(new Watermark(watermark));
}
- output.emitWatermark(new Watermark(watermark));
}
@Override
- public void processWatermark2(Watermark mark) throws Exception {
-
- setCurrentSideInputWatermark(mark.getTimestamp());
+ public final void processWatermark2(Watermark mark) throws Exception {
+ currentSideInputWatermark = mark.getTimestamp();
Review comment:
Emitting ready side input data is handled when new side input data arrives
(see `processElement2`). This is a safeguard, because no more side input data
will arrive after the max watermark. Side input processing could perhaps be
optimized based on the watermark, but I'd tackle that independently of this PR
because the logic didn't change here.
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -544,30 +601,49 @@ public void processWatermark(Watermark mark) throws
Exception {
// every watermark. So we have implemented 2) below.
//
if (sdkHarnessRunner.isBundleInProgress()) {
- if (mark.getTimestamp() >=
BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
- invokeFinishBundle();
- setPushedBackWatermark(Long.MAX_VALUE);
+ if (minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE) {
+ // We can safely advance the watermark to before the last bundle's
minimum event timer
+ // but not past the potential output watermark which includes holds to
the input watermark.
+ return Math.min(minEventTimeTimerTimestampInLastBundle - 1,
potentialOutputWatermark);
} else {
- // It is not safe to advance the output watermark yet, so add a hold
on the current
- // output watermark.
- backupWatermarkHold = Math.max(backupWatermarkHold,
getPushbackWatermarkHold());
- setPushedBackWatermark(Math.min(currentOutputWatermark,
backupWatermarkHold));
- super.setBundleFinishedCallback(
- () -> {
- try {
- LOG.debug("processing pushed back watermark: {}", mark);
- // at this point the bundle is finished, allow the watermark
to pass
- // we are restoring the previous hold in case it was already
set for side inputs
- setPushedBackWatermark(backupWatermarkHold);
- super.processWatermark(mark);
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to process pushed back watermark after finished
bundle.", e);
- }
- });
+ // We don't have any information yet, use the current output watermark
for now.
+ return currentOutputWatermark;
+ }
+ } else {
+ // No bundle was started when we advanced the input watermark.
+ // Thus, we can safely set a new output watermark.
+ return potentialOutputWatermark;
+ }
+ }
+
+ private void preBundleStartCallback() {
+ inputWatermarkBeforeBundleStart = getEffectiveInputWatermark();
+ }
+
+ @SuppressWarnings("FutureReturnValueIgnored")
+ private void finishBundleCallback() {
+ minEventTimeTimerTimestampInLastBundle =
minEventTimeTimerTimestampInCurrentBundle;
+ minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
+ try {
+ if (!closed
+ && minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
+ && minEventTimeTimerTimestampInLastBundle <=
getEffectiveInputWatermark()) {
+ ProcessingTimeService processingTimeService =
getProcessingTimeService();
+ // We are scheduling a timer for advancing the watermark. Otherwise we
+ // could potentially loop forever here when a timer keeps scheduling a
timer
+ // for the same timestamp. This in itself would not be an issue.
However,
Review comment:
Sure. Changed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]