This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new 945bb0a8d83 CAMEL-21550: start the extender before visibility (#16592)
945bb0a8d83 is described below
commit 945bb0a8d838e98a7016cf423d82c340d911528f
Author: Narsi Nallamilli <[email protected]>
AuthorDate: Wed Dec 18 14:40:26 2024 +0530
CAMEL-21550: start the extender before visibility (#16592)
---
.../camel/component/aws2/sqs/Sqs2Consumer.java | 36 +++++++---------------
.../SqsConsumerExtendMessageVisibilityTest.java | 2 +-
2 files changed, 12 insertions(+), 26 deletions(-)
diff --git
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index 1ff99a1a63f..019f1dd51e9 100644
---
a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++
b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -48,17 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
-import
software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
-import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
-import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
-import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
-import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
-import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
-import
software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException;
-import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
-import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
-import software.amazon.awssdk.services.sqs.model.SqsException;
+import software.amazon.awssdk.services.sqs.model.*;
/**
* A Consumer of messages from the Amazon Web Service Simple Queue Service <a
href="http://aws.amazon.com/sqs/">AWS
@@ -348,17 +338,18 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
Integer visibilityTimeout =
getConfiguration().getVisibilityTimeout();
if (visibilityTimeout != null && visibilityTimeout > 0) {
- int delay = visibilityTimeout;
+ int initialDelay = visibilityTimeout / 2;
+ int period = visibilityTimeout;
int repeatSeconds = (int) (visibilityTimeout.doubleValue() *
1.5);
this.timeoutExtender = new TimeoutExtender(repeatSeconds);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Scheduled TimeoutExtender task to start after {}
delay, and run with {}/{} period/repeat (seconds)",
- delay, delay, repeatSeconds);
+ initialDelay, period, repeatSeconds);
}
this.scheduledFuture
- =
scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, delay, delay,
TimeUnit.SECONDS);
+ =
scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, initialDelay,
period, TimeUnit.SECONDS);
}
}
@@ -448,18 +439,13 @@ public class Sqs2Consumer extends
ScheduledBatchPollingConsumer {
try {
LOG.trace("Extending visibility window by {} seconds
for request entries {}", repeatSeconds,
batchEntries);
-
getEndpoint().getClient().changeMessageVisibilityBatch(request);
- LOG.debug("Extended visibility window for request
entries {}", batchEntries);
- } catch (MessageNotInflightException |
ReceiptHandleIsInvalidException e) {
- // Ignore.
+ ChangeMessageVisibilityBatchResponse br
+ =
getEndpoint().getClient().changeMessageVisibilityBatch(request);
+ LOG.debug("Extended visibility window for request
entries successful {}", br.successful());
+ LOG.debug("Extended visibility window for request
entries failed {}", br.failed());
} catch (SqsException e) {
- if (e.getMessage()
- .contains("Message does not exist or is not
available for visibility timeout change")) {
- // Ignore.
- } else {
- logException(e, batchEntries);
- }
- } catch (Exception e) {
+ logException(e, batchEntries);
+ } catch (SdkException e) {
logException(e, batchEntries);
}
}
diff --git
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
index 1ecb28ae27d..6eb41f34762 100644
---
a/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
+++
b/components/camel-aws/camel-aws2-sqs/src/test/java/org/apache/camel/component/aws2/sqs/SqsConsumerExtendMessageVisibilityTest.java
@@ -46,7 +46,7 @@ public class SqsConsumerExtendMessageVisibilityTest extends
CamelTestSupport {
@Override
public void process(Exchange exchange) throws Exception {
// Simulate message that takes a while to receive.
- Thread.sleep(TIMEOUT * 1500L); // 150% of TIMEOUT.
+ Thread.sleep(TIMEOUT * 3000L); // 150% of TIMEOUT.
}
});