Updated Branches: refs/heads/master ae170e12e -> c569bd95f
CAMEL-5828 fixed the DisruptorUnitOfWorkTest with thanks to Riccardo Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c569bd95 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c569bd95 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c569bd95 Branch: refs/heads/master Commit: c569bd95f3c2b8f13d52bc37a4c55ada10841e8f Parents: ae170e1 Author: Willem Jiang <[email protected]> Authored: Fri May 31 21:14:39 2013 +0800 Committer: Willem Jiang <[email protected]> Committed: Fri May 31 21:15:51 2013 +0800 ---------------------------------------------------------------------- .../component/disruptor/DisruptorConsumer.java | 34 ++++++++++++--- .../camel/component/disruptor/ExchangeEvent.java | 8 --- .../MultipleConsumerSynchronizedExchange.java | 3 +- .../SingleConsumerSynchronizedExchange.java | 8 +--- .../component/disruptor/SynchronizedExchange.java | 4 +- .../disruptor/DisruptorUnitOfWorkTest.java | 7 +-- 6 files changed, 36 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java index a73cd5d..e355599 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java @@ -30,9 +30,9 @@ import org.apache.camel.SuspendableService; import org.apache.camel.impl.LoggingExceptionHandler; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ShutdownAware; +import org.apache.camel.spi.Synchronization; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; -import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.ExchangeHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,6 +44,13 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe private static final Logger LOGGER = LoggerFactory.getLogger(DisruptorConsumer.class); + private static final AsyncCallback NOOP_ASYNC_CALLBACK = new AsyncCallback() { + @Override + public void done(boolean doneSync) { + //Noop + } + }; + private final DisruptorEndpoint endpoint; private final AsyncProcessor processor; private ExceptionHandler exceptionHandler; @@ -145,14 +152,29 @@ public class DisruptorConsumer extends ServiceSupport implements Consumer, Suspe // send a new copied exchange with new camel context final Exchange result = prepareExchange(exchange); - // use the regular processor and use the asynchronous routing engine to support it - AsyncCallback callback = new AsyncCallback() { + + // We need to be notified when the exchange processing is complete to synchronize the original exchange + // This is however the last part of the processing of this exchange and as such can't be done + // in the AsyncCallback as that is called *AFTER* processing is considered to be done + // (see org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done). + // To solve this problem, a new synchronization is set on the exchange that is to be + // processed + result.addOnCompletion(new Synchronization() { + @Override + public void onComplete(Exchange exchange) { + synchronizedExchange.consumed(result); + } + @Override - public void done(boolean doneSync) { + public void onFailure(Exchange exchange) { synchronizedExchange.consumed(result); } - }; - AsyncProcessorHelper.process(processor, result, callback); + }); + + // As the necessary post-processing of the exchange is done by the registered Synchronization, + // we can suffice with a no-op AsyncCallback + processor.process(result, NOOP_ASYNC_CALLBACK); + } catch (Exception e) { Exchange exchange = synchronizedExchange.getExchange(); http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java index 007351c..115fbb9 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java @@ -17,15 +17,7 @@ package org.apache.camel.component.disruptor; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.camel.Exchange; -import org.apache.camel.util.UnitOfWorkHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is a mutable reference to an {@link Exchange}, used as contents of the Disruptors ringbuffer http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java index 18b66f3..d98b327 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java @@ -23,7 +23,8 @@ import org.apache.camel.Exchange; import org.apache.camel.util.ExchangeHelper; /** - * TODO: documentation + * Implementation of the {@link SynchronizedExchange} interface that correctly handles all completion + * synchronisation courtesies for multiple consumers. */ public class MultipleConsumerSynchronizedExchange extends AbstractSynchronizedExchange { http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java index 9222673..93c052f 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java @@ -16,22 +16,16 @@ */ package org.apache.camel.component.disruptor; -import java.util.List; - import org.apache.camel.Exchange; -import org.apache.camel.spi.Synchronization; import org.apache.camel.util.ExchangeHelper; /** - * TODO: documentation + * Implementation of the {@link SynchronizedExchange} interface optimized for single consumers. */ public class SingleConsumerSynchronizedExchange extends AbstractSynchronizedExchange { - private final List<Synchronization> synchronizations; - public SingleConsumerSynchronizedExchange(Exchange exchange) { super(exchange); - synchronizations = exchange.handoverCompletions(); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java index 2cef627..28b158b 100644 --- a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java +++ b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java @@ -19,7 +19,9 @@ package org.apache.camel.component.disruptor; import org.apache.camel.Exchange; /** - * TODO: documentation + * This interface describes an immutable container of an Exchange and provides handles for + * {@link DisruptorConsumer}s to correctly deal with the on completion synchronization, even in multicast + * circumstances. */ public interface SynchronizedExchange { Exchange getExchange(); http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java ---------------------------------------------------------------------- diff --git a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java index 5f9ef7d..80b2e35 100644 --- a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java +++ b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java @@ -41,15 +41,12 @@ public class DisruptorUnitOfWorkTest extends CamelTestSupport { final MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); - - + + template.sendBody("direct:start", "Hello World"); assertMockEndpointsSatisfied(); notify.matchesMockWaitTime(); - // need to sleep a while to wait for the calling of onComplete - Thread.sleep(200); - assertEquals("onCompleteA", sync); assertEquals("onCompleteA", lastOne);
