Author: davsclaus
Date: Thu May 27 13:30:20 2010
New Revision: 948830

URL: http://svn.apache.org/viewvc?rev=948830&view=rev
Log:
CAMEL-2760: Fixed @Consume to use a UnitOfWork (UoW) so that synchronization 
callbacks are invoked when the routing is done.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java?rev=948830&r1=948829&r2=948830&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/CamelPostProcessorHelper.java
 Thu May 27 13:30:20 2010
@@ -33,6 +33,7 @@ import org.apache.camel.ProducerTemplate
 import org.apache.camel.Service;
 import org.apache.camel.component.bean.BeanProcessor;
 import org.apache.camel.component.bean.ProxyHelper;
+import org.apache.camel.processor.UnitOfWorkProcessor;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -126,7 +127,8 @@ public class CamelPostProcessorHelper im
     protected Processor createConsumerProcessor(final Object pojo, final 
Method method, final Endpoint endpoint) {
         BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
         answer.setMethodObject(method);
-        return answer;
+        // must ensure the consumer is executed within a unit of work so 
synchronization callbacks etc. are invoked
+        return new UnitOfWorkProcessor(answer);
     }
 
     protected Endpoint getEndpointInjection(String uri, String name, String 
injectionPointName, boolean mandatory) {

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java?rev=948830&r1=948829&r2=948830&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/CamelPostProcessorHelperTest.java
 Thu May 27 13:30:20 2010
@@ -35,6 +35,8 @@ import org.apache.camel.util.ObjectHelpe
  */
 public class CamelPostProcessorHelperTest extends ContextTestSupport {
 
+    private MySynchronization mySynchronization;
+
     public void testConstructor() {
         CamelPostProcessorHelper helper = new CamelPostProcessorHelper();
         assertNull(helper.getCamelContext());
@@ -69,6 +71,24 @@ public class CamelPostProcessorHelperTes
         assertMockEndpointsSatisfied();
     }
 
+    public void testConsumeSynchronization() throws Exception {
+        mySynchronization = new MySynchronization();
+        CamelPostProcessorHelper helper = new 
CamelPostProcessorHelper(context);
+
+        MyConsumeAndSynchronizationBean my = new 
MyConsumeAndSynchronizationBean();
+        Method method = my.getClass().getMethod("consumeSomething", 
String.class, Exchange.class);
+        helper.consumerInjection(method, my, "foo");
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals("Should have invoked onDone", true, 
mySynchronization.isOnDone());
+    }
+
     public void testEndpointInjectProducerTemplate() throws Exception {
         CamelPostProcessorHelper helper = new 
CamelPostProcessorHelper(context);
 
@@ -257,6 +277,30 @@ public class CamelPostProcessorHelperTes
         }
     }
 
+    public class MyConsumeAndSynchronizationBean {
+
+        @Consume(uri = "seda:foo")
+        public void consumeSomething(String body, Exchange exchange) {
+            exchange.addOnCompletion(mySynchronization);
+            assertEquals("Hello World", body);
+            template.sendBody("mock:result", body);
+        }
+    }
+
+    private class MySynchronization extends SynchronizationAdapter {
+
+        private boolean onDone;
+
+        @Override
+        public void onDone(Exchange exchange) {
+            onDone = true;
+        }
+
+        public boolean isOnDone() {
+            return onDone;
+        }
+    }
+
     public class MyEndpointInjectBeanProducerTemplate {
 
         private ProducerTemplate producer;


Reply via email to