Simon Rasmussen created CAMEL-18780:
---------------------------------------

             Summary: Sqs2Consumer message extended causing rejected execution 
exception
                 Key: CAMEL-18780
                 URL: https://issues.apache.org/jira/browse/CAMEL-18780
             Project: Camel
          Issue Type: Bug
          Components: camel-aws2
    Affects Versions: 3.19.0
            Reporter: Simon Rasmussen


The message extension feature of the Sqs2Consumer can cause rejected execution 
exceptions such as:

{noformat}
2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] 
logger=o.a.c.component.aws2.sqs.Sqs2Consumer    : Failed polling endpoint: 
aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10.
 Will try again at next poll. Caused by: 
[java.util.concurrent.RejectedExecutionException - Task rejected due queue size 
limit reached]

java.util.concurrent.RejectedExecutionException: Task rejected due queue size 
limit reached
        at 
org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92)
 ~[camel-util-3.18.2.jar:3.18.2]
        at 
org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183)
 ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
        at 
org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) 
~[camel-aws2-sqs-3.18.2.jar:3.18.2]
        at 
org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202)
 ~[camel-support-3.18.2.jar:3.18.2]
        at 
org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116)
 ~[camel-support-3.18.2.jar:3.18.2]
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
 ~[na:na]
        at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 
~[na:na]
        at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
 ~[na:na]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
 ~[na:na]
        at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
 ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
{noformat}

The consumer is configured with a default ThreadPoolProfile, and thus has a 
maxQueueSize of 1000.

The message extender is running in its own scheduled executor which is 
instantiated within Sqs2Consumer:

{code:java}
this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
        .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
{code}

The extender's executor therefore also uses the default thread pool profile, 
and so it too has a maxQueueSize of 1000.

A slowdown of processing the extending tasks can lead to this inner queue being 
filled, causing the exceptions to be thrown (quickly flooding the logs).
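
To illustrate the failure mode, here is a minimal JDK-only sketch. BoundedScheduler is a hypothetical, simplified stand-in for a size-limited scheduled executor, not Camel's actual SizedScheduledExecutorService; it assumes a task is rejected once the number of queued tasks reaches the limit. The tasks are scheduled with a long delay so that they stay queued, much like pending visibility-extension tasks would.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedSchedulerDemo {

    /** Hypothetical stand-in: a single-thread scheduler with a queue size limit. */
    static final class BoundedScheduler {
        private final ScheduledThreadPoolExecutor delegate = new ScheduledThreadPoolExecutor(1);
        private final int maxQueueSize;

        BoundedScheduler(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
        }

        ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) {
            // Assumption: reject as soon as the number of queued tasks reaches the limit.
            if (delegate.getQueue().size() >= maxQueueSize) {
                throw new RejectedExecutionException("Task rejected due queue size limit reached");
            }
            return delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
        }

        void shutdownNow() {
            delegate.shutdownNow();
        }
    }

    /** Schedules the given number of long-delayed tasks and counts the rejections. */
    public static int countRejected(int maxQueueSize, int submissions) {
        BoundedScheduler scheduler = new BoundedScheduler(maxQueueSize);
        int rejected = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    // A one-hour delay keeps every accepted task sitting in the queue.
                    scheduler.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            scheduler.shutdownNow();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Limit of 1000, as with the default profile described above.
        System.out.println("rejected: " + countRejected(1000, 1005)); // prints "rejected: 5"
    }
}
```

In this sketch, every submission past the 1000th is rejected until queued tasks are removed, which matches the pattern of repeated exceptions in the log above.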

Possible solutions that I can think of would be to set the maxQueueSize of the 
SqsTimeoutExtender to 2x that of the consumer thread pool, or to set the 
maxQueueSize to unbounded (-1). 

The latter might be acceptable because tasks are cancelled upon completion, and 
thus the queue cannot grow unbounded.
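
One caveat worth checking for the unbounded option, sketched here with the plain JDK ScheduledThreadPoolExecutor rather than Camel's wrapper: by default the JDK executor keeps a cancelled task in its queue until the task's delay elapses, unless setRemoveOnCancelPolicy(true) is set. Whether the executor created by Camel enables that policy is not stated in this report, so the sketch below only demonstrates the JDK behaviour. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancelOnCompleteDemo {

    /** Schedules long-delayed tasks, cancels them all, and returns the remaining queue size. */
    public static int queuedAfterCancel(int tasks, boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            List<ScheduledFuture<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                // A one-hour delay keeps the tasks queued, like pending extender tasks.
                futures.add(executor.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS));
            }
            // Simulate "message processing completed": cancel every extender task.
            for (ScheduledFuture<?> future : futures) {
                future.cancel(false);
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without removeOnCancel: " + queuedAfterCancel(100, false)); // prints 100
        System.out.println("with removeOnCancel: " + queuedAfterCancel(100, true));     // prints 0
    }
}
```

If cancelled tasks are not removed eagerly, the queue only shrinks when each task's next scheduled run comes due, so the queue size can lag behind the number of messages actually in flight.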

I can contribute a PR, but would need some guidance as to which solution would 
be preferable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
