ReduceFnRunner.onTrigger: skip storeCurrentPaneInfo() if trigger isFinished.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd886300 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd886300 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd886300 Branch: refs/heads/gearpump-runner Commit: cd886300719ac9d702fbe7b105b09bdc5bbe0d3b Parents: 3746d4c Author: Author: æ³¢ç¹ <[email protected]> Authored: Fri May 26 17:40:27 2017 +0800 Committer: Pei He <[email protected]> Committed: Wed Jun 21 14:56:58 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/runners/core/ReduceFnRunner.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/cd886300/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 62d519f..b5c3e3e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -948,7 +948,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { private Instant onTrigger( final ReduceFn<K, InputT, OutputT, W>.Context directContext, ReduceFn<K, InputT, OutputT, W>.Context renamedContext, - boolean isFinished, boolean isEndOfWindow) + final boolean isFinished, boolean isEndOfWindow) throws Exception { Instant inputWM = timerInternals.currentInputWatermarkTime(); @@ -1005,9 +1005,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @Override public void output(OutputT toOutput) { // We're going to output panes, so commit the (now used) PaneInfo. - // TODO: This is unnecessary if the trigger isFinished since the saved + // This is unnecessary if the trigger isFinished since the saved // state will be immediately deleted. - paneInfoTracker.storeCurrentPaneInfo(directContext, pane); + if (!isFinished) { + paneInfoTracker.storeCurrentPaneInfo(directContext, pane); + } // Output the actual value. outputter.outputWindowedValue(
