Guang Du created STORM-2625:
-------------------------------

             Summary: KafkaSpout is not calculating uncommitted correctly
                 Key: STORM-2625
                 URL: https://issues.apache.org/jira/browse/STORM-2625
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.1.1
            Reporter: Guang Du
            Priority: Minor


This happens when:
1. KafkaSpout has already committed offsets for a topic before, and is not
running/activated now;
2. There are messages in the topic after the committed offsets;
3. A topology using the same consumer group, with multiple workers, is
started/activated again.

The underlying issue is:

a. Because workers register their Kafka consumers one by one, when the first
consumer A joins the consumer group, the broker assigns it all the partitions,
say partitions 0 and 1. Consumer A then fetches messages from all of its
assigned partitions and starts processing them. For every tuple KafkaSpout A
emits, the uncommitted count numUncommittedOffsets is incremented
(KafkaSpout#emitTupleIfNotEmitted()).
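A minimal sketch of the emit-side bookkeeping in step (a), assuming a simplified model: the class EmitBookkeeping and its fields are hypothetical stand-ins for KafkaSpout's emitted set and numUncommittedOffsets, and partitions are plain ints rather than Kafka topic partitions. It only shows that the counter grows by one for every newly emitted offset, regardless of which partition the offset came from.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the emit-side bookkeeping described in (a).
// Not the real KafkaSpout: partitions are plain ints here.
class EmitBookkeeping {
    // Offsets emitted but not yet committed, per partition (stand-in for "emitted").
    final Map<Integer, Set<Long>> emitted = new HashMap<>();
    // Global count of emitted-but-uncommitted offsets (stand-in for numUncommittedOffsets).
    int numUncommittedOffsets = 0;

    // Called once per record turned into a tuple; an offset that was already
    // emitted is not counted twice (cf. emitTupleIfNotEmitted()).
    void emit(int partition, long offset) {
        boolean added = emitted.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
        if (added) {
            numUncommittedOffsets++;
        }
    }

    public static void main(String[] args) {
        EmitBookkeeping spoutA = new EmitBookkeeping();
        // Consumer A initially owns partitions 0 and 1 and emits from both.
        for (long off = 10; off < 13; off++) spoutA.emit(0, off);
        for (long off = 20; off < 22; off++) spoutA.emit(1, off);
        System.out.println(spoutA.numUncommittedOffsets); // 5
    }
}
```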

b. At this point a second consumer B joins the same consumer group. The broker
then reassigns the partitions among the existing consumers: say consumer A is
assigned partition 0 and consumer B is assigned partition 1.

b.1 KafkaSpout A then tries to commit its acked offsets and to remove the
partition 1 offsets it is tracking
(KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). However,
because not all of the emitted tuples have been acked yet, KafkaSpout cannot
commit the full list of offsets to the Kafka broker.
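Why the commit in (b.1) can only be partial is sketched below, assuming a simplified model: Kafka stores a single committed position per partition for a consumer group, so only the gap-free run of acked offsets that starts at the current committed position can be committed. ContiguousCommit and nextCommitOffset are hypothetical names; the spout's own offset tracking handles more cases than this.

```java
import java.util.Set;

// Hypothetical helper illustrating why (b.1) commits only part of the acked offsets.
class ContiguousCommit {
    // Walks forward from the committed position while offsets are acked and returns
    // the next offset to consume, i.e. the position that would be committed.
    static long nextCommitOffset(long committedPosition, Set<Long> acked) {
        long next = committedPosition;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Offset 12 was emitted but not yet acked, so acked offset 13 must wait.
        Set<Long> acked = Set.of(10L, 11L, 13L);
        System.out.println(nextCommitOffset(10L, acked)); // 12
    }
}
```

Everything from the first un-acked offset onward stays uncommitted, which is the state consumer A is in when partition 1 is revoked.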

b.2 KafkaSpout A then removes the tracked partition 1 offsets from
offsetManagers as well as from emitted
(org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
As a result, the tuples that were not yet acked can never be acked
(org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count
numUncommittedOffsets is never reduced back to the correct value.
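The leak in (b.2) can be reproduced in a toy model under simplified, assumed semantics: RevocationLeak and its methods are hypothetical, every acked offset is treated as committable (ignoring the contiguity constraint from (b.1)), and revoke() commits what is acked, then drops the partition's tracking state without adjusting the counter, as the steps above describe.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the bookkeeping leak described in (b.2).
// Not the real KafkaSpout: commits ignore gaps and partitions are plain ints.
class RevocationLeak {
    final Map<Integer, Set<Long>> emitted = new HashMap<>();
    final Map<Integer, Set<Long>> acked = new HashMap<>();
    int numUncommittedOffsets = 0;

    void emit(int partition, long offset) {
        if (emitted.computeIfAbsent(partition, p -> new HashSet<>()).add(offset)) {
            numUncommittedOffsets++;
        }
    }

    // Acks for offsets that are no longer tracked are silently dropped,
    // so they can never release their unit of the counter.
    void ack(int partition, long offset) {
        Set<Long> tracked = emitted.get(partition);
        if (tracked == null || !tracked.contains(offset)) {
            return;
        }
        acked.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
    }

    // Simplified commit: every acked offset is committed and decrements the counter.
    void commit() {
        for (Map.Entry<Integer, Set<Long>> e : acked.entrySet()) {
            emitted.get(e.getKey()).removeAll(e.getValue());
            numUncommittedOffsets -= e.getValue().size();
        }
        acked.clear();
    }

    // Rebalance: commit what is acked, then forget the partition
    // while leaving numUncommittedOffsets untouched.
    void revoke(int partition) {
        commit();
        emitted.remove(partition);
        acked.remove(partition);
    }

    public static void main(String[] args) {
        RevocationLeak spoutA = new RevocationLeak();
        spoutA.emit(0, 10L); spoutA.emit(0, 11L); // partition 0 stays with A
        spoutA.emit(1, 20L); spoutA.emit(1, 21L); // partition 1 moves to B
        spoutA.ack(1, 20L);                       // only 20 is acked before the rebalance
        spoutA.revoke(1);                         // commits 20, drops tracking of 21
        spoutA.ack(1, 21L);                       // late ack: ignored
        spoutA.ack(0, 10L); spoutA.ack(0, 11L);
        spoutA.commit();
        System.out.println(spoutA.numUncommittedOffsets); // 1
    }
}
```

At the end of main(), consumer A has no tuples in flight, yet the counter stays at 1 because the ack for partition 1 offset 21 arrives after that partition's state was discarded.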




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
