Stig Rohde Døssing commented on STORM-2666:

[~GuangDu] I've been looking at this for a while now, and I'm kind of stumped.

Some things I've looked at:
* Storm won't ack or fail multiple times for the same tuple. Once the spout 
executor receives either an ack or a fail for a tuple, any further acks or 
fails are discarded.
* The spout disregards acks or fails for messages that are not in the emitted 
list. We remove message ids from emitted whenever we ack or fail a tuple, so 
the spout code enforces the same guarantee as the executor. If partitions get 
reassigned, the emitted list for revoked partitions is emptied out, so we 
ignore acks/fails for partitions that are no longer assigned to this spout.
* We don't remove pending retries for messages when that message id is acked. I 
can't come up with a way for a tuple to be acked while the same message id is 
in the retry service though.
* The spout won't emit a tuple for a message id that is already emitted, or is 
currently in the set of acked tuples. It is possible that the spout emits a 
tuple for a committed offset, since we don't check for that, but this would 
require the tuple to somehow be in the RetryService while it is also acked.
* The default RetryService won't duplicate retry schedules, so an offset can't 
have multiple retries scheduled at the same time. We remove the scheduled retry 
before emitting a message, so I don't see how it is possible to have a pending 
tuple while the same message is scheduled on the retry service.
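
To make the invariants in the list above concrete, here is a minimal Java 
sketch of the per-partition bookkeeping as I understand it. It is a 
simplified, hypothetical model, not storm-kafka-client code: 
`SpoutOffsetTracker` and its methods are illustrative names. Acks/fails are 
ignored unless the offset is in flight, an offset is never re-emitted while it 
is in flight or acked, and a scheduled retry is cleared right before the 
offset is emitted again.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of the spout's per-partition bookkeeping.
 * Not storm-kafka-client code; all names are illustrative.
 */
class SpoutOffsetTracker {
    private final Set<Long> emitted = new HashSet<>();          // in flight, not yet acked or failed
    private final Set<Long> acked = new HashSet<>();            // acked, not yet committed
    private final Set<Long> scheduledRetries = new HashSet<>(); // a set cannot hold duplicate schedules

    /** Emits only if the offset is neither in flight nor already acked. */
    boolean tryEmit(long offset) {
        if (emitted.contains(offset) || acked.contains(offset)) {
            return false;
        }
        scheduledRetries.remove(offset); // clear the retry schedule before (re-)emitting
        emitted.add(offset);
        return true;
    }

    /** Acks for offsets that are not in flight are discarded. */
    boolean ack(long offset) {
        if (!emitted.remove(offset)) {
            return false;
        }
        acked.add(offset); // note: a scheduled retry for this offset is NOT removed here
        return true;
    }

    /** Fails for offsets that are not in flight are discarded. */
    boolean fail(long offset) {
        if (!emitted.remove(offset)) {
            return false;
        }
        scheduledRetries.add(offset);
        return true;
    }

    /** On revocation, in-flight entries are dropped, so late acks/fails are ignored. */
    void revokePartition() {
        emitted.clear();
    }

    boolean isScheduled(long offset) {
        return scheduledRetries.contains(offset);
    }

    public static void main(String[] args) {
        SpoutOffsetTracker t = new SpoutOffsetTracker();
        t.tryEmit(42);
        t.fail(42);                             // scheduled for retry
        t.fail(42);                             // ignored: 42 is no longer in flight
        t.tryEmit(42);                          // replay clears the schedule
        t.ack(42);
        System.out.println(t.ack(42));          // false: duplicate ack ignored
        System.out.println(t.tryEmit(42));      // false: already acked
        System.out.println(t.isScheduled(42));  // false: nothing left to replay
    }
}
```

Running `main` prints `false` three times: the duplicate ack is ignored, the 
acked offset cannot be re-emitted, and no retry is left scheduled for it.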

So as far as I can tell, it should not be possible that RetryService replays 
tuples that are already acked, and if a tuple gets acked, any subsequent 
ack/fail calls should not have any effect, unless the spout emits the tuple 
again after this happens.

I agree that the spout code can't currently handle a tuple being acked and then 
replayed later, but I'm having trouble understanding how we can get into that 
situation. We could maybe get rid of some assumptions by making the spout 
remove scheduled retries once an ack is received, or dropping tuples if we're 
trying to emit something lower than the committed offset, but I can't really 
spot the faulty assumption I must be making, because in my mind those changes 
should not be necessary.
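
For reference, the two defensive changes mentioned above could look roughly 
like this in the same kind of simplified model. This is a hypothetical sketch, 
not a patch against the real spout: `DefensiveOffsetTracker`, `commit` and 
`scheduleRetryForTest` are made-up names, and it assumes Kafka's convention 
that the committed offset is the next offset to consume, so every offset below 
it is already done.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch of the two defensive changes; not storm-kafka-client code.
 * Assumes committedOffset is the next offset to consume (Kafka's convention).
 */
class DefensiveOffsetTracker {
    private final Set<Long> emitted = new HashSet<>();
    private final Set<Long> acked = new HashSet<>();
    private final Set<Long> scheduledRetries = new HashSet<>();
    private long committedOffset;

    DefensiveOffsetTracker(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    boolean tryEmit(long offset) {
        // Change 2: drop anything below the committed offset instead of replaying it.
        if (offset < committedOffset) {
            scheduledRetries.remove(offset);
            return false;
        }
        if (emitted.contains(offset) || acked.contains(offset)) {
            return false;
        }
        scheduledRetries.remove(offset);
        emitted.add(offset);
        return true;
    }

    boolean ack(long offset) {
        if (!emitted.remove(offset)) {
            return false;
        }
        acked.add(offset);
        // Change 1: an acked offset must not stay scheduled for retry.
        scheduledRetries.remove(offset);
        return true;
    }

    boolean fail(long offset) {
        if (!emitted.remove(offset)) {
            return false;
        }
        scheduledRetries.add(offset);
        return true;
    }

    /** Simulated commit: advance the committed offset and forget acked offsets below it. */
    void commit(long newCommittedOffset) {
        committedOffset = Math.max(committedOffset, newCommittedOffset);
        acked.removeIf(o -> o < committedOffset);
    }

    /** Hypothetical test hook: injects a retry directly to mimic the suspected bad state. */
    void scheduleRetryForTest(long offset) {
        scheduledRetries.add(offset);
    }

    boolean isScheduled(long offset) {
        return scheduledRetries.contains(offset);
    }

    public static void main(String[] args) {
        DefensiveOffsetTracker t = new DefensiveOffsetTracker(100);
        t.tryEmit(100);
        t.ack(100);
        t.commit(101);
        t.scheduleRetryForTest(100);             // the state the ticket suspects
        System.out.println(t.tryEmit(100));      // false: below committed offset, dropped
        System.out.println(t.isScheduled(100));  // false: stale retry discarded
    }
}
```

`main` prints `false` twice: the stale retry for offset 100 is discarded 
instead of being emitted again after the offset was committed.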

Can you share the storm-kafka-client forked code you're running on?
Are you using the default retry service?
Can you share your Kafka spout configuration?

> Kafka Client Spout send & ack committed offsets
> -----------------------------------------------
>                 Key: STORM-2666
>                 URL: https://issues.apache.org/jira/browse/STORM-2666
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
> Under certain heavy load, for failed/timed-out tuples, the retry service will 
> ack a tuple once it has failed the maximum number of times. Kafka Client Spout 
> will commit after the commit interval is reached. However, it seems some 
> 'in-flight' tuples will be failed again; the retry service then causes the 
> Spout to emit them again, and they are eventually acked to OffsetManager.
> In some cases there are too many such offsets, exceeding the max uncommitted 
> offsets limit, so 
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset 
> cannot find the next commit point, and the Spout stops polling this partition.
> By the way, I've applied STORM-2549 PR#2156 from Stig Døssing to fix 
> STORM-2625, and I'm using a Python Shell Bolt as the processing bolt, in case 
> this information helps.
> The resulting logs are below. I'm not sure whether the issue has already been 
> raised/fixed; I'd be glad if anyone could point out an existing JIRA. Thank you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248 248] [INFO] Successful ack for tuple message [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager Thread-23-spout-executor[248 248] [WARN] topic-partition [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed Offset [16003]
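
The failure mode in the quoted report can be illustrated with a simplified 
model of choosing the next commit point. This is a hypothetical sketch, not 
the real `OffsetManager`: `CommitPointModel`, `canPoll` and its counting rule 
are assumptions, and it uses Kafka's convention that the committed offset is 
the next offset to consume. With the offsets from the WARN line, a stale ack 
for 16002 is reported as unexpected, and a gap at 16003 blocks any commit, so 
tracked offsets pile up until the uncommitted limit stops polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical, simplified model of picking the next commit point for one partition.
 * Not the storm-kafka-client OffsetManager; names and the poll rule are assumptions.
 */
class CommitPointModel {
    private final TreeSet<Long> acked = new TreeSet<>();   // sorted acked offsets
    private final List<String> warnings = new ArrayList<>();
    private long committedOffset;                          // next offset to consume

    CommitPointModel(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    void ack(long offset) {
        acked.add(offset);
    }

    /** Returns one past the contiguous acked run starting at committedOffset, or -1. */
    long findNextCommitOffset() {
        long next = committedOffset;
        for (long offset : acked) {
            if (offset == next) {
                next++;                  // extends the contiguous run
            } else if (offset > next) {
                break;                   // gap: an earlier offset is still pending
            } else {
                warnings.add("unexpected offset " + offset + ", committed " + committedOffset);
            }
        }
        return next == committedOffset ? -1 : next;
    }

    /** Advances the commit point and drops every tracked offset below it. */
    void commit(long offset) {
        committedOffset = offset;
        acked.headSet(offset).clear();
    }

    int numTracked() {
        return acked.size();
    }

    /** Simplified rule: stop polling once tracked offsets reach the limit. */
    boolean canPoll(int maxUncommittedOffsets) {
        return numTracked() < maxUncommittedOffsets;
    }

    List<String> warnings() {
        return warnings;
    }

    public static void main(String[] args) {
        CommitPointModel m = new CommitPointModel(16003);
        m.ack(16002);                                  // stale ack below the commit point
        m.ack(16004);                                  // 16003 still pending: a gap
        System.out.println(m.findNextCommitOffset());  // -1
        System.out.println(m.warnings());              // [unexpected offset 16002, committed 16003]
        System.out.println(m.canPoll(2));              // false
    }
}
```

Once 16003 is acked, `findNextCommitOffset` returns 16005, and `commit(16005)` 
drops everything below it, including the stale 16002.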

This message was sent by Atlassian JIRA
