Repository: camel Updated Branches: refs/heads/master f97ac2787 -> ee84c1816
CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ee84c181 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ee84c181 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ee84c181 Branch: refs/heads/master Commit: ee84c1816213dab372cfb2efe336ea6ec926d2a3 Parents: f97ac27 Author: Claus Ibsen <[email protected]> Authored: Mon Feb 22 09:20:06 2016 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Feb 22 09:20:06 2016 +0100 ---------------------------------------------------------------------- .../component/sjms/batch/SjmsBatchConsumer.java | 29 ++++++++++++++------ 1 file changed, 20 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ee84c181/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java index b5d72a2..4ab7437 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java @@ -16,7 +16,9 @@ */ package org.apache.camel.component.sjms.batch; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -57,7 +59,6 @@ public class SjmsBatchConsumer extends DefaultConsumer { private ScheduledExecutorService timeoutCheckerExecutorService; private boolean shutdownTimeoutCheckerExecutorService; - private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean(); private final SjmsBatchEndpoint sjmsBatchEndpoint; private final AggregationStrategy aggregationStrategy; @@ -134,10 +135,13 @@ public class SjmsBatchConsumer extends DefaultConsumer { } consumersShutdownLatchRef.set(new CountDownLatch(consumerCount)); - jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager() - .newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount); + jmsConsumerExecutors = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "SjmsBatchConsumer", consumerCount); + + final List<AtomicBoolean> triggers = new ArrayList<>(); for (int i = 0; i < consumerCount; i++) { - jmsConsumerExecutors.execute(new BatchConsumptionLoop()); + BatchConsumptionLoop loop = new BatchConsumptionLoop(); + triggers.add(loop.getCompletionTimeoutTrigger()); + jmsConsumerExecutors.execute(loop); } if (completionInterval > 0) { @@ -147,7 +151,7 @@ public class SjmsBatchConsumer extends DefaultConsumer { shutdownTimeoutCheckerExecutorService = true; } // trigger completion based on interval - timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(completionTimeoutTrigger), completionInterval, completionInterval, TimeUnit.MILLISECONDS); + timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(triggers), completionInterval, completionInterval, TimeUnit.MILLISECONDS); } } @@ -190,10 +194,10 @@ public class SjmsBatchConsumer extends DefaultConsumer { */ private final class CompletionIntervalTask implements Runnable { - private final AtomicBoolean timeoutInterval; + private final List<AtomicBoolean> triggers; - public CompletionIntervalTask(AtomicBoolean timeoutInterval) { - this.timeoutInterval = timeoutInterval; + public CompletionIntervalTask(List<AtomicBoolean> triggers) { + this.triggers = triggers; } public void run() { @@ -204,14 +208,21 @@ public class SjmsBatchConsumer extends DefaultConsumer { } // signal - timeoutInterval.set(true); + for (AtomicBoolean trigger : triggers) { + trigger.set(true); + } } } private class BatchConsumptionLoop implements Runnable { + private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean(); private final BatchConsumptionTask task = new BatchConsumptionTask(completionTimeoutTrigger); + public AtomicBoolean getCompletionTimeoutTrigger() { + return completionTimeoutTrigger; + } + @Override public void run() { try {
