Updated Branches:
  refs/heads/master ae170e12e -> c569bd95f

CAMEL-5828 fixed the DisruptorUnitOfWorkTest with thanks to Riccardo


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c569bd95
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c569bd95
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c569bd95

Branch: refs/heads/master
Commit: c569bd95f3c2b8f13d52bc37a4c55ada10841e8f
Parents: ae170e1
Author: Willem Jiang <[email protected]>
Authored: Fri May 31 21:14:39 2013 +0800
Committer: Willem Jiang <[email protected]>
Committed: Fri May 31 21:15:51 2013 +0800

----------------------------------------------------------------------
 .../component/disruptor/DisruptorConsumer.java     |   34 ++++++++++++---
 .../camel/component/disruptor/ExchangeEvent.java   |    8 ---
 .../MultipleConsumerSynchronizedExchange.java      |    3 +-
 .../SingleConsumerSynchronizedExchange.java        |    8 +---
 .../component/disruptor/SynchronizedExchange.java  |    4 +-
 .../disruptor/DisruptorUnitOfWorkTest.java         |    7 +--
 6 files changed, 36 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
index a73cd5d..e355599 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/DisruptorConsumer.java
@@ -30,9 +30,9 @@ import org.apache.camel.SuspendableService;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +44,13 @@ public class DisruptorConsumer extends ServiceSupport 
implements Consumer, Suspe
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DisruptorConsumer.class);
 
+    private static final AsyncCallback NOOP_ASYNC_CALLBACK = new 
AsyncCallback() {
+        @Override
+        public void done(boolean doneSync) {
+            //Noop
+        }
+    };
+
     private final DisruptorEndpoint endpoint;
     private final AsyncProcessor processor;
     private ExceptionHandler exceptionHandler;
@@ -145,14 +152,29 @@ public class DisruptorConsumer extends ServiceSupport 
implements Consumer, Suspe
 
             // send a new copied exchange with new camel context
             final Exchange result = prepareExchange(exchange);
-            // use the regular processor and use the asynchronous routing 
engine to support it
-            AsyncCallback callback = new AsyncCallback() {
+
+            // We need to be notified when the exchange processing is complete 
to synchronize the original exchange
+            // This is however the last part of the processing of this 
exchange and as such can't be done
+            // in the AsyncCallback as that is called *AFTER* processing is 
considered to be done
+            // (see 
org.apache.camel.processor.CamelInternalProcessor.InternalCallback#done).
+            // To solve this problem, a new synchronization is set on the 
exchange that is to be
+            // processed
+            result.addOnCompletion(new Synchronization() {
+                @Override
+                public void onComplete(Exchange exchange) {
+                    synchronizedExchange.consumed(result);
+                }
+
                 @Override
-                public void done(boolean doneSync) {
+                public void onFailure(Exchange exchange) {
                     synchronizedExchange.consumed(result);
                 }
-            };
-            AsyncProcessorHelper.process(processor, result, callback);
+            });
+
+            // As the necessary post-processing of the exchange is done by the 
registered Synchronization,
+            // we can suffice with a no-op AsyncCallback
+            processor.process(result, NOOP_ASYNC_CALLBACK);
+
         } catch (Exception e) {
             Exchange exchange = synchronizedExchange.getExchange();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
index 007351c..115fbb9 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/ExchangeEvent.java
@@ -17,15 +17,7 @@
 
 package org.apache.camel.component.disruptor;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.camel.Exchange;
-import org.apache.camel.util.UnitOfWorkHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is a mutable reference to an {@link Exchange}, used as contents of the 
Disruptors ringbuffer

http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java
index 18b66f3..d98b327 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/MultipleConsumerSynchronizedExchange.java
@@ -23,7 +23,8 @@ import org.apache.camel.Exchange;
 import org.apache.camel.util.ExchangeHelper;
 
 /**
- * TODO: documentation
+ * Implementation of the {@link SynchronizedExchange} interface that correctly 
handles all completion
+ * synchronisation courtesies for multiple consumers.
  */
 public class MultipleConsumerSynchronizedExchange extends 
AbstractSynchronizedExchange {
 

http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java
index 9222673..93c052f 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SingleConsumerSynchronizedExchange.java
@@ -16,22 +16,16 @@
  */
 package org.apache.camel.component.disruptor;
 
-import java.util.List;
-
 import org.apache.camel.Exchange;
-import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.ExchangeHelper;
 
 /**
- * TODO: documentation
+ * Implementation of the {@link SynchronizedExchange} interface optimized for 
single consumers.
  */
 public class SingleConsumerSynchronizedExchange extends  
AbstractSynchronizedExchange {
 
-    private final List<Synchronization> synchronizations;
-
     public SingleConsumerSynchronizedExchange(Exchange exchange) {
         super(exchange);
-        synchronizations = exchange.handoverCompletions();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java
 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java
index 2cef627..28b158b 100644
--- 
a/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java
+++ 
b/components/camel-disruptor/src/main/java/org/apache/camel/component/disruptor/SynchronizedExchange.java
@@ -19,7 +19,9 @@ package org.apache.camel.component.disruptor;
 import org.apache.camel.Exchange;
 
 /**
- * TODO: documentation
+ * This interface describes an immutable container of an Exchange and provides 
handles for
+ * {@link DisruptorConsumer}s to correctly deal with the on completion 
synchronization, even in multicast
+ * circumstances.
  */
 public interface SynchronizedExchange {
     Exchange getExchange();

http://git-wip-us.apache.org/repos/asf/camel/blob/c569bd95/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
index 5f9ef7d..80b2e35 100644
--- 
a/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
+++ 
b/components/camel-disruptor/src/test/java/org/apache/camel/component/disruptor/DisruptorUnitOfWorkTest.java
@@ -41,15 +41,12 @@ public class DisruptorUnitOfWorkTest extends 
CamelTestSupport {
 
         final MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        
-        
+
+
         template.sendBody("direct:start", "Hello World");
 
         assertMockEndpointsSatisfied();
         notify.matchesMockWaitTime();
-        // need to sleep a while to wait for the calling of onComplete
-        Thread.sleep(200);
-        
 
         assertEquals("onCompleteA", sync);
         assertEquals("onCompleteA", lastOne);

Reply via email to