Repository: apex-malhar
Updated Branches:
  refs/heads/master c830f5e4f -> c84a2c867
APEXMALHAR-2493 Fixed the issue of KafkaSinglePortExactlyOnceOutputOperator going to the blocked state during recovery

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/c784f4da
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/c784f4da
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/c784f4da

Branch: refs/heads/master
Commit: c784f4da46d1cf594aa4156135b9c196aa66d931
Parents: 3a36298
Author: chaitanya <[email protected]>
Authored: Thu May 18 16:07:52 2017 +0530
Committer: chaitanya <[email protected]>
Committed: Thu May 18 16:07:52 2017 +0530

----------------------------------------------------------------------
 .../kafka/KafkaSinglePortExactlyOnceOutputOperator.java | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/c784f4da/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
index 75af448..23c519f 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -352,6 +352,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
 
       for (ConsumerRecord<String, T> consumerRecord : consumerRecords) {
 
+        if (consumerRecord.offset() >= currentOffset) {
+          crossedBoundary = true;
+          break;
+        }
+
         if (!doesKeyBelongsToThisInstance(operatorId, consumerRecord.key())) {
           continue;
         }
@@ -365,10 +370,6 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu
           partialWindowTuples.put(value, 1);
         }
 
-        if (consumerRecord.offset() >= currentOffset) {
-          crossedBoundary = true;
-          break;
-        }
       }
 
       if (crossedBoundary) {
