[ 
https://issues.apache.org/jira/browse/STORM-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing closed STORM-3337.
-------------------------------------
    Resolution: Duplicate

Sounds good. I'm going to close this issue as covered by STORM-2691. Please 
reopen if it turns out 2.0.0 doesn't resolve the issue for you.

> KafkaTridentSpoutOpaque can lose offset data in Zookeeper
> ---------------------------------------------------------
>
>                 Key: STORM-3337
>                 URL: https://issues.apache.org/jira/browse/STORM-3337
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core, storm-kafka-client
>    Affects Versions: 1.2.3
>            Reporter: Jonathan Munz
>            Priority: Major
>         Attachments: lost_offsets.log
>
>
> I see this issue happening once or twice a week for a number of topologies I 
> have running in production that read from Kafka. I was able to 
> reproduce it in a more pared-down environment using 1 worker with a 
> parallelism hint of at least 2 reading from a topic that had 16 partitions. 
> The issue is reproducible with fewer partitions, but it occurs less frequently.
> What happens is that while committing offsets for the first transaction after 
> a worker crash, the partition offset data in Zookeeper is wiped for a subset 
> of that worker's partitions. The state is restored after the next batch or 
> two is committed and the worker continues as normal; however, if the worker 
> happens to crash a second time before the data restores itself, the offset data 
> is lost and those partitions reset (in my case to their earliest offsets, since 
> I used a reset strategy of UNCOMMITTED_EARLIEST) after the worker restarts again.
> I've attached a log file showing what's happening. In this example ZK had 
> offset data committed for all partitions for txid=29 before the worker 
> crashed. After the worker came back up, partitions 1, 3, 5, 7, 9, 11, 13, and 15 
> lost their child nodes in ZK, while the remaining partitions got child nodes 
> created for txid=30. The important steps are:
> 1. Thread-7 and Thread-19 both hit 'Emitted Batch' for txid=30 here:
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146]
>  Thread-7 is assigned the even-numbered partitions 0-14, has _index=0, 
> _changedMeta=true, coordinatorMeta=[] 
>  Thread-19 is assigned the odd-numbered partitions 1-15, has _index=1, 
> _changedMeta=true, coordinatorMeta=[]
>  Even though `coordinatorMeta` is empty, the `KafkaTridentSpoutEmitter` 
> ignores this parameter in `getPartitionsForTask` and returns partition 
> assignments for each thread:
>  
> [https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L253]
> 2. Both threads hit 'Committing transaction' here: 
>  
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L160]
> 3. Thread-19 begins creating state for txid=30 for its partitions:
>  
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L186]
> 4. Thread-7 enters this special condition since it has `_index==0` and 
> `_changedMeta==true`:
>  
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L169]
> 5. Thread-7 calls `_emitter.getOrderedPartitions(_savedCoordinatorMeta)`, 
> which for the `KafkaTridentSpoutEmitter` implementation returns an empty list 
> since `coordinatorMeta` was empty for this batch:
>  
> [https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L241]
> 6. Thread-7 lists all the partition nodes for this component in ZK, and 
> since `validIds` is empty they all pass the check, so `removeState` is 
> called on each of them for txid=30:
>  
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L177]
>  This is the key part that causes the bug. Thread-7 hasn't started committing 
> state for txid=30 for any of its assigned partitions, so the calls to 
> `removeState` on those partitions won't do anything. However, Thread-19 is 
> running concurrently, so any partitions it has already written state for will 
> get deleted here (a sketch of this interleaving follows the step list below).
> 7. Thread-7 and Thread-19 enter the success handler and call 
> `cleanupBefore(txid=29)`:
>  
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L153]
>  For the partitions owned by Thread-19 that had their state removed in step 6, 
> this means that the nodes for both txid=29 and txid=30 are deleted and the 
> partitions' states are empty in ZK.
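> To make the interleaving in steps 4-6 concrete, here is a minimal, self-contained 
> simulation of the race as I understand it. The map standing in for ZK, the class 
> name, and the helper structure are all illustrative assumptions; only the overall 
> sequence (empty `validIds`, `removeState` on every partition node, then 
> `cleanupBefore`) mirrors the executor code linked above:
> ```java
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.ConcurrentHashMap;
>
> // Simulation of the race from steps 4-6. The "zk" map stands in for the
> // per-partition transactional state nodes in Zookeeper; names are
> // illustrative, not Storm's actual implementation.
> public class LostOffsetRace {
>     // partition id -> (txid -> offset metadata)
>     static final Map<Integer, Map<Long, String>> zk = new ConcurrentHashMap<>();
>
>     public static void main(String[] args) throws InterruptedException {
>         // State committed for txid=29 for all 16 partitions before the crash.
>         for (int p = 0; p < 16; p++) {
>             Map<Long, String> node = new ConcurrentHashMap<>();
>             node.put(29L, "offsets@29");
>             zk.put(p, node);
>         }
>
>         final long txid = 30L;
>         Set<Integer> validIds = Set.of(); // getOrderedPartitions([]) returned nothing
>
>         // Thread-19: writes txid=30 state for its odd-numbered partitions (step 3).
>         Thread thread19 = new Thread(() -> {
>             for (int p = 1; p < 16; p += 2) {
>                 zk.get(p).put(txid, "offsets@30");
>             }
>         });
>
>         // Thread-7: _index==0 && _changedMeta, so it prunes "stale" partitions (steps 4-6).
>         Thread thread7 = new Thread(() -> {
>             for (Integer partition : zk.keySet()) {
>                 if (!validIds.contains(partition)) { // always true: validIds is empty
>                     zk.get(partition).remove(txid);  // may delete Thread-19's new state
>                 }
>             }
>         });
>
>         thread19.start();
>         thread7.start();
>         thread19.join();
>         thread7.join();
>
>         // Step 7: both success handlers call cleanupBefore(29), removing the txid=29 nodes.
>         zk.values().forEach(node -> node.remove(29L));
>
>         // Depending on the interleaving, some odd-numbered partitions end with no state at all.
>         zk.forEach((p, node) -> System.out.println("partition " + p + " -> " + node));
>     }
> }
> ```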
> A couple of things are contributing to this bug. I'm not sure whether it's 
> expected that `coordinatorMeta` is passed into `emitBatch` as an empty list 
> for the first batch after the worker restart. If it contained all the 
> partitions assigned across all tasks (which it does on subsequent batches), 
> then the `validIds` check wouldn't trigger for partitions owned by other 
> tasks and their state wouldn't accidentally get removed. This is exacerbated 
> by the fact that `KafkaTridentSpoutEmitter` returns valid partitions from 
> `getPartitionsForTask` even when `getOrderedPartitions` returns empty, which 
> seems to break expectations.
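> As a rough illustration of the expectation that seems broken, a sketch along 
> these lines (hypothetical, not the actual `KafkaTridentSpoutEmitter` code) 
> would keep the two methods consistent by deriving a task's assignment from 
> the same ordered list, so an empty `getOrderedPartitions` result would mean 
> an empty `getPartitionsForTask` result as well:
> ```java
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical sketch of the expected relationship between the two methods.
> class PartitionAssignmentSketch {
>     static List<String> getPartitionsForTask(int taskId, int numTasks,
>                                              List<String> orderedPartitions) {
>         List<String> taskPartitions = new ArrayList<>();
>         // Round-robin over whatever getOrderedPartitions produced; an empty
>         // ordered list therefore yields an empty assignment for every task.
>         for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
>             taskPartitions.add(orderedPartitions.get(i));
>         }
>         return taskPartitions;
>     }
> }
> ```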
> One additional safeguard that might be worth investigating is having 
> `cleanupBefore` check that it isn't going to leave a node in ZK without any 
> children before it runs.
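> A minimal sketch of what that guard might look like (the structure and names 
> here are assumptions, not the existing `cleanupBefore` implementation): remove 
> older txid children only as long as the partition node would still keep at 
> least one child afterwards:
> ```java
> import java.util.Map;
> import java.util.TreeMap;
>
> // Hypothetical safeguard: never let cleanup leave a partition node in ZK empty.
> class CleanupBeforeSketch {
>     // partitionNode maps txid -> committed state for one partition.
>     static void cleanupBefore(Map<Long, Object> partitionNode, long txid) {
>         TreeMap<Long, Object> sorted = new TreeMap<>(partitionNode);
>         for (Long oldTxid : sorted.headMap(txid).keySet()) {
>             if (partitionNode.size() <= 1) {
>                 break; // deleting this child would empty the node; keep it
>             }
>             partitionNode.remove(oldTxid); // safe: at least one other child remains
>         }
>     }
> }
> ```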



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
