Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 608de07e4 -> 1cf560b4b


[BEAM-2571] Clarify pushedback variable name in DoFnOperator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c4a95aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c4a95aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c4a95aa

Branch: refs/heads/release-2.1.0
Commit: 5c4a95aa63da23027b619a50928ebebe5beb05c2
Parents: 1686805
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Jul 12 14:39:58 2017 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Jul 24 14:29:55 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5c4a95aa/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 751eba1..8da8de5 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -470,15 +470,15 @@ public class DoFnOperator<InputT, OutputT>
       setCurrentInputWatermark(mark.getTimestamp());
 
       // hold back by the pushed back values waiting for side inputs
-      long actualInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
+      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
 
-      timerService.advanceWatermark(actualInputWatermark);
+      timerService.advanceWatermark(pushedBackInputWatermark);
 
       Instant watermarkHold = stateInternals.watermarkHold();
 
       long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), 
getPushbackWatermarkHold());
 
-      long potentialOutputWatermark = Math.min(currentInputWatermark, 
combinedWatermarkHold);
+      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, 
combinedWatermarkHold);
 
       if (potentialOutputWatermark > currentOutputWatermark) {
         setCurrentOutputWatermark(potentialOutputWatermark);

Reply via email to