This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 692db5dce07 CAMEL-18780: Prevent blocking sqs consumer when using
message extend (#9108)
692db5dce07 is described below
commit 692db5dce0765e5e76639b3afc5e9098601e534f
Author: Simon Rasmussen <[email protected]>
AuthorDate: Sun Jan 15 13:54:17 2023 +0100
CAMEL-18780: Prevent blocking sqs consumer when using message extend (#9108)
When using the message visibility extension feature together with the
Threads EIP, it was possible to completely block the consumer (deadlock)
if the task queue of the message extend executor filled up.
This could be triggered by the default thread pool profile, where the
task queue size is set to 1000 tasks.
With the default settings, the consumer could have up to `max queue
size` + `max threads` + `consumer thread` = 1000 + 20 + 1 concurrent
tasks in flight. The extender queue size, however, is only 1000. The
extra `consumer thread` term is due to the `CallerRuns` policy.
There is no way to detect that the consumer is called from a route
which uses the Threads EIP, and thus we have two options for preventing
this blocking behavior: either increase the max queue size to something
very high, or make it unbounded.
I went with the latter, as I believe it to be safe: the queue will be
constrained by the thread profile of the route at all times and thus
cannot grow without bound, except if the outer profile is itself
configured without a limit.
---
.../camel/component/aws2/sqs/Sqs2Consumer.java | 32 ++++++---
.../SqsConsumerExtendMessageVisibilityTest.java | 77 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 9 deletions(-)
diff --git
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 188a8e55763..f9e5e0627a8 100644
---
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -39,6 +39,7 @@ import org.apache.camel.health.WritableHealthCheckRepository;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
@@ -209,10 +210,12 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
// add on completion to handle after work when the exchange is done
exchange.adapt(ExtendedExchange.class).addOnCompletion(new
Synchronization() {
+ @Override
public void onComplete(Exchange exchange) {
processCommit(exchange);
}
+ @Override
public void onFailure(Exchange exchange) {
processRollback(exchange);
}
@@ -316,7 +319,8 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
// Need to apply the SqsHeaderFilterStrategy this time
HeaderFilterStrategy headerFilterStrategy =
getEndpoint().getHeaderFilterStrategy();
// add all sqs message attributes as camel message headers so that
- // knowledge of the Sqs class MessageAttributeValue will not leak to
the client
+ // knowledge of the Sqs class MessageAttributeValue will not leak to
the
+ // client
for (Map.Entry<String, MessageAttributeValue> entry :
msg.messageAttributes().entrySet()) {
String header = entry.getKey();
Object value =
Sqs2MessageHelper.fromMessageAttributeValue(entry.getValue());
@@ -340,7 +344,8 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
if (newScheduler && scheduler instanceof
DefaultScheduledPollConsumerScheduler) {
DefaultScheduledPollConsumerScheduler ds =
(DefaultScheduledPollConsumerScheduler) scheduler;
ds.setConcurrentConsumers(getConfiguration().getConcurrentConsumers());
- // if using concurrent consumers then resize pool to be at least
same size
+ // if using concurrent consumers then resize pool to be at least
+ // same size
int ps = Math.max(ds.getPoolSize(),
getConfiguration().getConcurrentConsumers());
ds.setPoolSize(ps);
}
@@ -350,16 +355,26 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
protected void doStart() throws Exception {
// start scheduler first
if (getConfiguration().isExtendMessageVisibility() &&
scheduledExecutor == null) {
- this.scheduledExecutor =
getEndpoint().getCamelContext().getExecutorServiceManager()
- .newSingleThreadScheduledExecutor(this,
"SqsTimeoutExtender");
+ ThreadPoolProfile profile = new
ThreadPoolProfile("SqsTimeoutExtender");
+ profile.setPoolSize(1);
+ profile.setAllowCoreThreadTimeOut(false);
+ // the max queue is set to be unbound as there is no way to
register
+ // the required size. If using the Thread EIP, then the max queue
+ // size is equal to maxQueueSize of the consumer thread EIP+max
+ // thread count+consumer-thread.
+ // The consumer would block when this limit was reached. It is safe
+ // to set this queue to unbound as it will be limited by the
+ // consumer.
+ profile.setMaxQueueSize(-1);
+
+ this.scheduledExecutor =
getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+ "SqsTimeoutExtender", profile);
}
super.doStart();
// health-check is optional so discover and resolve
- healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
- getEndpoint().getCamelContext(),
- "components",
+ healthCheckRepository =
HealthCheckHelper.getHealthCheckRepository(getEndpoint().getCamelContext(),
"components",
WritableHealthCheckRepository.class);
if (healthCheckRepository != null) {
@@ -415,8 +430,7 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
private void logException(Exception e) {
LOG.warn("Extending visibility window failed for exchange {}"
+ ". Will not attempt to extend visibility further. This
exception will be ignored.",
- exchange,
- e);
+ exchange, e);
}
}
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
new file mode 100644
index 00000000000..e972cd3524c
--- /dev/null
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
+
+ private static final int TIMEOUT = 2; // 2 seconds.
+ private static final String RECEIPT_HANDLE =
"0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5";
+
+ @EndpointInject("mock:result")
+ private MockEndpoint mock;
+
+ @BindToRegistry("amazonSQSClient")
+ private AmazonSQSClientMock client = new AmazonSQSClientMock();
+
+ @Test
+ public void extendVisiblityForLongTask() throws Exception {
+ this.mock.expectedMessageCount(1);
+ this.mock.whenAnyExchangeReceived(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ // Simulate message that takes a while to receive.
+ Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT.
+ }
+ });
+
+ Message.Builder message = Message.builder();
+ message.body("Message 1");
+ message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
+ message.messageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
+ message.receiptHandle(RECEIPT_HANDLE);
+ this.client.messages.add(message.build());
+
+ // Wait for message to arrive.
+ MockEndpoint.assertIsSatisfied(context);
+
+ assertTrue(this.client.changeMessageVisibilityRequests.size() >= 1);
+ assertTrue(this.client.changeMessageVisibilityRequests.size() <= 3);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&visibilityTimeout=" +
TIMEOUT
+ + "&extendMessageVisibility=true").to("mock:result");
+ }
+ };
+ }
+}