This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 640ff65d6b3667f7d146604734c460e1aa6e9917 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Feb 26 10:27:47 2025 +0100 CAMEL-21769: camel-aws-sqs - Error is causing the sqs message to be extended forever --- .../camel/component/aws2/sqs/Sqs2Consumer.java | 20 ++++++++++---------- .../sqs/SqsBatchConsumerConcurrentConsumersIT.java | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java index 21c06b9f503..1e496915a18 100644 --- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java +++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java @@ -169,11 +169,12 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { // use default consumer callback AsyncCallback cb = defaultConsumerCallback(exchange, true); try { - Boolean a = getAsyncProcessor().process(exchange, cb); + getAsyncProcessor().process(exchange, cb); } catch (Error e) { - LOG.debug("Error processing exchange, stopping exchange from extending its visibility", e); - if (timeoutExtender != null && timeoutExtender.entries != null) { - timeoutExtender.entries.remove(exchange.getExchangeId()); + if (this.timeoutExtender != null) { + // fatal error so stop the timeout extender + timeoutExtender.cancel(); + timeoutExtender.entries.clear(); } throw e; } @@ -189,7 +190,6 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { */ protected void processCommit(Exchange exchange) { try { - if (shouldDelete(exchange)) { String receiptHandle = exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class); DeleteMessageRequest.Builder deleteRequest @@ -491,21 +491,21 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { /** * Task responsible 
for polling the messages from Amazon SQS server. - * <p /> + * <p/> * Depending on the configuration, the polling may involve sending one or more receive requests in a single task * call. The number of send requests depends on the {@link Sqs2Endpoint#getMaxMessagesPerPoll()} configuration. The * Amazon SQS receive API has upper limit of maximum 10 messages that can be fetched with a single request. To * enable handling greater number of messages fetched per poll, multiple requests are being sent asynchronously and * then joined together. - * <p /> + * <p/> * To preserve the ordering, an optional {@link Sqs2Configuration#getSortAttributeName()} can be configured. When * specified, all messages collected from the concurrent requests are being sorted using this attribute. - * <p /> + * <p/> * In addition to that, the task is also responsible for handling auto-creation of the SQS queue, when it is missing. * The queue is created when receive request returns an error about the missing queue and the * {@link Sqs2Configuration#isAutoCreateQueue()} is enabled. In such case, the queue will be created and the task * will return empty list of messages. - * <p /> + * <p/> * If the queue creation fails with an error related to recently deleted queue, the queue creation will be postponed * for at least 30 seconds. To prevent task from blocking the consumer thread, the 30 second timeout is being * checked in each task call. If the scheduled time for queue auto-creation was not reached yet, the task will @@ -531,7 +531,7 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer { * <pre> * 0 * </pre> - * + * <p> * means there is no schedule. 
*/ private final AtomicLong queueAutoCreationScheduleTime = new AtomicLong(0L); diff --git a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java index 2b2d1ab929f..11f2d87f714 100644 --- a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java +++ b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsBatchConsumerConcurrentConsumersIT.java @@ -34,7 +34,7 @@ public class SqsBatchConsumerConcurrentConsumersIT extends CamelTestSupport { @Test public void receiveBatch() throws Exception { mock.expectedMessageCount(6); - MockEndpoint.assertIsSatisfied(context, 3, TimeUnit.SECONDS); + MockEndpoint.assertIsSatisfied(context, 10, TimeUnit.SECONDS); } @BindToRegistry("amazonSQSClient")
