This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4d7d56a8244ad3fbe2cda49ab7a6b8ea073a58bf
Author: Narsi Nallamilli <[email protected]>
AuthorDate: Wed Feb 26 14:52:33 2025 +0530

    CAMEL-21769: camel-aws-sqs - Error is causing the sqs message to be 
extended forever (#17262)
    
    * CAMEL-21769: stop exchange from extending on error
    
    * CAMEL-21769: handle error
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 10 +++++-
 .../camel/component/aws2/sqs/Sqs2ConsumerTest.java | 40 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index b2603198499..21c06b9f503 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -168,7 +168,15 @@ public class Sqs2Consumer extends 
ScheduledBatchPollingConsumer {
 
             // use default consumer callback
             AsyncCallback cb = defaultConsumerCallback(exchange, true);
-            getAsyncProcessor().process(exchange, cb);
+            try {
+                Boolean a = getAsyncProcessor().process(exchange, cb);
+            } catch (Error e) {
+                LOG.debug("Error processing exchange, stopping exchange from 
extending its visibility", e);
+                if (timeoutExtender != null && timeoutExtender.entries != 
null) {
+                    timeoutExtender.entries.remove(exchange.getExchangeId());
+                }
+                throw e;
+            }
         }
 
         return total;
diff --git 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java
 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java
index 71767ce77fb..4f6df74ebb7 100644
--- 
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java
+++ 
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java
@@ -25,6 +25,7 @@ import java.util.stream.IntStream;
 
 import org.apache.camel.ContextEvents;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.clock.EventClock;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.BeforeEach;
@@ -210,6 +211,30 @@ class Sqs2ConsumerTest extends CamelTestSupport {
         }
     }
 
+    @Test
+    void consumerStopsExchangeFromExtendingVisibilityOnError() throws 
Exception {
+        // given
+        sqsClientMock.addMessage(message("A"));
+        configuration.setExtendMessageVisibility(true);
+        configuration.setWaitTimeSeconds(5);
+        configuration.setVisibilityTimeout(2);
+        configuration.setConcurrentConsumers(1);
+        try (var tested = createConsumerWithProcessor(1, exchange -> {
+            Thread.sleep(2000L);
+            throw new OutOfMemoryError();
+        })) {
+            //when
+            try {
+                tested.poll();
+            } catch (Error e) {
+            }
+            //simulate some time pass after error
+            Thread.sleep(2000L);
+            // then
+            
assertThat(sqsClientMock.getChangeMessageVisibilityBatchRequests().size()).isEqualTo(2);
+        }
+    }
+
     @Test
     void shouldRequest10MessagesWithSingleReceiveRequest() throws Exception {
         // given
@@ -624,6 +649,21 @@ class Sqs2ConsumerTest extends CamelTestSupport {
         return consumer;
     }
 
+    private Sqs2Consumer createConsumerWithProcessor(int maxNumberOfMessages, 
Processor processor) throws Exception {
+        var component = new Sqs2Component(context());
+        component.setConfiguration(configuration);
+
+        var endpoint = (Sqs2Endpoint) 
component.createEndpoint("aws2-sqs://%s?maxMessagesPerPoll=%s"
+                .formatted(configuration.getQueueName(), maxNumberOfMessages));
+        endpoint.setClient(sqsClientMock);
+        endpoint.setClock(clock);
+
+        var consumer = new Sqs2Consumer(endpoint, processor);
+        consumer.setStartScheduler(false);
+        consumer.start();
+        return consumer;
+    }
+
     private List<Object> receiveMessageBodies() {
         return receivedExchanges.stream().map(it -> 
it.getIn().getBody()).toList();
     }

Reply via email to