Repository: beam Updated Branches: refs/heads/release-2.1.0 608de07e4 -> 1cf560b4b
[BEAM-2571] Clarify pushedback variable name in DoFnOperator Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c4a95aa Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c4a95aa Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c4a95aa Branch: refs/heads/release-2.1.0 Commit: 5c4a95aa63da23027b619a50928ebebe5beb05c2 Parents: 1686805 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Jul 12 14:39:58 2017 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jul 24 14:29:55 2017 +0200 ---------------------------------------------------------------------- .../flink/translation/wrappers/streaming/DoFnOperator.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5c4a95aa/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 751eba1..8da8de5 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -470,15 +470,15 @@ public class DoFnOperator<InputT, OutputT> setCurrentInputWatermark(mark.getTimestamp()); // hold back by the pushed back values waiting for side inputs - long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); + long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp()); - 
timerService.advanceWatermark(actualInputWatermark); + timerService.advanceWatermark(pushedBackInputWatermark); Instant watermarkHold = stateInternals.watermarkHold(); long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold()); - long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold); + long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold); if (potentialOutputWatermark > currentOutputWatermark) { setCurrentOutputWatermark(potentialOutputWatermark);