This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.21.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-3.21.x by this push:
     new 5739c966881 CAMEL-19562: aws sqs visibility extender is running forever (#10828)
5739c966881 is described below

commit 5739c966881fae93cc394a81a463ce439ad089a2
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Jul 26 12:36:35 2023 +0200

    CAMEL-19562: aws sqs visibility extender is running forever (#10828)
---
 .../camel/component/aws2/sqs/Sqs2Consumer.java     | 51 ++++++++++++++--------
 1 file changed, 32 insertions(+), 19 deletions(-)

diff --git a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
index f9e5e0627a8..a1688a5ebc0 100644
--- a/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
+++ b/components/camel-aws/camel-aws2-sqs/src/main/java/org/apache/camel/component/aws2/sqs/Sqs2Consumer.java
@@ -26,6 +26,7 @@ import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -185,9 +186,9 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                             delay, period,
                             repeatSeconds, exchange.getExchangeId());
                 }
-                final ScheduledFuture<?> scheduledFuture = 
this.scheduledExecutor.scheduleAtFixedRate(
-                        new TimeoutExtender(exchange, repeatSeconds), delay, 
period,
-                        TimeUnit.SECONDS);
+                final TimeoutExtender extender = new TimeoutExtender(exchange, 
repeatSeconds);
+                final ScheduledFuture<?> scheduledFuture = 
this.scheduledExecutor.scheduleAtFixedRate(extender,
+                        delay, period, TimeUnit.SECONDS);
                 exchange.adapt(ExtendedExchange.class).addOnCompletion(new 
Synchronization() {
                     @Override
                     public void onComplete(Exchange exchange) {
@@ -203,7 +204,11 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
                         // cancel task as we are done
                         LOG.trace("Processing done so cancelling 
TimeoutExtender task for exchangeId: {}",
                                 exchange.getExchangeId());
-                        scheduledFuture.cancel(false);
+                        extender.cancel();
+                        boolean cancelled = scheduledFuture.cancel(true);
+                        if (!cancelled) {
+                            LOG.warn("TimeoutExtender task for exchangeId: {} 
could not be cancelled", exchange.getExchangeId());
+                        }
                     }
                 });
             }
@@ -398,32 +403,40 @@ public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
 
         private final Exchange exchange;
         private final int repeatSeconds;
+        private final AtomicBoolean run = new AtomicBoolean(true);
 
         TimeoutExtender(Exchange exchange, int repeatSeconds) {
             this.exchange = exchange;
             this.repeatSeconds = repeatSeconds;
         }
 
+        public void cancel() {
+            // cancel by setting to no longer run
+            run.set(false);
+        }
+
         @Override
         public void run() {
-            ChangeMessageVisibilityRequest.Builder request
-                    = 
ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
-                            
.receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, 
String.class));
-
-            try {
-                LOG.trace("Extending visibility window by {} seconds for 
exchange {}", this.repeatSeconds, this.exchange);
-                
getEndpoint().getClient().changeMessageVisibility(request.build());
-                LOG.debug("Extended visibility window by {} seconds for 
exchange {}", this.repeatSeconds, this.exchange);
-            } catch (MessageNotInflightException | 
ReceiptHandleIsInvalidException e) {
-                // Ignore.
-            } catch (SqsException e) {
-                if (e.getMessage().contains("Message does not exist or is not 
available for visibility timeout change")) {
+            if (run.get()) {
+                ChangeMessageVisibilityRequest.Builder request
+                        = 
ChangeMessageVisibilityRequest.builder().queueUrl(getQueueUrl()).visibilityTimeout(repeatSeconds)
+                        
.receiptHandle(exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, 
String.class));
+
+                try {
+                    LOG.trace("Extending visibility window by {} seconds for 
exchange {}", this.repeatSeconds, this.exchange);
+                    
getEndpoint().getClient().changeMessageVisibility(request.build());
+                    LOG.debug("Extended visibility window by {} seconds for 
exchange {}", this.repeatSeconds, this.exchange);
+                } catch (MessageNotInflightException | 
ReceiptHandleIsInvalidException e) {
                     // Ignore.
-                } else {
+                } catch (SqsException e) {
+                    if (e.getMessage().contains("Message does not exist or is 
not available for visibility timeout change")) {
+                        // Ignore.
+                    } else {
+                        logException(e);
+                    }
+                } catch (Exception e) {
                     logException(e);
                 }
-            } catch (Exception e) {
-                logException(e);
             }
         }
 

Reply via email to