This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4d7d56a8244ad3fbe2cda49ab7a6b8ea073a58bf Author: Narsi Nallamilli <[email protected]> AuthorDate: Wed Feb 26 14:52:33 2025 +0530 CAMEL-21769: camel-aws-sqs - Error is causing the sqs message to be extended forever (#17262) * CAMEL-21769: stop exchange from extending on error * CAMEL-21769: handle error --- .../camel/component/aws2/sqs/Sqs2Consumer.java | 10 +++++- .../camel/component/aws2/sqs/Sqs2ConsumerTest.java | 40 ++++++++++++++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index b2603198499..21c06b9f503 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -168,7 +168,15 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { // use default consumer callback AsyncCallback cb = defaultConsumerCallback(exchange, true); - getAsyncProcessor().process(exchange, cb); + try { + Boolean a = getAsyncProcessor().process(exchange, cb); + } catch (Error e) { + LOG.debug("Error processing exchange, stopping exchange from extending its visibility", e); + if (timeoutExtender != null && timeoutExtender.entries != null) { + timeoutExtender.entries.remove(exchange.getExchangeId()); + } + throw e; + } } return total; diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java index 71767ce77fb..4f6df74ebb7 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/Sqs2ConsumerTest.java @@ -25,6 +25,7 @@ import java.util.stream.IntStream; import org.apache.camel.ContextEvents; import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.clock.EventClock; import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.BeforeEach; @@ -210,6 +211,30 @@ class Sqs2ConsumerTest extends CamelTestSupport { } } + @Test + void consumerStopsExchangeFromExtendingVisibilityOnError() throws Exception { + // given + sqsClientMock.addMessage(message("A")); + configuration.setExtendMessageVisibility(true); + configuration.setWaitTimeSeconds(5); + configuration.setVisibilityTimeout(2); + configuration.setConcurrentConsumers(1); + try (var tested = createConsumerWithProcessor(1, exchange -> { + Thread.sleep(2000L); + throw new OutOfMemoryError(); + })) { + //when + try { + tested.poll(); + } catch (Error e) { + } + //simulate some time pass after error + Thread.sleep(2000L); + // then + assertThat(sqsClientMock.getChangeMessageVisibilityBatchRequests().size()).isEqualTo(2); + } + } + @Test void shouldRequest10MessagesWithSingleReceiveRequest() throws Exception { // given @@ -624,6 +649,21 @@ class Sqs2ConsumerTest extends CamelTestSupport { return consumer; } + private Sqs2Consumer createConsumerWithProcessor(int maxNumberOfMessages, Processor processor) throws Exception { + var component = new Sqs2Component(context()); + component.setConfiguration(configuration); + + var endpoint = (Sqs2Endpoint) component.createEndpoint("aws2-sqs://%s?maxMessagesPerPoll=%s" + .formatted(configuration.getQueueName(), maxNumberOfMessages)); + endpoint.setClient(sqsClientMock); + endpoint.setClock(clock); + + var consumer = new Sqs2Consumer(endpoint, processor); + consumer.setStartScheduler(false); + consumer.start(); + return consumer; + } + private List<Object> receiveMessageBodies() { return receivedExchanges.stream().map(it -> it.getIn().getBody()).toList(); }
