[
https://issues.apache.org/jira/browse/STORM-2625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086029#comment-16086029
]
Sriharsha Chintalapani commented on STORM-2625:
-----------------------------------------------
Thanks for reporting the JIRA [~GuangDu]. This looks to be side affect of
using auto subscription ( kafka consumer balancing the partition subscription).
This shouldn't be an issue if you use manual subscription, more details are
here
https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md#manual-partition-control-advanced
. Can you give it a try with manual subscription and report your findings.
> KafkaSpout is not calculating uncommitted correctly
> ---------------------------------------------------
>
> Key: STORM-2625
> URL: https://issues.apache.org/jira/browse/STORM-2625
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.1
> Reporter: Guang Du
> Priority: Minor
> Time Spent: 1h
> Remaining Estimate: 0h
>
> This happens when:
> 1. KafkaSpout has already committed offsets to a topic before, and is not
> running/activated now;
> 2. There are messages in the topic after the committed offsets;
> 3. A topology with the same consumer group and multiple workers is
> started/activated again.
> The same issue may happen when a running topology gets a consumer group
> partition re-assignment and the offsets cannot be committed in time.
> The underlying issue is:
> a. Because workers register Kafka consumers one by one, when the first
> consumer A registers itself with the Kafka broker for the consumer group, it
> is assigned all the partitions, say partitions 0 & 1. Consumer A then
> retrieves messages from all the assigned partitions if possible, and starts
> processing. With every tuple KafkaSpout A emits, the uncommitted count is
> incremented: numUncommittedOffsets++ (KafkaSpout#emitTupleIfNotEmitted());
> b. At this point a second consumer B registers with the broker for the same
> consumer group. The broker then re-assigns the partitions among the existing
> consumers, say consumer A is assigned partition 0 and consumer B is assigned
> partition 1.
> b.1 At this point KafkaSpout A will try committing acked offsets, and remove
> the partition 1 offsets it is tracking
> (KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked());
> however, because not all the tuples are acked, KafkaSpout is not able to
> commit the full list of offsets to the Kafka broker.
> b.2 KafkaSpout A will then remove the tracked partition 1 offsets from
> offsetManagers as well as from emitted (
> org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
> org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()),
> so the un-acked tuples can never be acked
> (org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count
> numUncommittedOffsets will never be reduced back to a correct value.
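The accounting drift described in b.1/b.2 above can be sketched as a minimal,
self-contained simulation. All names below are illustrative stand-ins, not the
actual KafkaSpout fields (the real spout tracks OffsetAndMetadata per
TopicPartition); the point is only the shape of the bug: a global counter
incremented per emitted tuple, while per-partition tracking state is dropped
wholesale on rebalance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical simulation of the uncommitted-count bug: tuples are counted
// globally, but tracked per partition; revoking a partition discards its
// tracking state without adjusting the global counter.
public class UncommittedCounterBug {
    static int numUncommittedOffsets = 0;
    // partition -> number of emitted-but-unacked tuples tracked for it
    static Map<Integer, Integer> offsetManagers = new HashMap<>();

    static void emitTuple(int partition) {
        numUncommittedOffsets++;                            // counted globally
        offsetManagers.merge(partition, 1, Integer::sum);   // tracked per partition
    }

    // Mimics the rebalance listener: state for revoked partitions is dropped,
    // but the global counter is left untouched.
    static void revokePartitions(Set<Integer> revoked) {
        for (int p : revoked) {
            offsetManagers.remove(p);
        }
    }

    static void ack(int partition) {
        Integer pending = offsetManagers.get(partition);
        if (pending == null) {
            return; // ack for an untracked partition is silently ignored
        }
        offsetManagers.put(partition, pending - 1);
        numUncommittedOffsets--; // only decremented for tracked partitions
    }

    public static void main(String[] args) {
        // Spout A initially owns partitions 0 and 1 and emits from both.
        emitTuple(0);
        emitTuple(1);
        emitTuple(1);

        // Rebalance: partition 1 is reassigned to spout B; A drops its state.
        revokePartitions(Set.of(1));

        // Acks for all three tuples arrive, but partition 1's state is gone.
        ack(0);
        ack(1);
        ack(1);

        // Every emitted tuple has been acked, yet the counter is stuck at 2.
        System.out.println(numUncommittedOffsets); // prints 2
    }
}
```

The counter can only reach zero again if every increment is matched by a
decrement, which is impossible once the partition's tracking entry is removed.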
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)