[ 
https://issues.apache.org/jira/browse/STORM-2666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124849#comment-16124849
 ] 

Stig Rohde Døssing commented on STORM-2666:
-------------------------------------------

[~GuangDu] Am I right that the branch you're using is 1.1.x-branch in your 
fork? Just to be sure I'm looking at the right thing.

Are you using Kafka's topic compaction, and do you ever see this log message?
https://github.com/apache/storm/blob/v1.1.1/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L94

Just to explain why I think Storm doesn't ack/fail the same tuple more than 
once:
When the spout emits a tuple, it is assigned a random id used for tracking 
whether the tuple tree has completed. The id is generated here 
https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L565,
 and is stored in a map id -> tuple info called "pending", where tuple info 
includes the msg id used by the spout internally. The store into the map is 
here 
https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L582.
 
When the spout receives an ack or fail from the acker bolt, it removes the id 
-> tuple info mapping from the pending map. It checks that there was such a 
mapping, and if there was it will call either ack or fail on the spout 
instance. If the mapping isn't there, the message is ignored. This happens here 
https://github.com/apache/storm/blob/v1.1.1/storm-core/src/clj/org/apache/storm/daemon/executor.clj#L528
 and in the line below that.
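
To make the bookkeeping above concrete, here is a minimal Java sketch. It is 
not the Clojure code in executor.clj; the class and method names 
(PendingSketch, emit, onAck, onFail, spoutCalls) are made up for illustration, 
and the real tuple info holds more than just the msg id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical stand-in for the executor's "pending" bookkeeping.
// This is an illustration only, not Storm's actual implementation.
class PendingSketch {
    // Records the callbacks that reached the spout, e.g. "ack:msg-0".
    final List<String> spoutCalls = new ArrayList<>();
    // Random tuple id -> msg id used by the spout internally.
    private final Map<Long, Object> pending = new HashMap<>();
    private final Random random = new Random();

    // On emit: assign a random id and remember which msg id it belongs to.
    long emit(Object msgId) {
        long id = random.nextLong();
        while (pending.containsKey(id)) {
            id = random.nextLong(); // avoid reusing an id that is still pending
        }
        pending.put(id, msgId);
        return id;
    }

    void onAck(long id) {
        complete(id, "ack");
    }

    void onFail(long id) {
        complete(id, "fail");
    }

    // Remove the mapping first; only call the spout if a mapping existed.
    private void complete(long id, String kind) {
        Object msgId = pending.remove(id);
        if (msgId == null) {
            return; // no mapping: a later ack/fail for the same id is ignored
        }
        spoutCalls.add(kind + ":" + msgId);
    }

    public static void main(String[] args) {
        PendingSketch executor = new PendingSketch();
        long id = executor.emit("msg-0");
        executor.onAck(id);  // reaches the spout
        executor.onFail(id); // ignored, mapping already removed
        System.out.println(executor.spoutCalls); // prints [ack:msg-0]
    }
}
```

Because the mapping is removed before the spout is called, only the first ack 
or fail for a given id can get through in this sketch.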

Given this mechanism, I think it isn't possible for Storm to call ack or fail 
multiple times for the same tuple, because any ack or fail after the first one 
is ignored. The reason we do a similar check in the Kafka spout code has to do 
with partition reassignment. If the spout emits msgId 0 on partition 0 and 
partition 0 gets assigned to some other spout instance, the spout will throw 
away its internal state relating to partition 0, including which message ids it 
thinks it emitted on that partition. If partition 0 then gets assigned back to 
this spout instance, it might receive the ack or fail for the tuple that was 
emitted before partitions were reassigned. Since the pending map in the generic 
Storm code still contains tracking information for that tuple, the spout must 
be able to handle this ack or fail. We do this by keeping the list of msg ids 
the spout has emitted, and if we receive an ack or fail for a tuple the spout 
doesn't think it emitted, we just ignore it. 
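
A minimal Java sketch of that spout-side check, under stated assumptions: the 
names here (EmittedSetSketch, emit, onPartitionRevoked, ack, fail) are 
hypothetical and do not match the KafkaSpout API, and a message id is reduced 
to a (partition, offset) pair.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the "emitted" tracking inside the spout.
// Illustration only; the real KafkaSpout keeps richer message ids.
class EmittedSetSketch {
    // Partition -> offsets this spout instance believes it has emitted.
    private final Map<Integer, Set<Long>> emitted = new HashMap<>();

    void emit(int partition, long offset) {
        emitted.computeIfAbsent(partition, p -> new HashSet<>()).add(offset);
    }

    // On reassignment the spout throws away its state for the partition.
    void onPartitionRevoked(int partition) {
        emitted.remove(partition);
    }

    // Returns true if the ack was handled, false if it was ignored.
    boolean ack(int partition, long offset) {
        return forget(partition, offset);
    }

    // Returns true if the fail was handled, false if it was ignored.
    boolean fail(int partition, long offset) {
        return forget(partition, offset);
    }

    // An ack/fail only counts if the spout still thinks it emitted the tuple.
    private boolean forget(int partition, long offset) {
        Set<Long> offsets = emitted.get(partition);
        return offsets != null && offsets.remove(offset);
    }

    public static void main(String[] args) {
        EmittedSetSketch spout = new EmittedSetSketch();
        spout.emit(0, 0L);           // emit msgId 0 on partition 0
        spout.onPartitionRevoked(0); // partition 0 moves to another instance
        // Partition 0 comes back, then the old ack arrives: it is ignored.
        System.out.println(spout.ack(0, 0L)); // prints false
    }
}
```

The same lookup covers the retry case in this sketch: once a fail has removed 
an offset from the emitted set, later acks or fails for it return false until 
the tuple is emitted again.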

This should be enough to ensure that we can't receive double acks or fails. The 
generic mechanism in Storm ensures that we can't get duplicates for the same 
tuple, while the mechanism in the Kafka spout ensures that if we receive an ack 
or fail for a message id (which can be duplicated when partition reassignment 
occurs), we ignore everything except the first one.

Taking a look at the scenario you describe, I agree that some tuples may time 
out in the shell bolt queue. They will then fail on the spout, and be added to 
the retry service. Once this happens the tuple ids generated by Storm for those 
tuples have been removed from the pending map. So if an ack/fail for the same 
tuple tree is received, it should be dropped before even reaching the spout. If 
partition reassignment happens, you could get some message id duplication, but 
when the message id was failed and added to the retry service, it was removed 
from the emitted set inside the spout. This means any more acks/fails are 
ignored.

> Kafka Client Spout send & ack committed offsets
> -----------------------------------------------
>
>                 Key: STORM-2666
>                 URL: https://issues.apache.org/jira/browse/STORM-2666
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
>
> Under a certain heavy load, for failed/timeout tuples, the retry service will 
> ack tuple for failed max times. Kafka Client Spout will commit after reached 
> the commit interval. However seems some 'on the way' tuples will be failed 
> again, the retry service will cause Spout to emit again, and acked eventually 
> to OffsetManager.
> In some cases such offsets are too many, exceeding the max-uncommit, causing 
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset 
> unable to find next commit point, and Spout for this partition will not poll 
> any more.
> By the way I've applied STORM-2549 PR#2156 from Stig Døssing to fix 
> STORM-2625, and I'm using Python Shell Bolt as processing bolt, if this 
> information helps.
> resulting logs like below. I'm not sure if the issue has already been 
> raised/fixed, glad if anyone could help to point out existing JIRA. Thank you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248 
> 248] [INFO] Successful ack for tuple message 
> [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager 
> Thread-23-spout-executor[248 248] [WARN] topic-partition 
> [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed 
> Offset [16003]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
