[ 
https://issues.apache.org/jira/browse/BEAM-6116?focusedWorklogId=169002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-169002
 ]

ASF GitHub Bot logged work on BEAM-6116:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Nov/18 09:59
            Start Date: 23/Nov/18 09:59
    Worklog Time Spent: 10m 
      Work Description: mxm closed pull request #7124: [BEAM-6116] [portable 
flink streaming] Emit pushed back watermark when bundle is complete
URL: https://github.com/apache/beam/pull/7124
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 20717023ef0..31ad20a9891 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -43,7 +43,6 @@
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.construction.Timer;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.core.construction.graph.TimerReference;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import 
org.apache.beam.runners.flink.translation.functions.FlinkStreamingSideInputHandlerFactory;
 import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
@@ -105,6 +104,7 @@
   private transient StageBundleFactory stageBundleFactory;
   private transient ExecutableStage executableStage;
   private transient SdkHarnessDoFnRunner<InputT, OutputT> sdkHarnessRunner;
+  private transient long backupWatermarkHold = Long.MIN_VALUE;
 
   public ExecutableStageDoFnOperator(
       String stepName,
@@ -353,7 +353,6 @@ protected void 
addSideInputValue(StreamRecord<RawUnionValue> streamRecord) {
             progressHandler,
             outputManager,
             outputMap,
-            executableStage.getTimers(),
             (Coder<BoundedWindow>) 
windowingStrategy.getWindowFn().windowCoder(),
             keySelector,
             timerInternals);
@@ -394,11 +393,16 @@ public void processWatermark(Watermark mark) throws 
Exception {
       } else {
         // It is not safe to advance the output watermark yet, so add a hold 
on the current
         // output watermark.
-        setPushedBackWatermark(Math.min(currentOutputWatermark, 
getPushbackWatermarkHold()));
+        backupWatermarkHold = Math.max(backupWatermarkHold, 
getPushbackWatermarkHold());
+        setPushedBackWatermark(Math.min(currentOutputWatermark, 
backupWatermarkHold));
         sdkHarnessRunner.setBundleFinishedCallback(
             () -> {
               try {
-                processWatermark(mark);
+                LOG.debug("processing pushed back watermark: {}", mark);
+                // at this point the bundle is finished, allow the watermark 
to pass
+                // we are restoring the previous hold in case it was already 
set for side inputs
+                setPushedBackWatermark(backupWatermarkHold);
+                super.processWatermark(mark);
               } catch (Exception e) {
                 throw new RuntimeException(
                     "Failed to process pushed back watermark after finished 
bundle.", e);
@@ -442,7 +446,6 @@ public SdkHarnessDoFnRunner(
         BundleProgressHandler progressHandler,
         BufferedOutputManager<OutputT> outputManager,
         Map<String, TupleTag<?>> outputMap,
-        Collection<TimerReference> timers,
         Coder<BoundedWindow> windowCoder,
         KeySelector<WindowedValue<InputT>, ?> keySelector,
         TimerInternals timerInternals) {
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
index cc282c530ed..c34dd08255f 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/ExecutableStageDoFnOperatorTest.java
@@ -19,6 +19,7 @@
 
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
@@ -273,13 +274,25 @@ public void close() {}
     OneInputStreamOperatorTestHarness<WindowedValue<Integer>, 
WindowedValue<Integer>> testHarness =
         new OneInputStreamOperatorTestHarness<>(operator);
 
+    long watermark = testHarness.getCurrentWatermark() + 1;
     testHarness.open();
+
     testHarness.processElement(new StreamRecord<>(zero));
+
+    testHarness.processWatermark(watermark);
+    watermark++;
+    testHarness.processWatermark(watermark);
+
+    assertEquals(watermark, testHarness.getCurrentWatermark());
+    // watermark hold until bundle complete
+    assertEquals(0, testHarness.getOutput().size());
+
     testHarness.close(); // triggers finish bundle
 
     assertThat(
         testHarness.getOutput(),
-        contains(new StreamRecord<>(three), new Watermark(Long.MAX_VALUE)));
+        contains(
+            new StreamRecord<>(three), new Watermark(watermark), new 
Watermark(Long.MAX_VALUE)));
 
     assertThat(
         testHarness.getSideOutput(tagsToOutputTags.get(additionalOutput1)),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 169002)
    Time Spent: 2h 10m  (was: 2h)

> Pushed back watermark never emitted in portable Flink runner
> ------------------------------------------------------------
>
>                 Key: BEAM-6116
>                 URL: https://issues.apache.org/jira/browse/BEAM-6116
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.9.0
>            Reporter: Thomas Weise
>            Assignee: Thomas Weise
>            Priority: Critical
>              Labels: portability-flink
>             Fix For: 2.9.0
>
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> The portable runner pushes back (holds) the watermark while a bundle is 
> being processed in the SDK worker. Once the bundle is complete, the 
> pushed-back watermark needs to be emitted. This never happens, because 
> ExecutableStageDoFnOperator does not clear the hold.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to