[
https://issues.apache.org/jira/browse/CAMEL-18780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641938#comment-17641938
]
Simon Rasmussen commented on CAMEL-18780:
-----------------------------------------
I have hacked together a test case which exposes the problem... and it is
actually worse than first expected, as it completely blocks the consumer once
the extender's maxQueueSize is reached. This is also what we saw in production,
though we believed it to just be processing very slowly.
If I increase the maxQueueSize as suggested, then it doesn't block. I have to
increase it to the max queue size of the route + max pool size + 1 (the +1 is
needed in case the main route thread processes a task itself due to the
CallerRuns policy). With the route in the test below (maxQueueSize of 10, max
pool size of 4), that gives 10 + 4 + 1 = 15.
The test case currently looks like this:
{code:java}
package org.apache.camel.component.aws2.sqs;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

import org.apache.camel.BindToRegistry;
import org.apache.camel.EndpointInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit5.CamelTestSupport;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.utils.Md5Utils;

public class SqsConsumerMessageExtendTest extends CamelTestSupport {

    @EndpointInject("mock:result")
    private MockEndpoint mock;

    @Test
    void messageExtendShouldSurvive() throws Exception {
        var start = LocalDateTime.now();
        mock.expectedMessageCount(20);
        MockEndpoint.assertIsSatisfied(context, 30, TimeUnit.SECONDS);
        var end = LocalDateTime.now();
        System.out.println("Running time was: " + Duration.between(start, end));
    }

    @BindToRegistry("amazonSQSClient")
    public AmazonSQSClientMock addClient() {
        AmazonSQSClientMock clientMock = new AmazonSQSClientMock();
        // add 20 messages
        for (int counter = 0; counter < 20; counter++) {
            Message.Builder message = Message.builder();
            String body = "Message " + counter;
            message.body(body);
            message.md5OfBody(Md5Utils.md5AsBase64(body.getBytes()));
            message.messageId("id" + counter);
            message.receiptHandle("handle" + counter);
            clientMock.messages.add(message.build());
        }
        return clientMock;
    }

    @Override
    protected RouteBuilder createRouteBuilder() {
        final int visibilityTimeout = 2;
        final int taskProcessingTime = 3;
        return new RouteBuilder() {
            @Override
            public void configure() {
                from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&maxMessagesPerPoll=1&visibilityTimeout="
                        + visibilityTimeout + "&extendMessageVisibility=true")
                        .threads(1, 4, "route").maxQueueSize(10)
                        .process(exchange -> {
                            TimeUnit.SECONDS.sleep(taskProcessingTime);
                            System.out.println(Thread.currentThread().getName() + ": "
                                    + exchange.getIn().getBody(String.class));
                        })
                        .to("mock:result");
            }
        };
    }
}
{code}
and here is the modified Sqs2Consumer:
{code:java}
@Override
protected void doStart() throws Exception {
    // start scheduler first
    if (getConfiguration().isExtendMessageVisibility() && scheduledExecutor == null) {
        ThreadPoolProfile profile = new ThreadPoolProfile("SqsTimeoutExtender");
        profile.setPoolSize(1);
        profile.setAllowCoreThreadTimeOut(false);
        int maxQueueSize = 10;      // TODO how to find this number?
        int routeMaxPoolSize = 4;   // TODO how to find this number?
        // the maximum number of pending tasks, +1 for the consumer thread which has CallerRuns policy
        profile.setMaxQueueSize(maxQueueSize + routeMaxPoolSize + 1);
        this.scheduledExecutor = getEndpoint().getCamelContext()
                .getExecutorServiceManager()
                .newScheduledThreadPool(this, "SqsTimeoutExtender", profile);
        // this.scheduledExecutor = getEndpoint().getCamelContext()
        //         .getExecutorServiceManager()
        //         .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
    }
{code}
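For reference, the exception in the issue below is thrown by camel-util's
SizedScheduledExecutorService once its queue limit is hit. A minimal standalone
sketch of just that mechanism (my assumption is that the class wraps a
ScheduledThreadPoolExecutor together with a queue size limit; the numbers are
arbitrary and this is not part of the proposed fix):
{code:java}
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.camel.util.concurrent.SizedScheduledExecutorService;

// Standalone sketch: shows the queue-limit rejection in isolation.
public class ExtenderQueueLimitDemo {
    public static void main(String[] args) {
        // assumed constructor: delegate executor + maximum number of queued tasks
        SizedScheduledExecutorService executor =
                new SizedScheduledExecutorService(new ScheduledThreadPoolExecutor(1), 2);
        try {
            // each in-flight message keeps one repeating extender-style task queued
            for (int i = 0; i < 5; i++) {
                executor.scheduleAtFixedRate(() -> { /* extend visibility */ }, 1, 1, TimeUnit.SECONDS);
            }
        } catch (RejectedExecutionException e) {
            // "Task rejected due queue size limit reached" once the limit (2) is exceeded
            System.out.println("Rejected: " + e.getMessage());
        } finally {
            executor.shutdownNow();
        }
    }
}
{code}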
I have left the timeouts fairly large in the test case for now, as I have been
testing with generous debugging enabled in order to understand what was
happening. The test case can be adjusted.
To revert to the current behavior, the line:
{code:java}
profile.setMaxQueueSize(maxQueueSize + routeMaxPoolSize + 1);
{code}
... can be adjusted back to just:
{code:java}
profile.setMaxQueueSize(maxQueueSize);
{code}
Any idea how to obtain these values?
{code:java}
int maxQueueSize = 10; // TODO how to find this number?
int routeMaxPoolSize = 4; // TODO how to find this number?
{code}
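For comparison, the context-wide defaults can at least be read from the
ExecutorServiceManager, but that does not reflect per-route settings such as the
.threads(1, 4, "route").maxQueueSize(10) used in the test above, so it is only
an approximation and the two TODOs remain open. A minimal sketch (the standalone
DefaultCamelContext setup is just for illustration):
{code:java}
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.spi.ThreadPoolProfile;

// Sketch only: reads the context-wide default thread pool profile, not the
// per-route thread pool configured in the route DSL.
public class DefaultProfileLookup {
    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        ThreadPoolProfile profile = context.getExecutorServiceManager().getDefaultThreadPoolProfile();
        System.out.println("default maxPoolSize  = " + profile.getMaxPoolSize());
        System.out.println("default maxQueueSize = " + profile.getMaxQueueSize());
        context.stop();
    }
}
{code}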
> Sqs2Consumer message extended causing rejected execution exception
> ------------------------------------------------------------------
>
> Key: CAMEL-18780
> URL: https://issues.apache.org/jira/browse/CAMEL-18780
> Project: Camel
> Issue Type: Bug
> Components: camel-aws2
> Affects Versions: 3.19.0
> Reporter: Simon Rasmussen
> Priority: Major
> Fix For: 3.18.5, 3.20.0
>
>
> The message extension feature of the Sqs2Consumer can cause rejected
> execution exceptions such as:
> {noformat}
> 2022-11-30 16:43:51.958 logLevel=WARN 10 --- [xxx] logger=o.a.c.component.aws2.sqs.Sqs2Consumer : Failed polling endpoint: aws2-sqs://arn:aws:sqs:eu-west-1:xxxxxxx:some_queue?delay=3000&extendMessageVisibility=true&greedy=true&visibilityTimeout=60&waitTimeSeconds=10. Will try again at next poll. Caused by: [java.util.concurrent.RejectedExecutionException - Task rejected due queue size limit reached]
> java.util.concurrent.RejectedExecutionException: Task rejected due queue size limit reached
>     at org.apache.camel.util.concurrent.SizedScheduledExecutorService.scheduleAtFixedRate(SizedScheduledExecutorService.java:92) ~[camel-util-3.18.2.jar:3.18.2]
>     at org.apache.camel.component.aws2.sqs.Sqs2Consumer.processBatch(Sqs2Consumer.java:183) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
>     at org.apache.camel.component.aws2.sqs.Sqs2Consumer.poll(Sqs2Consumer.java:121) ~[camel-aws2-sqs-3.18.2.jar:3.18.2]
>     at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:202) ~[camel-support-3.18.2.jar:3.18.2]
>     at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:116) ~[camel-support-3.18.2.jar:3.18.2]
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
>     at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
>     at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
>     at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
> {noformat}
> The consumer is configured with a default ThreadPoolProfile, and thus has a
> maxQueueSize of 1000.
> The message extender is running in its own scheduled executor which is
> instantiated within Sqs2Consumer:
> {code:java}
> this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager()
>         .newSingleThreadScheduledExecutor(this, "SqsTimeoutExtender");
> {code}
> This executor therefore also uses the default thread pool profile, and thus
> also has a maxQueueSize of 1000.
> A slowdown in processing the extension tasks can lead to this inner queue
> filling up, causing the exceptions to be thrown (quickly flooding the logs).
> Possible solutions that I can think of would be to set the maxQueueSize of
> the SqsTimeoutExtender to 2x that of the consumer thread pool, or to set the
> maxQueueSize to unbounded (-1).
> The latter might be acceptable, as tasks are cancelled upon completion and
> thus the queue cannot grow unbounded.
> I can contribute a PR, but would need some guidance as to which solution
> would be preferable.