[ 
https://issues.apache.org/jira/browse/STORM-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Munz updated STORM-3337:
---------------------------------
    Description: 
I see this issue happening once or twice a week for a number of topologies I 
have running in production that read from Kafka. I was able to reproduce it in 
a more pared-down environment using one worker with a parallelism hint of at 
least 2, reading from a topic with 16 partitions. The issue is reproducible 
with fewer partitions, but it occurs less frequently.

What happens is that while committing offsets for the first transaction after a 
worker crash, the partition offset data in Zookeeper is wiped for a subset of 
that worker's partitions. The state is restored after the next batch or two is 
committed and the worker continues as normal. However, if the worker happens to 
crash a second time before the data restores itself, then it is lost and those 
partitions reset after the worker restarts again (in my case to their earliest 
offsets, since I used a reset strategy of UNCOMMITTED_EARLIEST).

I've attached a log file showing what's happening. In this example ZK had 
offset data committed for all partitions for txid=29 before the worker crashed. 
After the worker came back up, partitions 1, 3, 5, 7, 9, 11, 13, 15 lose their 
child nodes in ZK, while the remaining partitions get child nodes created for 
txid=30. The important steps are:

1. Thread-7 and Thread-19 both hit 'Emitted Batch' for txid=30 here: 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146]
 
 Thread-7 is assigned the even-numbered partitions 0-14 and has _index=0, 
_changedMeta=true, coordinatorMeta=[] 
 Thread-19 is assigned the odd-numbered partitions 1-15 and has _index=1, 
_changedMeta=true, coordinatorMeta=[]
 Even though `coordinatorMeta` is empty, the `KafkaTridentSpoutEmitter` ignores 
this parameter in `getPartitionsForTask` and returns partition assignments for 
each thread (see the sketch after this step):
 
[https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L253]
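
To make the asymmetry this step relies on easier to see, here is a stripped-down 
sketch. The class, method bodies, and string-based partition ids are hypothetical 
stand-ins for illustration only, not the actual Storm code; the real methods are 
at the links above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the asymmetry described above -- NOT the actual
// KafkaTridentSpoutEmitter source. Names are made up; partitions are plain strings.
public class EmitterAsymmetrySketch {

    // Stand-in for the partitions the task's KafkaConsumer is actually assigned.
    private final List<String> consumerAssignment;

    public EmitterAsymmetrySketch(List<String> consumerAssignment) {
        this.consumerAssignment = consumerAssignment;
    }

    // Mirrors the described behaviour of getOrderedPartitions: it only reflects
    // the coordinator metadata, so an empty coordinatorMeta yields an empty list.
    public List<String> getOrderedPartitions(List<String> coordinatorMeta) {
        return new ArrayList<>(coordinatorMeta);
    }

    // Mirrors the described behaviour of getPartitionsForTask: the coordinator
    // metadata is ignored and the consumer's own assignment is returned, so the
    // task still gets partitions even when coordinatorMeta is empty.
    public List<String> getPartitionsForTask(int taskId, int numTasks,
                                             List<String> coordinatorMeta) {
        return new ArrayList<>(consumerAssignment);
    }

    public static void main(String[] args) {
        EmitterAsymmetrySketch emitter =
            new EmitterAsymmetrySketch(List.of("partition_1", "partition_3", "partition_5"));
        List<String> coordinatorMeta = List.of(); // empty on the first batch after the restart

        System.out.println(emitter.getOrderedPartitions(coordinatorMeta));       // []
        System.out.println(emitter.getPartitionsForTask(1, 2, coordinatorMeta)); // [partition_1, partition_3, partition_5]
    }
}
```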

2. Both threads hit 'Committing transaction' here: 
 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L160]

3. Thread-19 begins creating state for txid=30 for its partitions:
 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L186]

4. Thread-7 enters this special condition since it has `_index==0` and 
`_changedMeta==true`:
 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L169]

5. Thread-7 calls `_emitter.getOrderedPartitions(_savedCoordinatorMeta)`, 
which for the KafkaTridentSpoutEmitter implementation returns an empty list 
since coordinatorMeta was empty for this batch:
 
[https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L241]

6. Thread-7 lists all the partition nodes for this component in ZK; since 
`validIds` is empty, they all pass the check and `removeState` is called 
on each of them for txid=30:
 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L177]
 This is the key part that causes the bug. Thread-7 hasn't started committing 
state for txid=30 for any of its assigned partitions, so the calls to 
`removeState` on those partitions won't do anything. However, Thread-19 is 
running concurrently, so any partitions it has already written state for will 
get deleted here (see the sketch after this step).
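
A minimal model of the executor path in steps 4-6 makes the failure mode 
concrete. Everything below is a hypothetical paraphrase of the behaviour 
described above (made-up `ZkState` interface, string partition ids), not 
Storm's actual classes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the commit-path cleanup described in steps 4-6 -- NOT the
// OpaquePartitionedTridentSpoutExecutor source. It only illustrates why an empty
// ordered-partitions list lets removeState hit every partition node, including
// ones a concurrent task just wrote.
public class CommitCleanupSketch {

    interface ZkState {
        List<String> listPartitionNodes();                   // all partition nodes under the component path
        void removeState(String partitionNode, long txid);   // delete the txid child of one partition node
    }

    // Roughly the _index == 0 && _changedMeta branch from step 4.
    static void cleanupStalePartitions(ZkState state, List<String> orderedPartitions, long txid) {
        Set<String> validIds = new HashSet<>(orderedPartitions);
        for (String partitionNode : state.listPartitionNodes()) {
            // With orderedPartitions == [] (step 5), validIds is empty, so every
            // node fails the contains() check and loses its txid child -- even
            // nodes Thread-19 wrote for txid=30 moments earlier (step 6).
            if (!validIds.contains(partitionNode)) {
                state.removeState(partitionNode, txid);
            }
        }
    }

    public static void main(String[] args) {
        ZkState zk = new ZkState() {
            @Override
            public List<String> listPartitionNodes() {
                // Nodes for every partition exist from txid=29; the odd ones may
                // already have txid=30 children written by the other task.
                return List.of("partition_0", "partition_1", "partition_2", "partition_3");
            }

            @Override
            public void removeState(String partitionNode, long txid) {
                System.out.println("removing txid=" + txid + " under " + partitionNode);
            }
        };

        // First batch after the restart: ordered partitions are empty (step 5),
        // so every node gets a removeState call for txid=30.
        cleanupStalePartitions(zk, List.of(), 30);
    }
}
```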

7. Thread-7 and Thread-19 enter the success handler and call 
`cleanupBefore(txid=29)`:
 
[https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L153]
 For the partitions owned by Thread-19 that got their state removed in #6, that 
means both the nodes for txid=29 and txid=30 are deleted and those partitions 
are left empty.

A couple of things contribute to this bug. I'm not sure whether it's expected 
that `coordinatorMeta` be passed into `emitBatch` as an empty list for the 
first batch after the worker restart. If it contained all the partitions 
assigned across all tasks (which it does on subsequent batches), then the 
`validIds` check wouldn't trigger for partitions owned by other tasks and their 
state wouldn't accidentally get removed (see the example below). This is 
exacerbated by the fact that the `KafkaTridentSpoutEmitter` returns valid 
partitions from `getPartitionsForTask` even when `getOrderedPartitions` returns 
an empty list, which seems to break expectations.
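
For contrast, calling the hypothetical `cleanupStalePartitions` sketch from 
step 6 with a fully populated ordered-partitions list, as later batches 
provide, removes nothing:

```java
// Reusing the hypothetical sketch from step 6: with every assigned partition in
// the ordered list, validIds covers them all and no removeState calls are made.
cleanupStalePartitions(zk, List.of("partition_0", "partition_1", "partition_2", "partition_3"), 31);
// prints nothing -- no state owned by a live task is touched
```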

One additional safeguard that might be worth investigating is having 
`cleanupBefore` make sure it isn't going to leave a node in ZK without any 
children before running (see the sketch below).
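
A rough sketch of what that safeguard could look like, with made-up names and 
simplified cutoff semantics; this is not an existing Storm API, just an 
illustration of the "never leave a partition node childless" idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the suggested safeguard -- not an existing Storm API.
public class GuardedCleanupSketch {

    interface PartitionState {
        List<Long> childTxids();        // txid children currently stored for this partition node
        void delete(long childTxid);    // delete one txid child
    }

    // Delete older children, but never the newest one, so the partition node is
    // never left without any state. The exact cutoff semantics are simplified.
    static void cleanupBefore(PartitionState partition, long txid) {
        List<Long> children = partition.childTxids();
        long newest = children.stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
        for (long child : children) {
            if (child < txid && child != newest) {
                partition.delete(child);
            }
        }
    }

    public static void main(String[] args) {
        // Post-step-6 state for an affected Thread-19 partition: only txid=29 is left.
        List<Long> txids = new ArrayList<>(List.of(29L));
        PartitionState partition = new PartitionState() {
            @Override public List<Long> childTxids() { return new ArrayList<>(txids); }
            @Override public void delete(long childTxid) { txids.remove(Long.valueOf(childTxid)); }
        };

        // Without the guard a cleanup pass could wipe the last remaining child;
        // with it, txid=29 survives and the partition keeps its committed offset data.
        cleanupBefore(partition, 30);
        System.out.println(partition.childTxids()); // [29]
    }
}
```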



> KafkaTridentSpoutOpaque can lose offset data in Zookeeper
> ---------------------------------------------------------
>
>                 Key: STORM-3337
>                 URL: https://issues.apache.org/jira/browse/STORM-3337
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core, storm-kafka-client
>    Affects Versions: 1.2.3
>            Reporter: Jonathan Munz
>            Priority: Major
>         Attachments: lost_offsets.log
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
