Author: davsclaus
Date: Mon Oct 22 13:34:09 2012
New Revision: 1400876

URL: http://svn.apache.org/viewvc?rev=1400876&view=rev
Log:
CAMEL-5730: Should cancel extended message visibility check task when exchange 
is done. Thanks to Alex Hutter for the patch.

Modified:
    
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java

Modified: 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1400876&r1=1400875&r2=1400876&view=diff
==============================================================================
--- 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 (original)
+++ 
camel/trunk/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
 Mon Oct 22 13:34:09 2012
@@ -106,6 +106,35 @@ public class SqsConsumer extends Schedul
             // update pending number of exchanges
             pendingExchanges = total - index - 1;
 
+            // schedule task to extend visibility if enabled
+            Integer visibilityTimeout = 
getConfiguration().getVisibilityTimeout();
+            if (this.scheduledExecutor != null && visibilityTimeout != null && 
(visibilityTimeout.intValue() / 2) > 0) {
+                int delay = visibilityTimeout.intValue() / 2;
+                int period = visibilityTimeout.intValue();
+                LOG.debug("Scheduled TimeoutExtender task to start after {} 
delay, and run with {} period (seconds) to extend exchangeId: {}",
+                        new Object[]{delay, period, exchange.getExchangeId()});
+                final ScheduledFuture<?> scheduledFuture = 
this.scheduledExecutor.scheduleAtFixedRate(
+                        new TimeoutExtender(exchange, visibilityTimeout), 
delay, period, TimeUnit.SECONDS);
+                exchange.addOnCompletion(new Synchronization() {
+                    @Override
+                    public void onComplete(Exchange exchange) {
+                        cancelExtender(exchange);
+                    }
+
+                    @Override
+                    public void onFailure(Exchange exchange) {
+                        cancelExtender(exchange);
+                    }
+
+                    private void cancelExtender(Exchange exchange) {
+                        // cancel task as we are done
+                        LOG.trace("Processing done so cancelling 
TimeoutExtender task for exchangeId: {}", exchange.getExchangeId());
+                        scheduledFuture.cancel(true);
+                    }
+                });
+            }
+
+
             // add on completion to handle after work when the exchange is done
             exchange.addOnCompletion(new Synchronization() {
                 public void onComplete(Exchange exchange) {
@@ -123,29 +152,12 @@ public class SqsConsumer extends Schedul
             });
 
 
-            // schedule task to extend visibility if enabled
-            ScheduledFuture<?> scheduledFuture = null;
-            Integer visibilityTimeout = 
getConfiguration().getVisibilityTimeout();
-            if (scheduledExecutor != null && visibilityTimeout != null && 
(visibilityTimeout.intValue() / 2) > 0) {
-                int delay = visibilityTimeout.intValue() / 2;
-                int period = visibilityTimeout.intValue();
-                LOG.debug("Scheduled TimeoutExtender task to start after {} 
delay, and run with {} period (seconds) to extend exchangeId: {}",
-                        new Object[]{delay, period, exchange.getExchangeId()});
-                scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
-                        new TimeoutExtender(exchange, visibilityTimeout), 
delay, period, TimeUnit.SECONDS);
-            }
-
             LOG.trace("Processing exchange [{}]...", exchange);
             try {
                 // This blocks while message is consumed.
                 getProcessor().process(exchange);
             } finally {
                 LOG.trace("Processing exchange [{}] done.", exchange);
-                // cancel task as we are done
-                if (scheduledFuture != null) {
-                    LOG.trace("Processing done so cancelling TimeoutExtender 
task for exchangeId: {}", exchange.getExchangeId());
-                    scheduledFuture.cancel(true);
-                }
             }
         }
 


Reply via email to