ReduceFnRunner.onTrigger: add short circuit for empty pane, and move inputWM and pane after the short circuit.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2efb0d56 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2efb0d56 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2efb0d56 Branch: refs/heads/gearpump-runner Commit: 2efb0d561fc62ba44bf71db6937a54708944f0f6 Parents: 38dd12d Author: Author: æ³¢ç¹ <[email protected]> Authored: Fri May 26 17:46:55 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Jun 29 14:01:54 2017 +0800 ---------------------------------------------------------------------- .../org/apache/beam/runners/core/ReduceFnRunner.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2efb0d56/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index a33bac1..ef33bef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -953,11 +953,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { ReduceFn<K, InputT, OutputT, W>.Context renamedContext, final boolean isFinished, boolean isEndOfWindow) throws Exception { - Instant inputWM = timerInternals.currentInputWatermarkTime(); - - // Calculate the pane info. - final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); - // Extract the window hold, and as a side effect clear it. final WatermarkHold.OldAndNewHolds pair = watermarkHold.extractAndRelease(renamedContext, isFinished).read(); @@ -966,7 +961,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { @Nullable Instant newHold = pair.newHold; final boolean isEmpty = nonEmptyPanes.isEmpty(renamedContext.state()).read(); + if (isEmpty + && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_IF_NON_EMPTY + && windowingStrategy.getOnTimeBehavior() == Window.OnTimeBehavior.FIRE_IF_NON_EMPTY) { + return newHold; + } + Instant inputWM = timerInternals.currentInputWatermarkTime(); if (newHold != null) { // We can't be finished yet. checkState( @@ -998,6 +999,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { } } + // Calculate the pane info. + final PaneInfo pane = paneInfoTracker.getNextPaneInfo(directContext, isFinished).read(); + // Only emit a pane if it has data or empty panes are observable. if (needToEmit(isEmpty, isFinished, pane.getTiming())) { // Run reduceFn.onTrigger method.
