[
https://issues.apache.org/jira/browse/STORM-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278152#comment-16278152
]
Stig Rohde Døssing commented on STORM-2666:
-------------------------------------------
[~hmclouro] I think you are right and the example is wrong. It can't happen
that partitions are reassigned before committing. It was still possible to
produce the bad state though. Here's a modified sequence without the
_Reassign partitions before commit happens. Keep partition 0 assigned to this
spout._ step:
Say there are partitions 0-10, this spout is assigned partition 0, and it has
currently committed up to offset 0.
1. Emit offsets 0-100.
2. Ack offsets 0-100, except offset 50.
3. Commit and reassign partitions, keeping partition 0 assigned to this spout:
   - Offsets 0-49 are committed (i.e. we call commitSync with offset 50, so
     the consumer will restart there).
   - The reassignment logic does not remove the offset manager for partition
     0, so offsets 51-100 are still marked acked.
   - The reassignment logic seeks the consumer back to the committed offset
     for all assigned partitions, including partition 0. The consumer position
     for partition 0 is now 50.
4. Ack offset 50.
5. nextTuple is called, and a commit of offsets 50-100 happens.
6. Offsets 50-100 are emitted again, because the consumer position was 50.
   Since 50-100 were committed, they're no longer considered emitted/acked, so
   the spout will emit them. The spout is now in a bad state, and when 50-100
   are acked again, the offset manager will complain in the log.
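The sequence above can be sketched with a simplified, hypothetical model of the spout's per-partition offset bookkeeping. The class and method names below only mirror storm-kafka-client concepts (findNextCommitOffset, commitSync semantics); this is not the real implementation, just a model of the state transitions described above:

```java
import java.util.TreeSet;

// Hypothetical, simplified model of per-partition offset bookkeeping.
// Not the real storm-kafka-client OffsetManager.
public class OffsetManagerModel {
    long committedOffset;                       // next offset the consumer restarts at
    private final TreeSet<Long> acked = new TreeSet<>();

    OffsetManagerModel(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    void ack(long offset) {
        acked.add(offset);
    }

    // End of the longest contiguous run of acked offsets starting at the
    // committed offset, or -1 if the committed offset itself is not acked.
    long findNextCommitOffset() {
        long next = committedOffset;
        while (acked.contains(next)) {
            next++;
        }
        return next == committedOffset ? -1 : next;
    }

    void commit(long nextCommitOffset) {
        committedOffset = nextCommitOffset;
        acked.headSet(nextCommitOffset).clear(); // forget offsets below the commit
    }

    public static void main(String[] args) {
        OffsetManagerModel om = new OffsetManagerModel(0);

        // Emit and ack offsets 0-100, except 50.
        for (long o = 0; o <= 100; o++) {
            if (o != 50) om.ack(o);
        }

        // Only 0-49 are contiguous, so commitSync is called with offset 50.
        long commit1 = om.findNextCommitOffset();
        System.out.println("first commit offset: " + commit1);   // 50
        om.commit(commit1);

        // Reassignment seeks the consumer back to the committed offset,
        // but offsets 51-100 stay acked in the offset manager.
        long consumerPosition = commit1;

        // Offset 50 is finally acked; the next commit jumps to 101.
        om.ack(50);
        long commit2 = om.findNextCommitOffset();
        System.out.println("second commit offset: " + commit2);  // 101
        om.commit(commit2);

        // The consumer position (50) is now behind the committed offset (101),
        // so offsets 50-100 are polled and emitted again even though they were
        // just committed -- the bad state described above.
        System.out.println("position: " + consumerPosition
                + ", committed: " + om.committedOffset);
    }
}
```

The re-emission falls out of the last two lines: polling resumes from the consumer position (50), while the offset manager believes everything up to 101 is committed, so the re-acked offsets 50-100 are "unexpected".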
> Storm-kafka-client spout can sometimes emit messages that were already
> committed.
> ----------------------------------------------------------------------------------
>
> Key: STORM-2666
> URL: https://issues.apache.org/jira/browse/STORM-2666
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.0.0, 2.0.0, 1.1.0, 1.1.1, 1.2.0
> Reporter: Guang Du
> Assignee: Stig Rohde Døssing
> Labels: pull-request-available
> Fix For: 2.0.0, 1.2.0, 1.1.2
>
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> Under a certain heavy load, for failed/timed-out tuples, the retry service
> will ack tuples that have failed the maximum number of times. The Kafka
> client spout will commit once the commit interval is reached. However, it
> seems some 'on the way' tuples fail again, so the retry service causes the
> spout to emit them again, and they are eventually acked to the OffsetManager.
> In some cases there are too many such offsets, exceeding the
> max-uncommitted-offsets limit, which leaves
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset
> unable to find the next commit point, and the spout will no longer poll this
> partition.
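The stuck state described above can be sketched with a simplified, hypothetical model of the contiguity check (not the real findNextCommitOffset code; the offset numbers are taken from the logs below only for illustration): a commit can only advance through a contiguous run of acked offsets starting at the committed offset, so a single missing offset at the front blocks all later acks from ever being committed.

```java
import java.util.TreeSet;

// Hypothetical sketch of why a gap at the committed offset blocks commits.
// Simplified model, not the real storm-kafka-client code.
public class CommitGapDemo {
    public static void main(String[] args) {
        long committed = 16003;
        TreeSet<Long> acked = new TreeSet<>();

        // Offsets 16004-16100 are acked, but 16003 itself never is
        // (e.g. its tuple is still in flight or keeps failing).
        for (long o = 16004; o <= 16100; o++) {
            acked.add(o);
        }

        // Contiguity scan starting at the committed offset.
        long next = committed;
        while (acked.contains(next)) {
            next++;
        }

        boolean canCommit = next > committed;
        System.out.println("can commit: " + canCommit);           // false
        System.out.println("uncommitted acked: " + acked.size()); // keeps growing
        // Once enough uncommitted offsets pile up to hit the
        // maxUncommittedOffsets cap, the spout stops polling this partition.
    }
}
```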
> By the way, I've applied the STORM-2549 PR #2156 from Stig Døssing to fix
> STORM-2625, and I'm using a Python shell bolt as the processing bolt, in
> case this information helps.
> The resulting logs are below. I'm not sure if this issue has already been
> raised/fixed; I'd be glad if anyone could point out an existing JIRA. Thank
> you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248
> 248] [INFO] Successful ack for tuple message
> [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager
> Thread-23-spout-executor[248 248] [WARN] topic-partition
> [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed
> Offset [16003]
> Edit:
> See
> https://issues.apache.org/jira/browse/STORM-2666?focusedCommentId=16125893&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16125893
> for the current best guess at the root cause of this issue.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)