[
https://issues.apache.org/jira/browse/STORM-2625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guang Du updated STORM-2625:
----------------------------
Description:
This happens when:
1. KafkaSpout has already committed offsets to a topic, and is not currently
running/activated;
2. There are messages in the topic after the committed offsets;
3. A topology using the same consumer group, with multiple workers, is
started/activated again.
The same issue may happen when a running topology undergoes a consumer group
partition re-assignment before offsets can be committed in time.
The underlying issue is:
a. Because workers register Kafka consumers one by one, when the first
consumer A registers itself with the Kafka broker under the consumer group, it
is assigned all the partitions, say partitions 0 and 1. Consumer A then
retrieves messages from all assigned partitions where possible and starts
processing. For every tuple KafkaSpout A emits, the uncommitted count
numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());
b. At this point a second consumer B registers with the broker for the same
consumer group. The broker then re-assigns the partitions among the existing
consumers, say consumer A is assigned partition 0 and consumer B partition 1.
b.1 At this point KafkaSpout A tries to commit acked offsets and removes the
partition 1 offsets it is tracking
(KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()).
However, because not all tuples have been acked, KafkaSpout cannot commit the
full list of offsets to the Kafka broker.
b.2 KafkaSpout A then removes the tracked partition 1 offsets from
offsetManagers as well as from emitted (
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
As a result, the un-acked tuples can never be acked
(org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count
numUncommittedOffsets is never reduced back to a correct value.
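The accounting leak described in steps a through b.2 can be sketched with a minimal, hypothetical model (these are not the real KafkaSpout classes; the class, field, and method names below are illustrative only): the counter is incremented per emitted tuple, but when a rebalance discards the per-partition tracking state, late acks for the revoked partition become no-ops and the counter is never decremented back:

```java
import java.util.*;

// Hypothetical minimal model of the numUncommittedOffsets accounting
// described above; not the actual storm-kafka-client implementation.
public class UncommittedCounterDemo {
    static int numUncommittedOffsets = 0;
    // partition -> offsets emitted but not yet acked
    // (stands in for offsetManagers / emitted)
    static Map<Integer, Set<Long>> tracked = new HashMap<>();

    // Models emitTupleIfNotEmitted(): counter goes up per emitted tuple.
    static void emit(int partition, long offset) {
        tracked.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
        numUncommittedOffsets++;
    }

    // Models ack(): counter only goes down for still-tracked tuples;
    // acks for partitions dropped in a rebalance are silently ignored.
    static void ack(int partition, long offset) {
        Set<Long> offsets = tracked.get(partition);
        if (offsets != null && offsets.remove(offset)) {
            numUncommittedOffsets--;
        }
    }

    // Models onPartitionsAssigned()/initialize(): tracking state for
    // partitions no longer assigned is discarded, but the counter is
    // not adjusted to match.
    static void rebalance(Set<Integer> newAssignment) {
        tracked.keySet().retainAll(newAssignment);
    }

    public static void main(String[] args) {
        emit(0, 1); emit(1, 1); emit(1, 2); // consumer A owns partitions 0 and 1
        rebalance(Set.of(0));               // partition 1 reassigned to consumer B
        ack(0, 1); ack(1, 1); ack(1, 2);    // acks for partition 1 are now no-ops
        System.out.println(numUncommittedOffsets); // prints 2; never returns to 0
    }
}
```

Under this model the counter is permanently stuck at the number of tuples that were in flight on the revoked partition, which matches the reported symptom.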
was:
This happens when:
1. KafkaSpout has already committed offsets to a topic, and is not currently
running/activated;
2. There are messages in the topic after the committed offsets;
3. A topology using the same consumer group, with multiple workers, is
started/activated again.
The underlying issue is:
a. Because workers register Kafka consumers one by one, when the first
consumer A registers itself with the Kafka broker under the consumer group, it
is assigned all the partitions, say partitions 0 and 1. Consumer A then
retrieves messages from all assigned partitions where possible and starts
processing. For every tuple KafkaSpout A emits, the uncommitted count
numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());
b. At this point a second consumer B registers with the broker for the same
consumer group. The broker then re-assigns the partitions among the existing
consumers, say consumer A is assigned partition 0 and consumer B partition 1.
b.1 At this point KafkaSpout A tries to commit acked offsets and removes the
partition 1 offsets it is tracking
(KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()).
However, because not all tuples have been acked, KafkaSpout cannot commit the
full list of offsets to the Kafka broker.
b.2 KafkaSpout A then removes the tracked partition 1 offsets from
offsetManagers as well as from emitted (
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
As a result, the un-acked tuples can never be acked
(org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count
numUncommittedOffsets is never reduced back to a correct value.
> KafkaSpout is not calculating uncommitted correctly
> ---------------------------------------------------
>
> Key: STORM-2625
> URL: https://issues.apache.org/jira/browse/STORM-2625
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.1
> Reporter: Guang Du
> Priority: Minor
>
> This happens when:
> 1. KafkaSpout has already committed offsets to a topic, and is not currently
> running/activated;
> 2. There are messages in the topic after the committed offsets;
> 3. A topology using the same consumer group, with multiple workers, is
> started/activated again.
> The same issue may happen when a running topology undergoes a consumer group
> partition re-assignment before offsets can be committed in time.
> The underlying issue is:
> a. Because workers register Kafka consumers one by one, when the first
> consumer A registers itself with the Kafka broker under the consumer group,
> it is assigned all the partitions, say partitions 0 and 1. Consumer A then
> retrieves messages from all assigned partitions where possible and starts
> processing. For every tuple KafkaSpout A emits, the uncommitted count
> numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());
> b. At this point a second consumer B registers with the broker for the same
> consumer group. The broker then re-assigns the partitions among the existing
> consumers, say consumer A is assigned partition 0 and consumer B partition 1.
> b.1 At this point KafkaSpout A tries to commit acked offsets and removes the
> partition 1 offsets it is tracking
> (KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()).
> However, because not all tuples have been acked, KafkaSpout cannot commit
> the full list of offsets to the Kafka broker.
> b.2 KafkaSpout A then removes the tracked partition 1 offsets from
> offsetManagers as well as from emitted (
> org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
> org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
> As a result, the un-acked tuples can never be acked
> (org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count
> numUncommittedOffsets is never reduced back to a correct value.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)