[BEAM-2571] Clarify pushedback variable name in DoFnOperator

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b03c4f07
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b03c4f07
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b03c4f07

Branch: refs/heads/master
Commit: b03c4f0790fa639d739f7f3fdeaa4a703fadb8fa
Parents: 8449931
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Jul 12 14:39:58 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Mon Jul 24 09:47:44 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b03c4f07/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index b1f3b86..3b234ac 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -470,15 +470,15 @@ public class DoFnOperator<InputT, OutputT>
       setCurrentInputWatermark(mark.getTimestamp());
 
       // hold back by the pushed back values waiting for side inputs
-      long actualInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
+      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
 
-      timerService.advanceWatermark(actualInputWatermark);
+      timerService.advanceWatermark(pushedBackInputWatermark);
 
       Instant watermarkHold = stateInternals.watermarkHold();
 
       long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), 
getPushbackWatermarkHold());
 
-      long potentialOutputWatermark = Math.min(currentInputWatermark, 
combinedWatermarkHold);
+      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, 
combinedWatermarkHold);
 
       if (potentialOutputWatermark > currentOutputWatermark) {
         setCurrentOutputWatermark(potentialOutputWatermark);

Reply via email to