This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.20.x by this push:
     new 6120a20ed41 CAMEL-18780: Prevent blocking sqs consumer when using 
message extend (#9108)
6120a20ed41 is described below

commit 6120a20ed41585f2033a0c5c4c864cb1a5b01a22
Author: Simon Rasmussen <[email protected]>
AuthorDate: Sun Jan 15 13:54:17 2023 +0100

    CAMEL-18780: Prevent blocking sqs consumer when using message extend (#9108)
    
    When using message extending feature together with Threads EIP, it was
    possible to completely block the consumer (deadlock) if the message
    extend executor task queue would be filled up.
    
    This could be triggered by the default threads profile where the message
    queue size is set to 1000 tasks.
    
    On default settings, the consumer would consume: `max queue size`+`max
    threads`+`consumer thread` = 1000+20+1 concurrent tasks. The extender
    queue size however is only 1000. The `consumer thread` is due to
    `CallerRuns` policy.
    
    There is no way to register that the consumer is called from a route
    which uses threads EIP, and thus we have two options for preventing this
    blocking behavior. Either increase the maxQueue size to something very
    high - or make it unbound.
    
    I went with the latter as I believe it to be safe as the queue will be
    constrained by the thread profile of the route at all times and thus
    cannot grow unbounded, except if the outer profile is configured without
    a limit.
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 32 ++++++---
 .../SqsConsumerExtendMessageVisibilityTest.java    | 77 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 9 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 188a8e55763..f9e5e0627a8 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -39,6 +39,7 @@ import org.apache.camel.health.WritableHealthCheckRepository;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.spi.ScheduledPollConsumerScheduler;
 import org.apache.camel.spi.Synchronization;
+import org.apache.camel.spi.ThreadPoolProfile;
 import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
@@ -209,10 +210,12 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
 
             // add on completion to handle after work when the exchange is done
             exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
+                @Override
                 public void onComplete(Exchange exchange) {
                     processCommit(exchange);
                 }
 
+                @Override
                 public void onFailure(Exchange exchange) {
                     processRollback(exchange);
                 }
@@ -316,7 +319,8 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         // Need to apply the SqsHeaderFilterStrategy this time
         HeaderFilterStrategy headerFilterStrategy = 
getEndpoint().getHeaderFilterStrategy();
         // add all sqs message attributes as camel message headers so that
-        // knowledge of the Sqs class MessageAttributeValue will not leak to 
the client
+        // knowledge of the Sqs class MessageAttributeValue will not leak to 
the
+        // client
         for (Map.Entry<String, MessageAttributeValue> entry : 
msg.messageAttributes().entrySet()) {
             String header = entry.getKey();
             Object value = 
Sqs2MessageHelper.fromMessageAttributeValue(entry.getValue());
@@ -340,7 +344,8 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         if (newScheduler && scheduler instanceof 
DefaultScheduledPollConsumerScheduler) {
             DefaultScheduledPollConsumerScheduler ds = 
(DefaultScheduledPollConsumerScheduler) scheduler;
             
ds.setConcurrentConsumers(getConfiguration().getConcurrentConsumers());
-            // if using concurrent consumers then resize pool to be at least 
same size
+            // if using concurrent consumers then resize pool to be at least
+            // same size
             int ps = Math.max(ds.getPoolSize(), 
getConfiguration().getConcurrentConsumers());
             ds.setPoolSize(ps);
         }
@@ -350,16 +355,26 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
     protected void doStart() throws Exception {
         // start scheduler first
         if (getConfiguration().isExtendMessageVisibility() && 
scheduledExecutor == null) {
-            this.scheduledExecutor = 
getEndpoint().getCamelContext().getExecutorServiceManager()
-                    .newSingleThreadScheduledExecutor(this, 
"SqsTimeoutExtender");
+            ThreadPoolProfile profile = new 
ThreadPoolProfile("SqsTimeoutExtender");
+            profile.setPoolSize(1);
+            profile.setAllowCoreThreadTimeOut(false);
+            // the max queue is set to be unbound as there is no way to 
register
+            // the required size. If using the Thread EIP, then the max queue
+            // size is equal to maxQueueSize of the consumer thread EIP+max
+            // thread count+consumer-thread.
+            // The consumer would block when this limit was reached. It is safe
+            // to set this queue to unbound as it will be limited by the
+            // consumer.
+            profile.setMaxQueueSize(-1);
+
+            this.scheduledExecutor = 
getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this,
+                    "SqsTimeoutExtender", profile);
         }
 
         super.doStart();
 
         // health-check is optional so discover and resolve
-        healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
-                getEndpoint().getCamelContext(),
-                "components",
+        healthCheckRepository = 
HealthCheckHelper.getHealthCheckRepository(getEndpoint().getCamelContext(), 
"components",
                 WritableHealthCheckRepository.class);
 
         if (healthCheckRepository != null) {
@@ -415,8 +430,7 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
         private void logException(Exception e) {
             LOG.warn("Extending visibility window failed for exchange {}"
                      + ". Will not attempt to extend visibility further. This 
exception will be ignored.",
-                    exchange,
-                    e);
+                    exchange, e);
         }
     }
 
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
new file mode 100644
index 00000000000..e972cd3524c
--- /dev/null
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.sqs;
+
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SqsConsumerExtendMessageVisibilityTest extends CamelTestSupport {
+
+    private static final int TIMEOUT = 2; // 2 seconds.
+    private static final String RECEIPT_HANDLE = 
"0NNAq8PwvXsyZkR6yu4nQ07FGxNmOBWi5";
+
+    @EndpointInject("mock:result")
+    private MockEndpoint mock;
+
+    @BindToRegistry("amazonSQSClient")
+    private AmazonSQSClientMock client = new AmazonSQSClientMock();
+
+    @Test
+    public void extendVisiblityForLongTask() throws Exception {
+        this.mock.expectedMessageCount(1);
+        this.mock.whenAnyExchangeReceived(new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                // Simulate message that takes a while to receive.
+                Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT.
+            }
+        });
+
+        Message.Builder message = Message.builder();
+        message.body("Message 1");
+        message.md5OfBody("6a1559560f67c5e7a7d5d838bf0272ee");
+        message.messageId("f6fb6f99-5eb2-4be4-9b15-144774141458");
+        message.receiptHandle(RECEIPT_HANDLE);
+        this.client.messages.add(message.build());
+
+        // Wait for message to arrive.
+        MockEndpoint.assertIsSatisfied(context);
+
+        assertTrue(this.client.changeMessageVisibilityRequests.size() >= 1);
+        assertTrue(this.client.changeMessageVisibilityRequests.size() <= 3);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                
from("aws2-sqs://MyQueue?amazonSQSClient=#amazonSQSClient&visibilityTimeout=" + 
TIMEOUT
+                     + "&extendMessageVisibility=true").to("mock:result");
+            }
+        };
+    }
+}

Reply via email to