[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876082#comment-15876082
 ] 

Munagala V. Ramanath commented on APEXMALHAR-2419:
--------------------------------------------------

The expectation seems to be that 
`windowDataManager.getLargestCompletedWindow()` returns the last window on 
which `windowDataManager.save()` was called in the previous failed run, which in 
turn means that `windowId > windowDataManager.getLargestCompletedWindow()` will 
be true only when replay is complete. However, as the following log messages 
show, `windowDataManager.getLargestCompletedWindow()` always returns the 
same value during a run; this may indicate a bug in FSWindowDataManager.

Notice that in the initial deploy, largestCompletedWindow is always -1, even 
after a save() call in endWindow:

Initial deploy:
{quote}
        2017-02-21 06:18:55,196 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705345, largestCompletedWindow = -1
        2017-02-21 06:18:55,240 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 0, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,747 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: saved data for windowId = 6389565783323705345, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,749 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705346, largestCompletedWindow = -1
        2017-02-21 06:18:55,749 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 0, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,806 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: saved data for windowId = 6389565783323705346, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,806 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705347, largestCompletedWindow = -1
        2017-02-21 06:18:55,806 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705347, partialWindowTuples.size = 0, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,906 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: saved data for windowId = 6389565783323705347, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,906 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705348, largestCompletedWindow = -1
        2017-02-21 06:18:55,906 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705348, partialWindowTuples.size = 0, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,964 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: saved data for windowId = 6389565783323705348, 
largestCompletedWindow = -1
        2017-02-21 06:18:55,964 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705349, largestCompletedWindow = -1
        2017-02-21 06:18:55,965 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705349, partialWindowTuples.size = 0, 
largestCompletedWindow = -1
        2017-02-21 06:18:56,022 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: saved data for windowId = 6389565783323705349, 
largestCompletedWindow = -1
        2017-02-21 06:18:56,023 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705350, largestCompletedWindow = -1
        2017-02-21 06:19:03,570 INFO 
com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]
{quote}

Notice that after redeploy, the second replay window (6389565783323705346) is 
not recognized as a replay, so endWindow throws the exception.

After redeploy:

{quote}
        2017-02-21 06:19:11,899 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705345, largestCompletedWindow = 
6389565783323705345
        2017-02-21 06:19:11,900 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild 
the partial window after 6389565783323705345
        2017-02-21 06:19:13,151 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 4, 
largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,766 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: saved data for windowId = 6389565783323705345, 
largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,766 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
beginWindow: windowId = 6389565783323705346, largestCompletedWindow = 
6389565783323705345
        2017-02-21 06:19:13,766 INFO 
org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: 
endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 3, 
largestCompletedWindow = 6389565783323705345
        2017-02-21 06:19:13,773 ERROR 
com.datatorrent.stram.engine.StreamingContainer: Operator set 
[OperatorDeployInfo[id=3,name=kafkaExactlyOnceOutputOperator,type=GENERIC,checkpoint={ffffffffffffffff,
 0, 
0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=linesToKafka,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]]
 stopped running due to an exception.
        java.lang.RuntimeException: Violates Exactly once. Not all the tuples 
received after operator reset.
 {quote}
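
To make the failure concrete, here is a minimal sketch that applies the 
endWindow() condition from the issue description to the two replayed windows 
in the redeploy log. The class and method names (ReplayCheckSketch, 
violatesExactlyOnce, firstViolation) are hypothetical and are not part of 
Malhar; the window ids and partialWindowTuples sizes are taken from the log 
above. It only illustrates the comparison and does not model 
FSWindowDataManager itself.

```java
// Hypothetical illustration only; not code from the operator or FSWindowDataManager.
class ReplayCheckSketch {

  // Same condition as the endWindow() check quoted in the issue description.
  static boolean violatesExactlyOnce(int partialWindowTuples, long windowId,
      long largestCompletedWindow) {
    return partialWindowTuples > 0 && windowId > largestCompletedWindow;
  }

  // Returns the first window id at which the check would throw, or -1 if none does.
  static long firstViolation(long[] windowIds, int[] partialCounts,
      long largestCompletedWindow) {
    for (int i = 0; i < windowIds.length; i++) {
      if (violatesExactlyOnce(partialCounts[i], windowIds[i], largestCompletedWindow)) {
        return windowIds[i];
      }
    }
    return -1L;
  }

  public static void main(String[] args) {
    long base = 6389565783323705345L;
    long[] replayed = {base, base + 1};  // windows seen after redeploy
    int[] partial = {4, 3};              // partialWindowTuples.size from the log

    // Value reported after redeploy (...345): the check fires at window ...346.
    System.out.println(firstViolation(replayed, partial, base));

    // Last window saved before the failure (...349): no replayed window fires.
    System.out.println(firstViolation(replayed, partial, base + 4));
  }
}
```

With the value logged after redeploy, the first call prints 
6389565783323705346, matching the window at which the exception appears. With 
the last window saved in the initial deploy (6389565783323705349), the second 
call prints -1, which is the behavior the check seems to expect.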

> KafkaSinglePortExactlyOnceOutputOperator fails on recovery
> ----------------------------------------------------------
>
>                 Key: APEXMALHAR-2419
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2419
>             Project: Apache Apex Malhar
>          Issue Type: Bug
>            Reporter: Munagala V. Ramanath
>
> The KafkaSinglePortExactlyOnceOutputOperator fails on recovery with the 
> message: "Violates Exactly once. Not all the tuples received after operator 
> reset."
> This is because of this check in endWindow():
> {code}
>     if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
>       throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
>     }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
