[BEAM-2571] Clarify pushedback variable name in DoFnOperator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b03c4f07 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b03c4f07 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b03c4f07 Branch: refs/heads/master Commit: b03c4f0790fa639d739f7f3fdeaa4a703fadb8fa Parents: 8449931 Author: Aljoscha Krettek <[email protected]> Authored: Wed Jul 12 14:39:58 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Jul 24 09:47:44 2017 +0200 ---------------------------------------------------------------------- .../flink/translation/wrappers/streaming/DoFnOperator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b03c4f07/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index b1f3b86..3b234ac 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -470,15 +470,15 @@ public class DoFnOperator<InputT, OutputT> setCurrentInputWatermark(mark.getTimestamp()); // hold back by the pushed back values waiting for side inputs - long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); + long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - timerService.advanceWatermark(actualInputWatermark); + timerService.advanceWatermark(pushedBackInputWatermark); Instant watermarkHold = stateInternals.watermarkHold(); long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); - long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); + long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold); if (potentialOutputWatermark > currentOutputWatermark) { setCurrentOutputWatermark(potentialOutputWatermark);
