[
https://issues.apache.org/jira/browse/BEAM-6116?focusedWorklogId=169002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169002
]
ASF GitHub Bot logged work on BEAM-6116:
----------------------------------------
Author: ASF GitHub Bot
Created on: 23/Nov/18 09:59
Start Date: 23/Nov/18 09:59
Worklog Time Spent: 10m
Work Description: mxm closed pull request #7124: [BEAM-6116] [portable
flink streaming] Emit pushed back watermark when bundle is complete
URL: https://github.com/apache/beam/pull/7124
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 20717023ef0..31ad20a9891 100644
---
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -43,7 +43,6 @@
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.core.construction.graph.TimerReference;
import
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import
org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -105,6 +104,7 @@
private transient StageBundleFactory stageBundleFactory;
private transient ExecutableStage executableStage;
private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+ private transient long backupWatermarkHold = Long.MIN_VALUE;
public ExecutableStageDoFnOperator(
String stepName,
@@ -353,7 +353,6 @@ protected void
addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
progressHandler,
outputManager,
outputMap,
- executableStage.getTimers(),
(Coder<BoundedWindow>)
windowingStrategy.getWindowFn().windowCoder(),
keySelector,
timerInternals);
@@ -394,11 +393,16 @@ public void processWatermark(Watermark mark) throws
Exception {
} else {
// It is not safe to advance the output watermark yet, so add a hold
on the current
// output watermark.
- setPushedBackWatermark(Math.min(currentOutputWatermark,
getPushbackWatermarkHold()));
+ backupWatermarkHold = Math.max(backupWatermarkHold,
getPushbackWatermarkHold());
+ setPushedBackWatermark(Math.min(currentOutputWatermark,
backupWatermarkHold));
sdkHarnessRunner.setBundleFinishedCallback(
() -> {
try {
- processWatermark(mark);
+ LOG.debug("processing pushed back watermark: {}", mark);
+ // at this point the bundle is finished, allow the watermark
to pass
+ // we are restoring the previous hold in case it was already
set for side inputs
+ setPushedBackWatermark(backupWatermarkHold);
+ super.processWatermark(mark);
} catch (Exception e) {
throw new RuntimeException(
"Failed to process pushed back watermark after finished
bundle.", e);
@@ -442,7 +446,6 @@ public SdkHarnessDoFnRunner(
BundleProgressHandler progressHandler,
BufferedOutputManager<OutputT> outputManager,
Map<String, TupleTag<?>> outputMap,
- Collection<TimerReference> timers,
Coder<BoundedWindow> windowCoder,
KeySelector<WindowedValue<InputT>, ?> keySelector,
TimerInternals timerInternals) {
diff --git
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index cc282c530ed..c34dd08255f 100644
---
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -19,6 +19,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -273,13 +274,25 @@ public void close() {}
OneInputStreamOperatorTestHarness<WindowedValue<Integer>,
WindowedValue<Integer>> testHarness =
new OneInputStreamOperatorTestHarness<>(operator);
+ long watermark = testHarness.getCurrentWatermark() + 1;
testHarness.open();
+
testHarness.processElement(new StreamRecord<>(zero));
+
+ testHarness.processWatermark(watermark);
+ watermark++;
+ testHarness.processWatermark(watermark);
+
+ assertEquals(watermark, testHarness.getCurrentWatermark());
+ // watermark hold until bundle complete
+ assertEquals(0, testHarness.getOutput().size());
+
testHarness.close(); // triggers finish bundle
assertThat(
testHarness.getOutput(),
- contains(new StreamRecord<>(three), new Watermark(Long.MAX_VALUE)));
+ contains(
+ new StreamRecord<>(three), new Watermark(watermark), new
Watermark(Long.MAX_VALUE)));
assertThat(
testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 169002)
Time Spent: 2h 10m (was: 2h)
> Pushed back watermark never emitted in portable Flink runner
> ------------------------------------------------------------
>
> Key: BEAM-6116
> URL: https://issues.apache.org/jira/browse/BEAM-6116
> Project: Beam
> Issue Type: Bug
> Components: runner-flink
> Affects Versions: 2.9.0
> Reporter: Thomas Weise
> Assignee: Thomas Weise
> Priority: Critical
> Labels: portability-flink
> Fix For: 2.9.0
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> The portable runner pushes back the watermark while the bundle is being
> processed in the SDK worker. After the bundle is complete, that watermark
> needs to be emitted. That never happens, because ExecutableStageDoFnOperator
> does not clear the hold.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)