Repository: apex-malhar
Updated Branches:
  refs/heads/master c830f5e4f -> c84a2c867


APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c784f4da
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c784f4da
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c784f4da

Branch: refs/heads/master
Commit: c784f4da46d1cf594aa4156135b9c196aa66d931
Parents: 3a36298
Author: chaitanya <[email protected]>
Authored: Thu May 18 16:07:52 2017 +0530
Committer: chaitanya <[email protected]>
Committed: Thu May 18 16:07:52 2017 +0530

----------------------------------------------------------------------
 .../kafka/KafkaSinglePortExactlyOnceOutputOperator.java     | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c784f4da/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 75af448..23c519f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -352,6 +352,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
         for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
 
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+
           if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
             continue;
           }
@@ -365,10 +370,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
             partialWindowTuples.put(value, 1);
           }
 
-          if (consumerRecord.offset() >= currentOffset) {
-            crossedBoundary = true;
-            break;
-          }
         }
 
         if (crossedBoundary) {

Reply via email to