[
https://issues.apache.org/jira/browse/STORM-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16770110#comment-16770110
]
Stig Rohde Døssing commented on STORM-3337:
-------------------------------------------
[~Jonathan Munz] I think it is very likely that this is fixed by
https://issues.apache.org/jira/browse/STORM-2691.
As you mention, it is not expected that coordinatorMeta will be empty on the
first batch. This happens because the 1.x implementation of the spout uses a
static variable to communicate coordinatorMeta from the emitter to the
coordinator (and no, this is not a good way to do this). See
https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java#L50
which delegates to
https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java#L28.
The registry is initialized when the emitter calls
https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L131.
The subscribed partitions are not initialized before the first batch is
emitted. Unfortunately the 1.x version works this way, because 1.x
storm-kafka-client exposes the Subscription interface to users for both Trident
and non-Trident users to set up partition subscriptions. This interface fits
Trident extremely poorly, but replacing it is a breaking change that will also
affect non-Trident users.
In 2.x, the spout has been mostly rewritten. The coordinator now has its own
KafkaConsumer, and the Subscription interface has been replaced with more
granular interfaces that fit Trident better. The coordinatorMeta shouldn't be
empty on the first batch anymore. The issue you mention with
getPartitionsForTask ignoring coordinatorMeta has also been fixed. See
https://github.com/apache/storm/blob/9fea42fe11f808550e604ec349b4b3366d20929b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L314
and
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java#L76.
I don't have a strong stance against backporting these fixes, but since they
require breaking changes (the removal of Subscription), I would lean toward
just overwriting the 1.x storm-kafka-client with the 2.x version. On the other
hand, I don't think it will be too long before a 2.0.0 release, so maybe
upgrading to that is the better option for you.
> KafkaTridentSpoutOpaque can lose offset data in Zookeeper
> ---------------------------------------------------------
>
> Key: STORM-3337
> URL: https://issues.apache.org/jira/browse/STORM-3337
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-core, storm-kafka-client
> Affects Versions: 1.2.3
> Reporter: Jonathan Munz
> Priority: Major
> Attachments: lost_offsets.log
>
>
> I see this issue happening once a twice a week for a number of topologies I
> have running in production that are reading from Kafka. I was able to
> reproduce it in a more pared down environment using 1 worker with a
> parallelism hint of at least 2 reading from a topic that had 16 partitions.
> The issue is reproducible using less partitions but it occurs less frequently.
> What happens is that while committing offsets for the first transaction after
> a worker crash the partition offset data in Zookeeper is wiped for a subset
> of that worker's partitions. The state is restored after the next batch or
> two is committed and the worker continues as normal, however if the worker
> happens to crash a 2nd time before the data restores itself then it gets lost
> and those partitions reset (in my case to their earliest offsets since I used
> a reset strategy of UNCOMMITTED_EARLIEST) after the worker restarts again.
> I've attached a log file showing what's happening. In this example ZK had
> offset data committed for all partitions for txid=29 before the worker
> crashed. After the worker came back up partitions 1, 3, 5, 7, 9, 11, 13, 15
> lose their child nodes in ZK, the remaining partitions get child nodes
> created for txid=30. The important steps are:
> 1. Thread-7 and Thread-19 both hit 'Emitted Batch' for txid=30 here:
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L146]
> Thread-7 is assigned even numbered partitions 0-14, has _index=0,
> _changedMeta=true, coordinatorMeta=[]
> Thread-19 is assigned odd numbered partitions 1-15, has _index=1,
> _changedMeta=true, coordinatorMeta=[]
> Even though `coordinatorMeta` is empty the `KafkaTridentSpoutEmitter`
> ignores this parameter in `getPartitionsForTask` and returns partition
> assignments for each thread:
>
> [https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L253]
> 2. Both threads hit 'Committing transaction' here:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L160]
> 3. Thread-19 begins create state for txid=30 for its partitions:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L186]
> 4. Thread-7 enters this special condition since it has `_index==0` and
> `_changedMeta==true`
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L169]
> 5. Thread-7 calls ` _emitter.getOrderedPartitions(_savedCoordinatorMeta)`,
> which for the KakfaTridentSpoutEmitter implementation returns an empty list
> since coordinatorMeta was empty for this batch:
>
> [https://github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java#L241]
> 6. Thread-7 does a list for all the partition nodes for this component in ZK
> and since `validIds` is empty they all pass the check and `removeState` is
> called on each of them for txid=30:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L177]
> This is the key part that causes the bug. Thread-7 hasn't started committing
> state for txid=30 for any of its assigned partitions so the calls to
> `removeState` on those partitions won't do anything. However Thread-19 is
> running concurrently so any partitions it has already written state for will
> get deleted here.
> 7. Thread-7 and Thread-19 enter the success handler and call
> `cleanupBefore(txid=29)`:
>
> [https://github.com/apache/storm/blob/v1.2.3/storm-core/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java#L153]
> For the partitions owned by Thread-19 that got their state removed in #6
> that means that both the nodes for 29 and 30 are deleted and the partitions'
> sates are empty in ZK.
> Couple things that are contributing to this bug. I'm not sure if it's
> expected that `coordinatorMeta` be passed into `emitBatch` as an empty list
> for the first batch after the worker restart. If this contained all the
> partitions assigned across all tasks (which it does on subsequent batches)
> then `validIds` wouldn't trigger for partitions that were owned by other
> tasks and their state wouldn't accidentally get removed. This is exacerbated
> by the fact that the `KafkaTridentSpoutEmitter` returns valid partitions for
> `getPartitionsForTask` even when `getOrderedPartitions` returns empty which
> seems to break expectations.
> One additional safe-guard that might be worth investigation is having
> `cleanupBefore` make sure it wasn't going to leave a node in ZK without any
> children before running.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)