[ 
https://issues.apache.org/jira/browse/STORM-2625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16086029#comment-16086029
 ] 

Sriharsha Chintalapani commented on STORM-2625:
-----------------------------------------------

Thanks for reporting the JIRA [~GuangDu].  This looks to be a side effect of 
using auto subscription (the Kafka consumer balancing the partition 
subscription). This shouldn't be an issue if you use manual subscription; more 
details are here: 
https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md#manual-partition-control-advanced
 . Can you give it a try with manual subscription and report your findings? 
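For reference, the reason manual subscription sidesteps this bug is that each spout task picks its partitions deterministically by task index instead of letting the broker rebalance them. A minimal sketch of that idea (illustrative only; the class and method names here are hypothetical, not the actual storm-kafka-client API):

```java
import java.util.ArrayList;
import java.util.List;

public class ManualAssignmentSketch {
    // Assign partition p to task (p % totalTasks). Because every task computes
    // the same answer locally, tasks starting one by one never trigger a
    // broker-side rebalance or a partition handover between spouts.
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        List<Integer> assigned = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            if (p % totalTasks == taskIndex) {
                assigned.add(p);
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // With 2 tasks over partitions 0 and 1, each task owns exactly one
        // partition from the start, so there is no revoke/reassign step.
        System.out.println(partitionsForTask(0, 2, 2)); // prints [0]
        System.out.println(partitionsForTask(1, 2, 2)); // prints [1]
    }
}
```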

> KafkaSpout is not calculating uncommitted correctly
> ---------------------------------------------------
>
>                 Key: STORM-2625
>                 URL: https://issues.apache.org/jira/browse/STORM-2625
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
>            Priority: Minor
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> This happens when:
> 1. KafkaSpout has already committed offsets to a topic before, and is not 
> running/activated now;
> 2. There are messages in the topic after the committed offsets;
> 3. A topology in the same consumer group, with multiple workers, is 
> started/activated again.
> The same issue may happen when a running topology gets a consumer group 
> partition re-assignment while offsets cannot be committed in time.
> The underlying issue is:
> a. Because workers register their Kafka consumers one by one, when the first 
> consumer A registers itself with the Kafka broker for the consumer group, it 
> is assigned all the partitions, say partitions 0 and 1. Consumer A then 
> retrieves messages from all assigned partitions where possible and starts 
> processing. With every tuple KafkaSpout A emits, the uncommitted count 
> numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());
> b. At this point a second consumer B registers with the broker for the same 
> consumer group. The broker then re-assigns the partitions among the existing 
> consumers, say consumer A is assigned partition 0 and consumer B is assigned 
> partition 1. 
> b.1 At this point KafkaSpout A will try to commit acked offsets and remove 
> the partition 1 offsets it is tracking 
> (KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()); 
> however, because not all tuples have been acked, KafkaSpout cannot commit 
> the full list of offsets to the Kafka broker.
> b.2 Then KafkaSpout A will remove the tracked partition 1 offsets from 
> offsetManagers as well as from emitted (
> org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
> org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()),
>  with the result that the un-acked tuples will never be acked 
> (org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count 
> numUncommittedOffsets will never be reduced back to the correct value.
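The accounting problem described in steps a through b.2 can be shown with a toy model (this is not the real KafkaSpout code; the field and method names mirror the report but the implementation is a simplified sketch): the counter goes up for every emitted tuple, but once a rebalance drops a partition's tracking state, acks for that partition's in-flight tuples can no longer decrement it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UncommittedCounterBug {
    int numUncommittedOffsets = 0;
    // partition -> offsets currently tracked as in-flight
    Map<Integer, Set<Long>> emitted = new HashMap<>();

    void emit(int partition, long offset) {
        emitted.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
        numUncommittedOffsets++; // incremented per emitted tuple, as in emitTupleIfNotEmitted()
    }

    void onPartitionsRevoked(int partition) {
        // The revoked partition's tracking state is dropped,
        // but the counter is not adjusted to match.
        emitted.remove(partition);
    }

    void ack(int partition, long offset) {
        Set<Long> offsets = emitted.get(partition);
        if (offsets != null && offsets.remove(offset)) {
            numUncommittedOffsets--; // only still-tracked tuples decrement
        }
        // An ack for a revoked partition falls through and is lost.
    }

    public static void main(String[] args) {
        UncommittedCounterBug spout = new UncommittedCounterBug();
        spout.emit(0, 100L); // partition 0 stays assigned to this spout
        spout.emit(1, 200L); // partition 1 will be reassigned to another task
        spout.onPartitionsRevoked(1);
        spout.ack(0, 100L);
        spout.ack(1, 200L);  // lost: partition 1 is no longer tracked
        // Counter is stuck at 1 even though every emitted tuple was acked.
        System.out.println(spout.numUncommittedOffsets); // prints 1
    }
}
```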



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
