[ 
https://issues.apache.org/jira/browse/CAMEL-22410?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen reassigned CAMEL-22410:
-----------------------------------

    Assignee: Claus Ibsen

> SchedulingPollConsumer is not thread safe during graceful shutdown.
> -------------------------------------------------------------------
>
>                 Key: CAMEL-22410
>                 URL: https://issues.apache.org/jira/browse/CAMEL-22410
>             Project: Camel
>          Issue Type: Bug
>    Affects Versions: 4.12.0
>            Reporter: Ruben Lapauw
>            Assignee: Claus Ibsen
>            Priority: Minor
>              Labels: sqs
>             Fix For: 4.15.0
>
>
> ScheduledPollConsumer has a race condition that causes the SQS library to 
> throw an SdkInterruptedException during graceful shutdown. As a result, 
> messages that were received but not fully processed time out and are 
> delivered to the dead-letter queue.
> The bug is rare when concurrentConsumers=1 but can be reliably triggered when 
> concurrentConsumers>=2.
> h6. Replication:
>  * Connect to an SQS queue with the following route:
>  
> {code:yaml}
> - route:
>     id: "test-route"
>     shutdownRunningTask: "CompleteAllTasks"
>     from:
>       uri: aws2-sqs://sqs-queue?concurrentConsumers=2&waitTimeSeconds=20&maxMessagesPerPoll=1
>       steps:
>         - log:
>             message: Endpoint result is ${body}
> {code}
>  * Trigger a shutdown.
>  * Send three messages staggered two seconds apart.
>  * Observe an SdkInterruptedException for the second message.
> h6. Cause:
> Line numbers below refer to the source code at commit 7b766867.
>  * For concurrentConsumers = 2 or more
> With concurrentConsumers = 2, two threads poll at the same time. When each 
> starts a poll, it sets the shared boolean variable 'polling' to true [line 
> 203 of ScheduledPollConsumer]. The first thread to finish polling, e.g. by 
> receiving sufficient messages, resets 'polling' to false [line 236] even 
> though the other thread is still polling. This allows the 
> DefaultShutdownStrategy to get past the wait-loop that checks for 
> pendingInflightExchanges and to proceed to interrupting the threads 
> [lines 674, 782].
> The SQS library checks whether the thread was interrupted after receiving 
> messages and before processing them. (I consider this reasonable behaviour.)
> The end result is a race condition in which messages are received but never 
> fully processed, so they time out.
>  * For concurrentConsumers = 1
> In ScheduledPollConsumer, between line 194 and line 201, there is a gap 
> during which a thread can sleep while the DefaultShutdownStrategy progresses 
> from deferring the shutdown, to waiting for inflight messages, to 
> interrupting the threads of the deferredConsumers.
> The same race condition as before is possible, though with reduced scope.
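
The shared-flag problem described above for concurrentConsumers >= 2 can be sketched in isolation. The following is a minimal illustration, not Camel source code: the class and method names (PollingStateSketch, SharedFlag, PollCounter) are hypothetical, and the interleaving of the two consumers is replayed sequentially so the outcome is deterministic. The counter variant is only one possible way to track several concurrent polls and is not necessarily the fix applied in 4.15.0.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Camel.
class PollingStateSketch {

    /** A single flag shared by all consumer threads, as described in the report. */
    static class SharedFlag {
        private final AtomicBoolean polling = new AtomicBoolean();
        void pollStarted()  { polling.set(true); }
        void pollFinished() { polling.set(false); }
        boolean isPolling() { return polling.get(); }
    }

    /** One possible alternative (an assumption): count the polls in progress. */
    static class PollCounter {
        private final AtomicInteger active = new AtomicInteger();
        void pollStarted()  { active.incrementAndGet(); }
        void pollFinished() { active.decrementAndGet(); }
        boolean isPolling() { return active.get() > 0; }
    }

    /** Replays: A starts, B starts, A finishes. B is still polling. */
    static boolean flagAfterFirstFinish() {
        SharedFlag flag = new SharedFlag();
        flag.pollStarted();   // consumer A
        flag.pollStarted();   // consumer B
        flag.pollFinished();  // consumer A finishes first
        return flag.isPolling();
    }

    /** Same interleaving, tracked with a counter instead of a flag. */
    static boolean counterAfterFirstFinish() {
        PollCounter counter = new PollCounter();
        counter.pollStarted();   // consumer A
        counter.pollStarted();   // consumer B
        counter.pollFinished();  // consumer A finishes first
        return counter.isPolling();
    }

    public static void main(String[] args) {
        // The flag reports "not polling" although consumer B is still busy.
        System.out.println("flag says polling:    " + flagAfterFirstFinish());
        // The counter still reports a poll in progress.
        System.out.println("counter says polling: " + counterAfterFirstFinish());
    }
}
```

With the flag, a shutdown check that reads isPolling() after consumer A finishes sees false and may move on to interrupting threads while consumer B still holds received messages. With the counter, the same check keeps waiting until every poll has finished.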



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
