[https://issues.apache.org/jira/browse/APEXMALHAR-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15876082#comment-15876082]
Munagala V. Ramanath commented on APEXMALHAR-2419:
--------------------------------------------------
The expectation seems to be that
`windowDataManager.getLargestCompletedWindow()` will return the last window on
which `windowDataManager.save()` was called in the previous (failed) run, which
in turn means that `windowId > windowDataManager.getLargestCompletedWindow()`
will be true only once replay is complete. However, as the following log
messages show, `windowDataManager.getLargestCompletedWindow()` always returns
the same value during a run; this may indicate a bug in FSWindowDataManager.
Notice that in the initial deploy, largestCompletedWindow stays at -1 even
after save() is called in endWindow:
Initial deploy:
{quote}
2017-02-21 06:18:55,196 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705345, largestCompletedWindow = -1
2017-02-21 06:18:55,240 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,747 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705345, largestCompletedWindow = -1
2017-02-21 06:18:55,749 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705346, largestCompletedWindow = -1
2017-02-21 06:18:55,749 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705346, largestCompletedWindow = -1
2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705347, largestCompletedWindow = -1
2017-02-21 06:18:55,806 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705347, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705347, largestCompletedWindow = -1
2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705348, largestCompletedWindow = -1
2017-02-21 06:18:55,906 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705348, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:55,964 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705348, largestCompletedWindow = -1
2017-02-21 06:18:55,964 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705349, largestCompletedWindow = -1
2017-02-21 06:18:55,965 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705349, partialWindowTuples.size = 0, largestCompletedWindow = -1
2017-02-21 06:18:56,022 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705349, largestCompletedWindow = -1
2017-02-21 06:18:56,023 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705350, largestCompletedWindow = -1
2017-02-21 06:19:03,570 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [3]
{quote}
Notice that when we reach the second replay window (6389565783323705346), it is
not recognized as a replay window, so the operator throws the exception.
After redeploy:
{quote}
2017-02-21 06:19:11,899 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705345, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:11,900 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: Rebuild the partial window after 6389565783323705345
2017-02-21 06:19:13,151 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705345, partialWindowTuples.size = 4, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: saved data for windowId = 6389565783323705345, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: beginWindow: windowId = 6389565783323705346, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,766 INFO org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator: endWindow: windowId = 6389565783323705346, partialWindowTuples.size = 3, largestCompletedWindow = 6389565783323705345
2017-02-21 06:19:13,773 ERROR com.datatorrent.stram.engine.StreamingContainer: Operator set [OperatorDeployInfo[id=3,name=kafkaExactlyOnceOutputOperator,type=GENERIC,checkpoint={ffffffffffffffff, 0, 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=inputPort,streamId=linesToKafka,sourceNodeId=2,sourcePortName=output,locality=<null>,partitionMask=0,partitionKeys=<null>]],outputs=[]]] stopped running due to an exception.
java.lang.RuntimeException: Violates Exactly once. Not all the tuples received after operator reset.
{quote}
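To make the expected contract concrete, here is a minimal sketch assuming, as described at the start of this comment, that `getLargestCompletedWindow()` should report the last window passed to `save()` in the previous run. `ExpectedWindowStore`, `recover()` and `isReplay()` are hypothetical names used only for illustration; this is not FSWindowDataManager's code. Fed with the windows saved in the initial deploy (6389565783323705345 through 6389565783323705349), the model reports 6389565783323705349 after recovery, so window 6389565783323705346 would count as replay, whereas the redeploy log shows 6389565783323705345.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the expected contract (not FSWindowDataManager):
 * getLargestCompletedWindow() reports the last window saved in the
 * previous run, and any windowId at or below it is a replayed window.
 */
class ExpectedWindowStore {
  // Windows passed to save() during the current run.
  private final List<Long> savedThisRun = new ArrayList<>();
  // Largest window saved in the previous run; -1 if there was none.
  private long largestCompletedWindow = -1;

  void save(long windowId) {
    savedThisRun.add(windowId);
  }

  // Simulates a redeploy: the windows saved so far become the previous run.
  void recover() {
    for (long id : savedThisRun) {
      largestCompletedWindow = Math.max(largestCompletedWindow, id);
    }
    savedThisRun.clear();
  }

  long getLargestCompletedWindow() {
    return largestCompletedWindow;
  }

  // Replay means the window was already completed in the previous run.
  boolean isReplay(long windowId) {
    return windowId <= largestCompletedWindow;
  }

  public static void main(String[] args) {
    ExpectedWindowStore store = new ExpectedWindowStore();
    // Initial deploy: the log shows saves for ...345 through ...349.
    for (long id = 6389565783323705345L; id <= 6389565783323705349L; id++) {
      store.save(id);
    }
    store.recover();
    System.out.println(store.getLargestCompletedWindow()); // 6389565783323705349
    System.out.println(store.isReplay(6389565783323705346L)); // true
    System.out.println(store.isReplay(6389565783323705350L)); // false
  }
}
```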
> KafkaSinglePortExactlyOnceOutputOperator fails on recovery
> ----------------------------------------------------------
>
> Key: APEXMALHAR-2419
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2419
> Project: Apache Apex Malhar
> Issue Type: Bug
> Reporter: Munagala V. Ramanath
>
> The KafkaSinglePortExactlyOnceOutputOperator fails on recovery with the
> message: "Violates Exactly once. Not all the tuples received after operator
> reset."
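> This is caused by the following check in endWindow():
> {code}
> // Throw if tuples recovered for the partial window are still left in
> // partialWindowTuples once windowId has moved past the largest completed
> // window, i.e. once the operator considers replay to be finished.
> if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestCompletedWindow()) {
>   throw new RuntimeException("Violates Exactly once. Not all the tuples received after operator reset.");
> }
> {code}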
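The check quoted above can be restated as a pure function to see which logged values trip it. `ExactlyOnceCheck.violates` is a hypothetical helper, not part of Malhar; it only mirrors the condition in endWindow(). With the values logged for window 6389565783323705346 after redeploy (3 partial tuples, largest completed window 6389565783323705345) it reports a violation; with 6389565783323705349, the last window saved in the initial deploy, it does not.

```java
/**
 * Hypothetical restatement of the endWindow() check as a pure function,
 * so it can be evaluated with the values from the redeploy log.
 */
final class ExactlyOnceCheck {
  private ExactlyOnceCheck() {}

  // True when endWindow() would throw "Violates Exactly once. ...":
  // tuples remain in partialWindowTuples and the window is past the
  // largest completed window.
  static boolean violates(long windowId, int partialWindowTuplesSize,
                          long largestCompletedWindow) {
    return partialWindowTuplesSize > 0 && windowId > largestCompletedWindow;
  }

  public static void main(String[] args) {
    long window346 = 6389565783323705346L;
    // Values logged after redeploy: 3 partial tuples, largest = ...345.
    System.out.println(violates(window346, 3, 6389565783323705345L)); // true
    // If largest were ...349 (last window saved in the initial deploy),
    // window ...346 would still be treated as replay and no exception is thrown.
    System.out.println(violates(window346, 3, 6389565783323705349L)); // false
  }
}
```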
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)