This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 640ff65d6b3667f7d146604734c460e1aa6e9917
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Feb 26 10:27:47 2025 +0100

    CAMEL-21769: camel-aws-sqs - Error is causing the sqs message to be 
extended forever
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java       | 20 ++++++++++----------
 .../sqs/SqsBatchConsumerConcurrentConsumersIT.java   |  2 +-
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 21c06b9f503..1e496915a18 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -169,11 +169,12 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
             // use default consumer callback
             AsyncCallback cb = defaultConsumerCallback(exchange, true);
             try {
-                Boolean a = getAsyncProcessor().process(exchange, cb);
+                getAsyncProcessor().process(exchange, cb);
             } catch (Error e) {
-                LOG.debug("Error processing exchange, stopping exchange from 
extending its visibility", e);
-                if (timeoutExtender != null && timeoutExtender.entries != 
null) {
-                    timeoutExtender.entries.remove(exchange.getExchangeId());
+                if (this.timeoutExtender != null) {
+                    // fatal error so stop the timeout extender
+                    timeoutExtender.cancel();
+                    timeoutExtender.entries.clear();
                 }
                 throw e;
             }
@@ -189,7 +190,6 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
      */
     protected void processCommit(Exchange exchange) {
         try {
-
             if (shouldDelete(exchange)) {
                 String receiptHandle = 
exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class);
                 DeleteMessageRequest.Builder deleteRequest
@@ -491,21 +491,21 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
 
     /**
      * Task responsible for polling the messages from Amazon SQS server.
-     * <p />
+     * <p/>
      * Depending on the configuration, the polling may involve sending one or 
more receive requests in a single task
      * call. The number of send requests depends on the {@link 
Sqs2Endpoint#getMaxMessagesPerPoll()} configuration. The
      * Amazon SQS receive API has upper limit of maximum 10 messages that can 
be fetched with a single request. To
      * enable handling greater number of messages fetched per poll, multiple 
requests are being send asynchronously and
      * then joined together.
-     * <p />
+     * <p/>
      * To preserver the ordering, an optional {@link 
Sqs2Configuration#getSortAttributeName()} can be configured. When
      * specified, all messages collected from the concurrent requests are 
being sorted using this attribute.
-     * <p />
+     * <p/>
      * In addition to that, the task is also responsible for handling 
auto-creation of the SQS queue, when its missing.
      * The queue is created when receive request returns an error about the 
missing queue and the
      * {@link Sqs2Configuration#isAutoCreateQueue()} is enabled. In such case, 
the queue will be created and the task
      * will return empty list of messages.
-     * <p />
+     * <p/>
      * If the queue creation fails with an error related to recently deleted 
queue, the queue creation will be postponed
      * for at least 30 seconds. To prevent task from blocking the consumer 
thread, the 30 second timeout is being
      * checked in each task call. If the scheduled time for queue 
auto-creation was not reached yet, the task will
@@ -531,7 +531,7 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
          * <pre>
          * 0
          * </pre>
-         *
+         * <p>
          * means there is no schedule.
          */
         private final AtomicLong queueAutoCreationScheduleTime = new 
AtomicLong(0L);
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
index 2b2d1ab929f..11f2d87f714 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java
@@ -34,7 +34,7 @@ public class SqsBatchConsumerConcurrentConsumersIT extends 
CamelTestSupport {
     @Test
     public void receiveBatch() throws Exception {
         mock.expectedMessageCount(6);
-        MockEndpoint.assertIsSatisfied(context, 3, TimeUnit.SECONDS);
+        MockEndpoint.assertIsSatisfied(context, 10, TimeUnit.SECONDS);
     }
 
     @BindToRegistry("amazonSQSClient")

Reply via email to