[ 
https://issues.apache.org/jira/browse/STORM-2625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guang Du updated STORM-2625:
----------------------------
    Description: 
This happens when:
1. KafkaSpout has already committed offsets to a topic before, and is not 
running/activated now;
2. There are messages in the topic after the committed offsets;
3. The same consumer group topology with multiple workers is started/activated again.

The same issue may happen when a running topology gets a consumer group 
partition re-assignment while offsets cannot be committed in time.

The underlying issue is:

a. Because workers register Kafka consumers one by one, when the first 
consumer A registers itself with the Kafka broker for the consumer group, it 
is assigned all the partitions, say partitions 0 and 1. Consumer A then 
retrieves messages from all the assigned partitions and starts processing. 
With every tuple KafkaSpout A emits, the uncommitted count 
numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());
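The counting in step (a) can be modeled with a small sketch (class and field 
names are illustrative, not Storm's actual code): every tuple emitted for the 
first time bumps a single spout-wide counter.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the counting in KafkaSpout#emitTupleIfNotEmitted():
// each (partition, offset) pair emitted for the first time increments
// one spout-wide uncommitted counter. Names are hypothetical.
class UncommittedCounterModel {
    // message ids ("partition-offset") currently in flight
    final Set<String> emitted = new HashSet<>();
    // spout-wide count of offsets emitted but not yet committed
    long numUncommittedOffsets = 0;

    void emitTupleIfNotEmitted(int partition, long offset) {
        if (emitted.add(partition + "-" + offset)) { // first emission only
            numUncommittedOffsets++;
        }
    }
}
```

With partitions 0 and 1 both assigned to consumer A, three messages per 
partition leave numUncommittedOffsets at 6 until commits succeed.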

b. At this point a second consumer B registers with the broker for the same 
consumer group. The broker then re-assigns the partitions among the existing 
consumers, say consumer A is assigned partition 0 and consumer B is assigned 
partition 1. 

b.1 At this point KafkaSpout A will try committing the acked offsets, and 
remove the partition 1 offsets it is tracking 
(KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). 
However, because not all tuples are acked, KafkaSpout is not able to commit 
the full list of offsets to the Kafka broker.
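Why the commit in b.1 is only partial can be sketched as follows (a toy model 
with assumed names, not Storm's OffsetManager): only the contiguous run of 
acked offsets directly after the committed position is safe to commit, so a 
single unacked tuple blocks every later acked offset on that partition.

```java
import java.util.TreeSet;

// Toy model of the commit decision made when partitions are revoked:
// the commit offset may only advance across contiguously acked offsets.
class ContiguousCommitModel {
    long committedOffset = -1;            // nothing committed yet
    final TreeSet<Long> acked = new TreeSet<>();

    void ack(long offset) {
        acked.add(offset);
    }

    // Highest offset that is safe to commit: walk forward while the
    // next offset has been acked; stop at the first gap.
    long findNextCommitOffset() {
        long next = committedOffset;
        while (acked.contains(next + 1)) {
            next++;
        }
        return next;
    }
}
```

If offsets 0, 1 and 3 are acked but 2 is still in flight, only up to offset 1 
can be committed; the tracking state for offset 3 is then dropped when the 
partition is revoked.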

b.2 Then KafkaSpout A will remove the tracked partition 1 offsets from 
offsetManagers as well as from emitted (
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
As a result, the not-yet-acked tuples will never be acked 
(org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count 
numUncommittedOffsets will never be reduced back to a correct value.
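The resulting leak can be condensed into one sketch (hypothetical, simplified 
names; the real logic lives in onPartitionsAssigned()/initialize() and ack()): 
reassignment drops the per-partition tracking state but leaves the spout-wide 
counter untouched, and later acks for the dropped partition find no tracking 
entry, so the counter is never decremented back.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the leak described in b.2 (not Storm's actual code): on
// reassignment the spout keeps offsetManagers/emitted only for the
// partitions it now owns, but numUncommittedOffsets is never reduced
// for the dropped partition's in-flight tuples.
class SpoutStateModel {
    final Map<Integer, Set<Long>> offsetManagers = new HashMap<>();
    final Set<String> emitted = new HashSet<>();
    long numUncommittedOffsets = 0;

    void emit(int partition, long offset) {
        offsetManagers.computeIfAbsent(partition, p -> new HashSet<>());
        emitted.add(partition + "-" + offset);
        numUncommittedOffsets++;
    }

    // Mirrors onPartitionsAssigned()/initialize(): keep state only for
    // the newly assigned partitions; the counter is left untouched.
    void onPartitionsAssigned(Set<Integer> assigned) {
        offsetManagers.keySet().retainAll(assigned);
        emitted.removeIf(id -> !assigned.contains(Integer.parseInt(id.split("-")[0])));
        // BUG: numUncommittedOffsets is not reduced for dropped partitions.
    }

    void ack(int partition, long offset) {
        Set<Long> om = offsetManagers.get(partition);
        if (om == null) {
            return;                  // ack for a revoked partition: ignored
        }
        om.add(offset);
        numUncommittedOffsets--;     // only reachable while still tracked
    }
}
```

Even after every remaining tuple is acked, the counter stays inflated by the 
number of partition 1 tuples that were in flight at reassignment time.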



  was:
This happens when:
1. KafkaSpout has already committed offsets to a topic before, and is not 
running/activated now;
2. There are messages in the topic after the committed offsets;
3. The same consumer group topology with multiple workers is started/activated again.

The underlying issue is:

a. Because workers register Kafka consumers one by one, when the first 
consumer A registers itself with the Kafka broker for the consumer group, it 
is assigned all the partitions, say partitions 0 and 1. Consumer A then 
retrieves messages from all the assigned partitions and starts processing. 
With every tuple KafkaSpout A emits, the uncommitted count 
numUncommittedOffsets is incremented (KafkaSpout#emitTupleIfNotEmitted());

b. At this point a second consumer B registers with the broker for the same 
consumer group. The broker then re-assigns the partitions among the existing 
consumers, say consumer A is assigned partition 0 and consumer B is assigned 
partition 1. 

b.1 At this point KafkaSpout A will try committing the acked offsets, and 
remove the partition 1 offsets it is tracking 
(KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsRevoked()). 
However, because not all tuples are acked, KafkaSpout is not able to commit 
the full list of offsets to the Kafka broker.

b.2 Then KafkaSpout A will remove the tracked partition 1 offsets from 
offsetManagers as well as from emitted (
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#onPartitionsAssigned(),
org.apache.storm.kafka.spout.KafkaSpout.KafkaSpoutConsumerRebalanceListener#initialize()).
As a result, the not-yet-acked tuples will never be acked 
(org.apache.storm.kafka.spout.KafkaSpout#ack()), and the uncommitted count 
numUncommittedOffsets will never be reduced back to a correct value.



> KafkaSpout is not calculating uncommitted correctly
> ---------------------------------------------------
>
>                 Key: STORM-2625
>                 URL: https://issues.apache.org/jira/browse/STORM-2625
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
