Simon Rasmussen created CAMEL-18780:
---------------------------------------
Summary: Sqs2Consumer message extended causing rejected execution exception
Key: CAMEL-18780
URL: https://issues.apache.org/jira/browse/CAMEL-18780
Project: Camel
Issue Type: Bug
Components: camel-aws2
Affects Versions: 3.19.0
Reporter: Simon Rasmussen
The message extension feature of the Sqs2Consumer can cause rejected execution
exceptions such as:
{noformat}
2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] logger=o.a.c.component.aws2.sqs.Sqs2Consumer : Failed polling endpoint: aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10. Will try again at next poll. Caused by: [java.util.concurrent.RejectedExecutionException - Task rejected due queue size limit reached]
java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached
	at org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92) ~[camel-util-3.18.2.jar:3.18.2]
	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
	at org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.18.2.jar:3.18.2]
	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.18.2.jar:3.18.2]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
{noformat}
The consumer is configured with a default ThreadPoolProfile, and thus has a
maxQueueSize of 1000.
The message extender is running in its own scheduled executor which is
instantiated within Sqs2Consumer:
{code:java}
this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
        .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
{code}
This executor therefore also uses the default thread pool profile, and so also
has a maxQueueSize of 1000.
A slowdown in processing the extension tasks can fill this inner queue, causing
the exceptions to be thrown and quickly flooding the logs.
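To illustrate the failure mode, here is a minimal sketch in plain Java. It is not the Camel implementation: BoundedQueueRejectionSketch, scheduleBounded and countAccepted are hypothetical names, and the size check is only an assumption about how a size-limited scheduled executor might behave. Tasks are scheduled with a long delay so that they stay queued, which mimics extension tasks piling up.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueRejectionSketch {

    // Hypothetical stand-in for a size-limited scheduler: reject when the
    // work queue already holds maxQueueSize tasks; a value <= 0 means unbounded.
    static ScheduledFuture<?> scheduleBounded(ScheduledThreadPoolExecutor executor,
                                              int maxQueueSize, Runnable task) {
        if (maxQueueSize > 0 && executor.getQueue().size() >= maxQueueSize) {
            throw new RejectedExecutionException("Task rejected due queue size limit reached");
        }
        // The long initial delay keeps the task in the queue for the whole test.
        return executor.scheduleAtFixedRate(task, 1, 1, TimeUnit.HOURS);
    }

    // Tries to schedule 'attempts' tasks and returns how many were accepted.
    static int countAccepted(int maxQueueSize, int attempts) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        int accepted = 0;
        try {
            for (int i = 0; i < attempts; i++) {
                try {
                    scheduleBounded(executor, maxQueueSize, () -> { });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // Queue is full: this is the situation reported in the log above.
                }
            }
        } finally {
            executor.shutdownNow();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("accepted with limit 3: " + countAccepted(3, 10));
        System.out.println("accepted when unbounded: " + countAccepted(-1, 10));
    }
}
```

In this sketch, every task scheduled after the queue reaches its limit is rejected, while an unbounded queue accepts all of them.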
Possible solutions that I can think of would be to set the maxQueueSize of the
SqsTimeoutExtender to twice that of the consumer thread pool, or to set the
maxQueueSize to unbounded (-1).
The latter might be acceptable, since tasks are cancelled upon completion and
thus the queue cannot grow without bound.
I can contribute a PR, but would need some guidance as to which solution would
be preferable.
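The reasoning about cancellation can be sketched in plain Java as follows. This is an illustration under assumptions, not Camel code: CancelOnCompletionSketch and queueSizeAfter are hypothetical names, and the sketch enables setRemoveOnCancelPolicy(true) on a JDK ScheduledThreadPoolExecutor. Whether the executor behind SqsTimeoutExtender is configured this way is not stated in this report and would need to be checked.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CancelOnCompletionSketch {

    // Schedules one periodic "extender" task per message and returns the queue
    // size afterwards. If cancelWhenDone is true, each task is cancelled right
    // away, simulating a message whose processing has completed.
    static int queueSizeAfter(int messages, boolean cancelWhenDone) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        // Assumption: cancelled tasks are removed from the queue immediately.
        // The JDK default is false, in which case they linger until their delay elapses.
        executor.setRemoveOnCancelPolicy(true);
        try {
            for (int i = 0; i < messages; i++) {
                ScheduledFuture<?> extender =
                        executor.scheduleAtFixedRate(() -> { }, 1, 1, TimeUnit.HOURS);
                if (cancelWhenDone) {
                    extender.cancel(false);
                }
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("queue size with cancellation: " + queueSizeAfter(50, true));
        System.out.println("queue size without cancellation: " + queueSizeAfter(50, false));
    }
}
```

Under these assumptions the queue only holds tasks for messages that are still in flight, so its size tracks the number of in-flight messages rather than the total number processed.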