Repository: beam
Updated Branches:
  refs/heads/master 3746d4cad -> e015168a3


ReduceFnRunner.onTrigger: skip storeCurrentPaneInfo() if trigger isFinished.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/cd886300
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/cd886300
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/cd886300

Branch: refs/heads/master
Commit: cd886300719ac9d702fbe7b105b09bdc5bbe0d3b
Parents: 3746d4c
Author: Author: 波特 <[email protected]>
Authored: Fri May 26 17:40:27 2017 +0800
Committer: Pei He <[email protected]>
Committed: Wed Jun 21 14:56:58 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/core/ReduceFnRunner.java    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/cd886300/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 62d519f..b5c3e3e 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -948,7 +948,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   private Instant onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean isFinished, boolean isEndOfWindow)
+      final boolean isFinished, boolean isEndOfWindow)
           throws Exception {
     Instant inputWM = timerInternals.currentInputWatermarkTime();
 
@@ -1005,9 +1005,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W 
extends BoundedWindow> {
                 @Override
                 public void output(OutputT toOutput) {
                   // We're going to output panes, so commit the (now used) 
PaneInfo.
-                  // TODO: This is unnecessary if the trigger isFinished since 
the saved
+                  // This is unnecessary if the trigger isFinished since the 
saved
                   // state will be immediately deleted.
-                  paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
+                  if (!isFinished) {
+                    paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
+                  }
 
                   // Output the actual value.
                   outputter.outputWindowedValue(

Reply via email to