[ https://issues.apache.org/jira/browse/APEXMALHAR-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876082#comment-15876082 ]
Munagala V. Ramanath commented on APEXMALHAR-2419:
--------------------------------------------------

The expectation seems to be that `windowDataManager.getLargestCompletedWindow()` returns the last window on which `windowDataManager.save()` was called in the previous failed run, which in turn means that `windowId > windowDataManager.getLargestCompletedWindow()` will be true only when replay is complete.

However, as the following log messages show, `windowDataManager.getLargestCompletedWindow()` always returns the same value during a run; this may indicate a bug in FSWindowDataManager. Notice that in the initial deploy, largestCompletedWindow is always -1, even after a save() call in endWindow.

Initial deploy:
{quote}
2017-02-21 06:18:55,196 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705345, largestCompletedWindow = -1
2017-02-21 06:18:55,240 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,747 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705345, largestCompletedWindow = -1
2017-02-21 06:18:55,749 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705346, largestCompletedWindow = -1
2017-02-21 06:18:55,749 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705346, largestCompletedWindow = -1
2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705347, largestCompletedWindow = -1
2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705347, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705347, largestCompletedWindow = -1
2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705348, largestCompletedWindow = -1
2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705348, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,964 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705348, largestCompletedWindow = -1
2017-02-21 06:18:55,964 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705349, largestCompletedWindow = -1
2017-02-21 06:18:55,965 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705349, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:56,022 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705349, largestCompletedWindow = -1
2017-02-21 06:18:56,023 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705350, largestCompletedWindow = -1
2017-02-21 06:19:03,570 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]
{quote}

Notice in the log after redeploy that when the second replay window is reached, it is not recognized as a replay and the exception is thrown.

After redeploy:
{quote}
2017-02-21 06:19:11,899 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705345, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:11,900 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild the partial window after 6389565783323705345
2017-02-21 06:19:13,151 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 4, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705345, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705346, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 3, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,773 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=kafkaExactlyOnceOutputOperator,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=linesToKafka,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
{quote}

> KafkaSinglePortExactlyOnceOutputOperator fails on recovery
> ----------------------------------------------------------
>
> Key: APEXMALHAR-2419
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2419
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Munagala V. Ramanath
>
> The KafkaSinglePortExactlyOnceOutputOperator fails on recovery with the
> message: "Violates Exactly once. Not all the tuples received after operator
> reset."
> This is because of this check in endWindow():
> {code}
> if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
>   throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
> }
> {code}
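
To make the failure concrete, here is a minimal, self-contained Java sketch of the comparison in the quoted check, evaluated against the values from the redeploy log. It is not the operator's code: the class `ReplayCheckSketch` and the helpers `violatesExactlyOnce` and `firstFailingWindow` are hypothetical names introduced for illustration, and the window data manager is replaced by a plain `long` that stays constant for the whole run, which is the behaviour the logs show.

```java
final class ReplayCheckSketch {

    /** Same condition as the quoted endWindow() check, applied to plain values. */
    static boolean violatesExactlyOnce(int partialWindowTupleCount, long windowId,
                                       long largestCompletedWindow) {
        return partialWindowTupleCount > 0 && windowId > largestCompletedWindow;
    }

    /**
     * Walks the replayed windows in order and returns the first windowId that
     * trips the check, or -1 if none does. largestCompletedWindow is held
     * constant here (an assumption that mirrors the logged behaviour).
     */
    static long firstFailingWindow(long[] windowIds, int[] partialCounts,
                                   long largestCompletedWindow) {
        for (int i = 0; i < windowIds.length; i++) {
            if (violatesExactlyOnce(partialCounts[i], windowIds[i], largestCompletedWindow)) {
                return windowIds[i];
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long base = 6389565783323705345L;      // first window in the logs
        long[] replayed = {base, base + 1};    // the two windows seen after redeploy
        int[] partialCounts = {4, 3};          // partialWindowTuples.size from the log

        // Value actually reported after redeploy (...345).
        System.out.println(firstFailingWindow(replayed, partialCounts, base));
        // prints 6389565783323705346

        // Last window saved before the undeploy (...349), which is what the
        // check appears to expect.
        System.out.println(firstFailingWindow(replayed, partialCounts, base + 4));
        // prints -1
    }
}
```

With the reported value (...345) the check trips at the second replay window, matching the exception in the redeploy log. With the last window saved in the failed run (...349), both replayed windows pass.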