Guang Du created STORM-2625:
-------------------------------
Summary: KafkaSpout is not calculating uncommitted correctly
Key: STORM-2625
URL: https://issues.apache.org/jira/browse/STORM-2625
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client
Affects Versions: 1.1.1
Reporter: Guang Du
Priority: Minor
This happens when:
1. KafkaSpout has previously committed offsets for a topic, and is not
running/activated now;
2. There are messages in the topic after the committed offsets;
3. A topology with multiple workers using the same consumer group is
started/activated again.
The underlying issue is:
a. Because workers register their Kafka consumers one by one, when the first
consumer A registers with the Kafka broker under the consumer group, it is
assigned all the partitions, say partitions 0 and 1. Consumer A then fetches
messages from all of its assigned partitions and starts processing them.
For every tuple KafkaSpout A emits, the uncommitted count is incremented
(numUncommittedOffsets++ in KafkaSpout#emitTupleIfNotEmitted());
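A minimal sketch of step a, assuming a heavily simplified spout. The names EmitCountingSketch, SimplifiedSpout and MessageId are hypothetical and are not storm-kafka-client APIs (records need Java 16+). Each first-time emit adds one to numUncommittedOffsets, while re-emitting a tuple that is still in flight does not.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model of the emit path in step a; not the real KafkaSpout.
final class EmitCountingSketch {

    /** One Kafka record, identified by partition and offset. */
    record MessageId(int partition, long offset) {}

    static final class SimplifiedSpout {
        final Set<MessageId> emitted = new HashSet<>(); // tuples currently in flight
        int numUncommittedOffsets = 0;

        /** Emits the record unless it is already in flight, counting each new emit as uncommitted. */
        boolean emitTupleIfNotEmitted(MessageId id) {
            if (!emitted.add(id)) {
                return false; // already emitted and not yet acked: no second increment
            }
            numUncommittedOffsets++;
            return true;
        }
    }

    public static void main(String[] args) {
        SimplifiedSpout spoutA = new SimplifiedSpout();
        // Consumer A initially owns partitions 0 and 1, so it emits from both.
        spoutA.emitTupleIfNotEmitted(new MessageId(0, 100L));
        spoutA.emitTupleIfNotEmitted(new MessageId(0, 101L));
        spoutA.emitTupleIfNotEmitted(new MessageId(1, 200L));
        spoutA.emitTupleIfNotEmitted(new MessageId(0, 100L)); // duplicate, skipped
        System.out.println("numUncommittedOffsets = " + spoutA.numUncommittedOffsets); // prints 3
    }
}
```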
b. At this point a second consumer B registers with the broker for the same
consumer group. The broker then reassigns the partitions among the existing
consumers, say partition 0 to consumer A and partition 1 to consumer B.
b.1 At this point KafkaSpout A tries to commit its acked offsets and to remove
the partition 1 offsets it is tracking
(KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). However,
because not all tuples have been acked yet, KafkaSpout cannot commit the full
list of offsets to the Kafka broker.
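Why a partial ack blocks the commit in b.1 can be sketched with a single-partition model, assuming Kafka's semantics that a committed offset is the offset of the next record to consume, so a consumer can only commit past a contiguous run of processed offsets. The PartitionOffsets class is hypothetical and only loosely mirrors the role of the spout's offset managers.

```java
import java.util.TreeSet;

// Hypothetical single-partition model; only loosely mirrors the spout's offset managers.
final class ContiguousCommitSketch {

    static final class PartitionOffsets {
        // Kafka semantics: the committed offset is the offset of the next record to consume.
        private long committedOffset;
        private final TreeSet<Long> acked = new TreeSet<>();

        PartitionOffsets(long committedOffset) {
            this.committedOffset = committedOffset;
        }

        void ack(long offset) {
            acked.add(offset);
        }

        /** Commits past every contiguous acked offset; stops at the first offset still in flight. */
        long commit() {
            while (acked.remove(committedOffset)) {
                committedOffset++;
            }
            return committedOffset;
        }

        /** Acked offsets that cannot be committed yet because of an earlier gap. */
        int ackedButNotCommitted() {
            return acked.size();
        }
    }

    public static void main(String[] args) {
        PartitionOffsets partition1 = new PartitionOffsets(100L);
        // Offsets 100..104 were emitted; 102 is still in flight when the rebalance starts.
        for (long offset : new long[] {100L, 101L, 103L, 104L}) {
            partition1.ack(offset);
        }
        System.out.println("committed up to " + partition1.commit()); // prints 102
        System.out.println("stuck acked offsets: " + partition1.ackedButNotCommitted()); // prints 2
    }
}
```

Offsets 103 and 104 are acked but stay uncommitted until 102 is acked, which is the situation in b.1 when the partition is taken away before that happens.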
b.2 KafkaSpout A then removes the tracked partition 1 offsets from offsetManagers
as well as from emitted
(org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned()
and org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
As a result, the tuples that were not yet acked can never be acked
(org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count
numUncommittedOffsets is never reduced back to the correct value.
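The leak in b.1 and b.2 can be reproduced with a hypothetical, simplified spout. Assumptions, all not from the real KafkaSpout: every acked offset is committable immediately (ignoring the gap rule), the counter drops only on commit, and a single hypothetical rebalance(...) method folds together the commit attempt of b.1 and the cleanup that the issue places in onPartitionsAssigned()/initialize(). Records need Java 16+.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical reproduction of the leak in b.1/b.2; not the real KafkaSpout.
// Simplification: every acked offset is assumed to be committable immediately.
final class UncommittedLeakSketch {

    record MessageId(int partition, long offset) {}

    static final class SimplifiedSpout {
        final Set<MessageId> emitted = new HashSet<>();
        final Set<MessageId> acked = new HashSet<>();
        int numUncommittedOffsets = 0;

        void emit(MessageId id) {
            if (emitted.add(id)) {
                numUncommittedOffsets++;
            }
        }

        /** Acks for tuples the spout no longer tracks are ignored, as the issue describes for ack(). */
        boolean ack(MessageId id) {
            if (!emitted.remove(id)) {
                return false;
            }
            acked.add(id);
            return true;
        }

        /** Commits all acked offsets and reduces the uncommitted count accordingly. */
        void commit() {
            numUncommittedOffsets -= acked.size();
            acked.clear();
        }

        /** Hypothetical rebalance hook: commits what it can, then forgets the revoked partition. */
        void rebalance(int revokedPartition) {
            commit(); // b.1: commit whatever has been acked so far
            // b.2: in-flight tuples of the revoked partition are dropped, counter left untouched
            emitted.removeIf(id -> id.partition() == revokedPartition);
        }
    }

    public static void main(String[] args) {
        SimplifiedSpout spoutA = new SimplifiedSpout();
        spoutA.emit(new MessageId(0, 100L));
        spoutA.emit(new MessageId(1, 200L));
        spoutA.emit(new MessageId(1, 201L));
        spoutA.ack(new MessageId(0, 100L));
        // Partition 1 moves to consumer B while offsets 200 and 201 are still in flight.
        spoutA.rebalance(1);
        // The topology later acks the old partition 1 tuples, but spout A ignores them.
        spoutA.ack(new MessageId(1, 200L));
        spoutA.ack(new MessageId(1, 201L));
        spoutA.commit();
        System.out.println("in flight: " + spoutA.emitted.size()); // prints 0
        System.out.println("numUncommittedOffsets = " + spoutA.numUncommittedOffsets); // prints 2
    }
}
```

Nothing is in flight at the end, yet the counter stays at 2, so in this model spout A permanently believes two offsets are uncommitted.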
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)