Repository: apex-malhar Updated Branches: refs/heads/master c46398f11 -> 89b29378e
APEXMALHAR-2445 During recovery no need to write to WindowDataManager if window id <= LargestCompleted Window ID Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8c538a00 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8c538a00 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8c538a00 Branch: refs/heads/master Commit: 8c538a003e9d26388df633f07f3e9bb9a639736c Parents: 7f1abca Author: Sandesh Hegde <[email protected]> Authored: Wed Mar 15 11:24:08 2017 -0700 Committer: Sandesh Hegde <[email protected]> Committed: Wed Mar 15 12:37:32 2017 -0700 ---------------------------------------------------------------------- .../malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8c538a00/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java index a8e333f..75af448 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java @@ -182,7 +182,11 @@ public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOu @Override public void endWindow() { - if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) { + if (windowId <= windowDataManager.getLargestCompletedWindow()) { + return; + } + + if (!partialWindowTuples.isEmpty()) { throw new RuntimeException("Violates Exactly once. 
Not all the tuples received after operator reset."); }
