[
https://issues.apache.org/jira/browse/STORM-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16125893#comment-16125893
]
Stig Rohde Døssing commented on STORM-2666:
-------------------------------------------
You are right, those logs indicate that committed offsets are being emitted
again. I think I've figured out what's happening. The consumer position can
sometimes fall behind the committed offsets.
Say there are partitions 0-10, this spout is assigned partition 0, and it has
currently committed up to offset 0.
- Emit offsets 0-100
- Ack offsets 0-100, except offset 50
- Reassign partitions before the commit happens. Partition 0 stays assigned to
this spout.
- The reassignment logic does not remove the offset manager for partition 0, so
0-49 and 51-100 are still acked.
- The reassignment logic seeks the consumer back to the committed offset for all
assigned partitions, including partition 0. The consumer position for partition
0 is now 0.
- Ack offset 50
- NextTuple is called, and the commit of offsets 0-100 happens.
- Offsets 0-100 are emitted again, because the consumer position was 0. The
spout is now in a bad state, and when 0-100 are acked again, the offset manager
will complain in the log.
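The reassignment sequence above can be reproduced with a minimal, self-contained simulation. The class and method names here are illustrative only, not the actual storm-kafka-client code:

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the spout's per-partition state for partition 0
public class SpoutStateSimulation {
    long committedOffset = 0;   // next offset to be committed
    long consumerPosition = 0;  // next offset the consumer will poll
    Set<Long> acked = new TreeSet<>();

    // Emitting offsets from..to advances the consumer position past them
    void emit(long from, long to) { consumerPosition = to + 1; }

    void ack(long offset) { acked.add(offset); }

    // Reassignment seeks the consumer back to the committed offset,
    // but keeps the offset manager (the acked set) intact
    void reassign() { consumerPosition = committedOffset; }

    // Commit advances the committed offset past the contiguous acked prefix
    void commit() {
        while (acked.contains(committedOffset)) {
            acked.remove(committedOffset);
            committedOffset++;
        }
    }

    public static void main(String[] args) {
        SpoutStateSimulation s = new SpoutStateSimulation();
        s.emit(0, 100);                                          // emit 0-100
        for (long o = 0; o <= 100; o++) if (o != 50) s.ack(o);   // ack all but 50
        s.reassign();                                            // position seeks back to 0
        s.ack(50);
        s.commit();                                              // commits up to 101
        // The consumer position (0) is now behind the committed offset (101),
        // so the next poll re-fetches and re-emits already-committed offsets.
        System.out.println(s.consumerPosition + " " + s.committedOffset);
    }
}
```

The invariant being violated is that the consumer position should never be behind the committed offset; once it is, every already-committed offset up to the position gap gets polled and emitted a second time.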
A similar scenario can be constructed without partition reassignment:
- Say max.poll.records is 10
- Emit offsets 0-100 over multiple polls
- Ack 1-100
- Fail 0 and retry it
- Ack 0
When nextTuple is called, offsets 0-100 will be committed, but the consumer
position will be at most 10 due to max.poll.records (retrying offset 0 requires
seeking the consumer back to 0, and a single poll advances the position by at
most max.poll.records). Once the offsets are committed, the spout is again in a
state where the consumer position is behind the committed offset, and the same
problem occurs.
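The max.poll.records variant can be sketched the same way, assuming a simplified consumer that returns at most max.poll.records records per poll (the names are illustrative, not the real consumer API):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a consumer bounded by max.poll.records
public class RetrySketch {
    static final int MAX_POLL_RECORDS = 10; // stands in for max.poll.records
    long position = 0;                      // next offset the consumer will poll

    // Each poll returns at most MAX_POLL_RECORDS records from the position
    List<Long> poll() {
        List<Long> records = new ArrayList<>();
        for (int i = 0; i < MAX_POLL_RECORDS; i++) {
            records.add(position++);
        }
        return records;
    }

    void seek(long offset) { position = offset; }

    public static void main(String[] args) {
        RetrySketch c = new RetrySketch();
        for (int i = 0; i < 10; i++) c.poll();  // emit offsets 0-99 over 10 polls
        c.seek(0);                              // offset 0 failed; seek back to retry it
        c.poll();                               // re-poll returns 0-9 only
        // All of 0-99 are now acked and get committed, yet the position is 10,
        // far behind the committed offset.
        System.out.println(c.position);
    }
}
```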
I think the following changes will fix it:
- Don't seek to the committed offsets for partitions during reassignment if
they were already assigned to this spout. This fix isn't strictly necessary,
but I don't think it makes sense to seek when we keep the rest of the state.
- When committing offsets, check the consumer position. If the position is
behind the committed offset, seek forward to the committed offset.
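The second fix amounts to clamping the position forward at commit time. Roughly, as an illustrative helper (not the actual KafkaSpout code):

```java
public class CommitPositionCheck {
    // If the consumer position fell behind the offset being committed (e.g.
    // after a seek during reassignment, or because of max.poll.records), the
    // spout should seek forward so already-committed offsets are not polled
    // and emitted again. The position to seek to is simply the larger of the two.
    static long positionAfterCommit(long consumerPosition, long committedOffset) {
        return Math.max(consumerPosition, committedOffset);
    }

    public static void main(String[] args) {
        // Reassignment scenario: position was seeked back to 0, committing up to 101
        System.out.println(positionAfterCommit(0, 101));   // 101
        // Normal case: position already ahead of the committed offset, no seek needed
        System.out.println(positionAfterCommit(150, 101)); // 150
    }
}
```

In the real spout this check would run against the consumer's position for each partition whose offsets were just committed, seeking only when the position is behind.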
Your configuration sets max.poll.records to 4, so I suspect this might be the
case you're seeing.
I've put up a branch with the proposed fixes here:
https://github.com/srdo/storm/tree/STORM-2666 (only the last commit; the others
are from STORM-2549, which you should already have). I'd appreciate it if you
would try them out.
> Kafka Client Spout send & ack committed offsets
> -----------------------------------------------
>
> Key: STORM-2666
> URL: https://issues.apache.org/jira/browse/STORM-2666
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.1.1
> Reporter: Guang Du
>
> Under heavy load, for failed/timed-out tuples, the retry service will ack
> tuples that have failed the maximum number of times. The Kafka Client Spout
> will commit after the commit interval is reached. However, it seems some 'on
> the way' tuples will fail again, the retry service will cause the Spout to
> emit them again, and they are eventually acked to the OffsetManager.
> In some cases there are too many such offsets, exceeding the max uncommitted
> offsets limit, so that
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset is
> unable to find the next commit point, and the Spout will not poll this
> partition any more.
> By the way, I've applied the STORM-2549 PR #2156 from Stig Døssing to fix
> STORM-2625, and I'm using a Python Shell Bolt as the processing bolt, in case
> this information helps.
> The resulting logs are below. I'm not sure if this issue has already been
> raised/fixed; I'd be glad if anyone could point out an existing JIRA. Thank
> you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248
> 248] [INFO] Successful ack for tuple message
> [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager
> Thread-23-spout-executor[248 248] [WARN] topic-partition
> [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed
> Offset [16003]
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)