Author: davsclaus
Date: Mon Oct 22 13:35:37 2012
New Revision: 1400878
URL: http://svn.apache.org/viewvc?rev=1400878&view=rev
Log:
CAMEL-5730: Should cancel extended message visibility check task when exchange
is done. Thanks to Alex Hutter for the patch.
Modified:
camel/branches/camel-2.10.x/ (props changed)
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Merged /camel/trunk:r1400876
Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java?rev=1400878&r1=1400877&r2=1400878&view=diff
==============================================================================
---
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
(original)
+++
camel/branches/camel-2.10.x/components/camel-aws/src/main/java/org/apache/camel/component/aws/sqs/SqsConsumer.java
Mon Oct 22 13:35:37 2012
@@ -106,6 +106,35 @@ public class SqsConsumer extends Schedul
// update pending number of exchanges
pendingExchanges = total - index - 1;
+ // schedule task to extend visibility if enabled
+ Integer visibilityTimeout =
getConfiguration().getVisibilityTimeout();
+ if (this.scheduledExecutor != null && visibilityTimeout != null &&
(visibilityTimeout.intValue() / 2) > 0) {
+ int delay = visibilityTimeout.intValue() / 2;
+ int period = visibilityTimeout.intValue();
+ LOG.debug("Scheduled TimeoutExtender task to start after {}
delay, and run with {} period (seconds) to extend exchangeId: {}",
+ new Object[]{delay, period, exchange.getExchangeId()});
+ final ScheduledFuture<?> scheduledFuture =
this.scheduledExecutor.scheduleAtFixedRate(
+ new TimeoutExtender(exchange, visibilityTimeout),
delay, period, TimeUnit.SECONDS);
+ exchange.addOnCompletion(new Synchronization() {
+ @Override
+ public void onComplete(Exchange exchange) {
+ cancelExtender(exchange);
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ cancelExtender(exchange);
+ }
+
+ private void cancelExtender(Exchange exchange) {
+ // cancel task as we are done
+ LOG.trace("Processing done so cancelling
TimeoutExtender task for exchangeId: {}", exchange.getExchangeId());
+ scheduledFuture.cancel(true);
+ }
+ });
+ }
+
+
// add on completion to handle after work when the exchange is done
exchange.addOnCompletion(new Synchronization() {
public void onComplete(Exchange exchange) {
@@ -123,29 +152,12 @@ public class SqsConsumer extends Schedul
});
- // schedule task to extend visibility if enabled
- ScheduledFuture<?> scheduledFuture = null;
- Integer visibilityTimeout =
getConfiguration().getVisibilityTimeout();
- if (scheduledExecutor != null && visibilityTimeout != null &&
(visibilityTimeout.intValue() / 2) > 0) {
- int delay = visibilityTimeout.intValue() / 2;
- int period = visibilityTimeout.intValue();
- LOG.debug("Scheduled TimeoutExtender task to start after {}
delay, and run with {} period (seconds) to extend exchangeId: {}",
- new Object[]{delay, period, exchange.getExchangeId()});
- scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(
- new TimeoutExtender(exchange, visibilityTimeout),
delay, period, TimeUnit.SECONDS);
- }
-
LOG.trace("Processing exchange [{}]...", exchange);
try {
// This blocks while message is consumed.
getProcessor().process(exchange);
} finally {
LOG.trace("Processing exchange [{}] done.", exchange);
- // cancel task as we are done
- if (scheduledFuture != null) {
- LOG.trace("Processing done so cancelling TimeoutExtender
task for exchangeId: {}", exchange.getExchangeId());
- scheduledFuture.cancel(true);
- }
}
}