[ 
https://issues.apache.org/jira/browse/STORM-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125893#comment-16125893
 ] 

Stig Rohde Døssing commented on STORM-2666:
-------------------------------------------

You are right, those logs indicate that committed offsets are being emitted 
again. I think I've figured out what's happening. The consumer position can 
sometimes fall behind the committed offsets.

Say there are partitions 0-10, this spout is assigned partition 0, and it has 
currently committed up to offset 0.
1. Emit offsets 0-100.
2. Ack offsets 0-100, except offset 50.
3. Reassign partitions before the commit happens. Keep partition 0 assigned to 
   this spout.
4. The reassignment logic does not remove the offset manager for partition 0, 
   so 0-49 and 51-100 are still acked.
5. The reassignment logic seeks the consumer back to the committed offset for 
   all assigned partitions, including partition 0. The consumer position for 
   partition 0 is now 0.
6. Ack offset 50.
7. nextTuple is called, and the commit of offsets 0-100 happens.
8. Offsets 0-100 are emitted again, because the consumer position was 0. The 
   spout is now in a bad state, and when 0-100 are acked, the offset manager 
   will complain in the log.
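
To make the sequence above concrete, here is a minimal toy model in Python. It 
is only a sketch, not the real KafkaSpout code: the class ToySpout, its methods 
and run_reassignment_scenario are hypothetical names made up for illustration, 
and "committed" is assumed to mean the next offset to resume from.

```python
class ToySpout:
    """Toy single-partition model; NOT the real KafkaSpout implementation."""

    def __init__(self):
        self.committed = 0   # next offset to resume from (nothing committed yet)
        self.position = 0    # consumer position: next offset a poll would return
        self.acked = set()   # acked offsets waiting for the next commit
        self.emitted = []    # every emit in order, duplicates included

    def emit_through(self, last_offset):
        # Emit everything from the current position through last_offset.
        for offset in range(self.position, last_offset + 1):
            self.emitted.append(offset)
        self.position = max(self.position, last_offset + 1)

    def ack(self, offset):
        self.acked.add(offset)

    def reassign_keeping_partition(self):
        # Assumed behaviour from the description: the acked state survives,
        # but the consumer is sought back to the committed offset.
        self.position = self.committed

    def commit(self):
        # Commit the contiguous run of acked offsets.
        while self.committed in self.acked:
            self.acked.remove(self.committed)
            self.committed += 1


def run_reassignment_scenario():
    spout = ToySpout()
    spout.emit_through(100)                 # emit 0-100
    for offset in range(101):               # ack everything except 50
        if offset != 50:
            spout.ack(offset)
    spout.reassign_keeping_partition()      # position falls back to 0
    spout.ack(50)
    spout.commit()                          # commit covers 0-100
    position_at_commit = spout.position     # still 0, behind the commit
    spout.emit_through(100)                 # 0-100 are emitted a second time
    return spout, position_at_commit
```

In this model the position is 0 at commit time while the committed offset has 
moved past 100, so every offset ends up in the emitted list twice.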

A similar scenario can be constructed without partition reassignment:

Say max.poll.records is 10.
1. Emit offsets 0-100 over 10 polls.
2. Ack 1-100.
3. Fail 0 and retry it, which moves the consumer back so offset 0 can be 
   polled again.
4. Ack 0.
When nextTuple is called, 0-100 will all be acked, but the consumer position 
will be at most 10 due to max.poll.records. Once the offsets are committed, the 
spout is again in a state where the consumer position is behind the committed 
offset, and the same problem occurs.
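
The max.poll.records variant can be sketched the same way. Again this is a toy 
model under stated assumptions, not the spout's actual code: PollingToySpout 
and run_max_poll_scenario are hypothetical names, a retry is assumed to seek 
the consumer back to the failed offset, and already acked or pending offsets 
are assumed to be skipped on emit. The model polls until offset 100 has been 
emitted, however many polls that takes.

```python
MAX_POLL_RECORDS = 10   # stands in for max.poll.records
LAST_OFFSET = 100       # last offset available in the toy partition


class PollingToySpout:
    """Toy single-partition model; NOT the real KafkaSpout implementation."""

    def __init__(self):
        self.committed = 0    # next offset to resume from
        self.position = 0     # consumer position
        self.acked = set()    # acked, not yet committed
        self.pending = set()  # emitted, not yet acked or failed
        self.emitted = []     # every emit in order, duplicates included

    def poll_and_emit(self):
        # One poll returns at most MAX_POLL_RECORDS records.
        end = min(self.position + MAX_POLL_RECORDS, LAST_OFFSET + 1)
        for offset in range(self.position, end):
            if offset in self.acked or offset in self.pending:
                continue  # assumed: known offsets are not emitted again
            self.pending.add(offset)
            self.emitted.append(offset)
        self.position = end

    def ack(self, offset):
        self.pending.discard(offset)
        self.acked.add(offset)

    def fail_and_retry(self, offset):
        # Assumed: the retry seeks back to the failed offset and polls once.
        self.pending.discard(offset)
        self.position = offset
        self.poll_and_emit()

    def commit(self):
        while self.committed in self.acked:
            self.acked.remove(self.committed)
            self.committed += 1


def run_max_poll_scenario():
    spout = PollingToySpout()
    while spout.position <= LAST_OFFSET:     # emit 0-100
        spout.poll_and_emit()
    for offset in range(1, LAST_OFFSET + 1): # ack 1-100
        spout.ack(offset)
    spout.fail_and_retry(0)                  # position ends up at 10
    spout.ack(0)
    spout.commit()                           # commit covers 0-100
    position_at_commit = spout.position
    already_emitted = len(spout.emitted)
    spout.poll_and_emit()                    # polls from 10, behind the commit
    return spout, position_at_commit, spout.emitted[already_emitted:]
```

Here the position is 10 when the commit happens, and the next poll emits 
offsets 10-19 a second time even though they are already committed.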

I think the following changes will fix it:
1. Don't seek to committed offsets for partitions during reassignment if they 
   were assigned to the spout previously. This fix isn't strictly necessary, 
   but I don't think it makes sense to seek when we keep the rest of the state.
2. When committing offsets, we should check the consumer position. If the 
   position is behind the committed offset, seek up to the committed offset.
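
As a rough illustration of the two proposed changes, the decision logic could 
look like the Python sketch below. The function names are hypothetical and the 
code only models the decisions; in the spout itself this would presumably be 
done with the Kafka consumer's position and seek calls.

```python
def partitions_to_seek(previously_assigned, newly_assigned):
    """Change 1: only seek partitions this spout did not already own."""
    return sorted(set(newly_assigned) - set(previously_assigned))


def position_after_commit(position, committed_offset):
    """Change 2: if the consumer is behind the committed offset, seek up."""
    if position < committed_offset:
        return committed_offset  # models a forward seek to the commit point
    return position
```

For example, partitions_to_seek({0}, {0, 3}) leaves partition 0 alone and 
returns only partition 3, and position_after_commit(10, 101) moves the 
position up to 101 so committed offsets are not polled again.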

Your configuration sets max.poll.records to 4, so I suspect the 
max.poll.records scenario is the one you're seeing.

I've put up a branch with proposed fixes here 
https://github.com/srdo/storm/tree/STORM-2666 (the last commit only; the others 
are from STORM-2549, which you should already have). I'd appreciate it if you 
would try them out. 

> Kafka Client Spout send & ack committed offsets
> -----------------------------------------------
>
>                 Key: STORM-2666
>                 URL: https://issues.apache.org/jira/browse/STORM-2666
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
>
> Under a certain heavy load, for failed/timed-out tuples, the retry service 
> will ack a tuple once it has failed the maximum number of times. The Kafka 
> Client Spout will commit once the commit interval is reached. However, it 
> seems some in-flight tuples fail again, so the retry service causes the spout 
> to emit them again, and they are eventually acked to the OffsetManager.
> In some cases there are too many such offsets, exceeding the max-uncommit 
> limit, so 
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset 
> is unable to find the next commit point, and the spout will not poll this 
> partition any more.
> By the way, I've applied STORM-2549 PR#2156 from Stig Døssing to fix 
> STORM-2625, and I'm using a Python Shell Bolt as the processing bolt, in case 
> this information helps.
> The resulting logs look like the ones below. I'm not sure whether the issue 
> has already been raised or fixed; I'd be glad if anyone could point out an 
> existing JIRA. Thank you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248 
> 248] [INFO] Successful ack for tuple message 
> [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager 
> Thread-23-spout-executor[248 248] [WARN] topic-partition 
> [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed 
> Offset [16003]



