Repository: apex-malhar
Updated Branches:
  refs/heads/master c46398f11 -> 89b29378e


APEXMALHAR-2445 During recovery no need to write to WindowDataManager if window id <= LargestCompleted Window ID


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8c538a00
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8c538a00
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8c538a00

Branch: refs/heads/master
Commit: 8c538a003e9d26388df633f07f3e9bb9a639736c
Parents: 7f1abca
Author: Sandesh Hegde <[email protected]>
Authored: Wed Mar 15 11:24:08 2017 -0700
Committer: Sandesh Hegde <[email protected]>
Committed: Wed Mar 15 12:37:32 2017 -0700

----------------------------------------------------------------------
 .../malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8c538a00/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index a8e333f..75af448 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -182,7 +182,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
   @Override
   public void endWindow()
   {
-    if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
+    if (windowId <= windowDataManager.getLargestCompletedWindow()) {
+      return;
+    }
+
+    if (!partialWindowTuples.isEmpty()) {
       throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
     }
 

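The control flow introduced by this change can be illustrated with a minimal, self-contained sketch. It is not the Malhar operator: the class `ReplayGuardSketch`, its fields, and its helper methods are hypothetical stand-ins. The step that records the window as completed is an assumption about what follows the guard, since the diff above does not show the rest of endWindow().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the operator; only the endWindow() guard mirrors the diff. */
class ReplayGuardSketch {
  private final List<String> partialWindowTuples = new ArrayList<>();
  private long largestCompletedWindow; // stands in for windowDataManager.getLargestCompletedWindow()
  private long windowId;

  ReplayGuardSketch(long largestCompletedWindow) {
    this.largestCompletedWindow = largestCompletedWindow;
  }

  void beginWindow(long windowId) {
    this.windowId = windowId;
  }

  void addPartialTuple(String tuple) {
    partialWindowTuples.add(tuple);
  }

  void clearPartialTuples() {
    partialWindowTuples.clear();
  }

  long getLargestCompletedWindow() {
    return largestCompletedWindow;
  }

  /** Returns true if the window was recorded, false if it was a replayed window. */
  boolean endWindow() {
    // Replayed window during recovery: nothing to check and nothing to write.
    if (windowId <= largestCompletedWindow) {
      return false;
    }
    // New window: leftover partial tuples mean the replay was incomplete.
    if (!partialWindowTuples.isEmpty()) {
      throw new RuntimeException(
          "Violates Exactly once. Not all the tuples received after operator reset.");
    }
    // Assumption: the real operator persists window state at this point.
    largestCompletedWindow = windowId;
    return true;
  }

  public static void main(String[] args) {
    ReplayGuardSketch op = new ReplayGuardSketch(5L);
    op.beginWindow(4L);
    System.out.println("window 4 recorded: " + op.endWindow());
    op.beginWindow(6L);
    System.out.println("window 6 recorded: " + op.endWindow());
  }
}
```

With the early return, a replayed window neither triggers the exactly-once check nor reaches the write path, which matches the intent stated in the commit subject.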