Guang Du commented on STORM-2666:

Thank you very much [~Srdo] for your time investigating this. My fork is 
https://github.com/WolfeeTJ/storm.git in case it helps.
I'm using the exponential backoff retry service; the Kafka config is below, 
FYI, in case that helps.

        // Note: the backoff values and broker/topic names below are placeholders,
        // not my actual production values.
        KafkaSpoutRetryExponentialBackoff kafkaSpoutRetryExponentialBackoff =
            new KafkaSpoutRetryExponentialBackoff(
                TimeInterval.milliSeconds(500), TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE, TimeInterval.seconds(10));
        KafkaSpoutConfig<String, String> kafkaSpoutConfig =
            KafkaSpoutConfig.builder("broker:9092", "my_topic")
                .setProp("session.timeout.ms", "120000")
                .setProp("request.timeout.ms", "180000")
                .setRetry(kafkaSpoutRetryExponentialBackoff)
                .build();
        KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);


Please note that I've applied a simple fix in my fork branch that removes 
failed offsets that are already below the committed offset before they can be 
retried, to keep my production running.
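To make the idea of the fix concrete, here is a hypothetical sketch (not the actual patch in my fork, and `pruneRetries`/`DropCommittedRetriesDemo` are made-up names, not Storm API): before scheduling retries, drop any failed offset that is already below the committed offset, so it is never re-emitted and re-acked.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the workaround: filter failed offsets that are
// already committed out of the retry set before they can be re-emitted.
public class DropCommittedRetriesDemo {
    static Set<Long> pruneRetries(Set<Long> toRetry, long committedOffset) {
        Set<Long> pruned = new TreeSet<>();
        for (long offset : toRetry) {
            if (offset >= committedOffset) {
                pruned.add(offset); // still uncommitted: safe to retry
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        // Offset 16002 is below the committed offset 16003, so it is dropped.
        Set<Long> toRetry = new TreeSet<>(Set.of(16002L, 16005L, 16007L));
        System.out.println(pruneRetries(toRetry, 16003L)); // [16005, 16007]
    }
}
```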

I'm not familiar with Storm's core component code, so I'm not sure whether my 
assumption is correct:
In my scenario, my Python shell bolt takes quite a long time (around 8 
seconds) per record, so a lot of tuples can be waiting in the shell bolt's 
incoming queue. Under heavy load, some tuples time out before they can be 
processed. As I understand it, the spout's fail() is then called, the retry 
service gets involved, and an ack & commit may happen at this point.
However, the copies of those tuples still in the waiting queue can be 
processed again, fail again, and re-enter the retry service, resulting in a 
resend after the commit, which makes the tuple appear again in the spout's 
emitted list.
Please help review whether this assumption could be valid.
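To illustrate why a resend after commit would match the WARN log below, here is a hypothetical, heavily simplified model (my own sketch, not Storm's actual OffsetManager code; `findNextCommitOffset` here only mimics the contiguous-run idea): the commit point can only advance over a contiguous run of acked offsets starting at the committed offset, so an ack for an offset below the committed offset can never be committed and just accumulates.

```java
import java.util.TreeSet;

// Simplified model (not Storm code) of how a stale ack below the
// committed offset can never advance the commit point.
public class CommitGapDemo {
    // Advance over the contiguous run of acked offsets starting at
    // the committed offset; returning `committed` means nothing to commit.
    static long findNextCommitOffset(long committed, TreeSet<Long> acked) {
        long next = committed;
        while (acked.contains(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        TreeSet<Long> acked = new TreeSet<>();
        long committed = 16003;

        // The retry service re-emits offset 16002, which is then acked,
        // even though the partition is already committed up to 16003.
        acked.add(16002L);

        long next = findNextCommitOffset(committed, acked);
        System.out.println("committed=" + committed + " next=" + next
            + " staleAcked=" + acked.headSet(committed));
        // next == committed: the stale ack can never be committed, and if
        // enough such offsets accumulate they count against max uncommitted.
    }
}
```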

Thank you again for your time; I really appreciate it.

> Kafka Client Spout send & ack committed offsets
> -----------------------------------------------
>                 Key: STORM-2666
>                 URL: https://issues.apache.org/jira/browse/STORM-2666
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.1.1
>            Reporter: Guang Du
> Under certain heavy loads, for failed/timed-out tuples, the retry service 
> will ack a tuple once it has failed the maximum number of times. The Kafka 
> client spout will then commit after the commit interval is reached. However, 
> it seems some 'in flight' tuples fail again, the retry service causes the 
> spout to emit them again, and they are eventually acked to the OffsetManager.
> In some cases there are too many such offsets, exceeding max-uncommitted, so 
> org.apache.storm.kafka.spout.internal.OffsetManager#findNextCommitOffset 
> cannot find the next commit point, and the spout stops polling that 
> partition.
> By the way, I've applied STORM-2549 PR#2156 from Stig Døssing to fix 
> STORM-2625, and I'm using a Python shell bolt as the processing bolt, in 
> case this information helps.
> This results in logs like those below. I'm not sure whether this issue has 
> already been raised/fixed; I'd be glad if anyone could point out an existing 
> JIRA. Thank you.
> 2017-07-27 22:23:48.398 o.a.s.k.s.KafkaSpout Thread-23-spout-executor[248 
> 248] [INFO] Successful ack for tuple message 
> [{topic-partition=kafka_bd_trigger_action-20, offset=18204, numFails=0}].
> 2017-07-27 22:23:49.203 o.a.s.k.s.i.OffsetManager 
> Thread-23-spout-executor[248 248] [WARN] topic-partition 
> [kafka_bd_trigger_action-18] has unexpected offset [16002]. Current committed 
> Offset [16003]

This message was sent by Atlassian JIRA
