Author: davsclaus
Date: Fri Aug 19 16:53:51 2011
New Revision: 1159682

URL: http://svn.apache.org/viewvc?rev=1159682&view=rev
Log:
CAMEL-4354: Fixed issue with ExchangeSentEvent being emitted to soon when using 
async routing engine.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
   (contents, props changed)
      - copied, changed from r1159650, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1159682&r1=1159681&r2=1159682&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
Fri Aug 19 16:53:51 2011
@@ -259,11 +259,12 @@ public class ProducerCache extends Servi
      * @param producerCallback the producer template callback to be executed
      * @return (doneSync) <tt>true</tt> to continue execute synchronously, 
<tt>false</tt> to continue being executed asynchronously
      */
-    public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange, 
ExchangePattern pattern, AsyncCallback callback, AsyncProducerCallback 
producerCallback) {
+    public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange 
exchange, final ExchangePattern pattern,
+                                     final AsyncCallback callback, final 
AsyncProducerCallback producerCallback) {
         boolean sync = true;
 
         // get the producer and we do not mind if its pooled as we can handle 
returning it back to the pool
-        Producer producer = doGetProducer(endpoint, true);
+        final Producer producer = doGetProducer(endpoint, true);
 
         if (producer == null) {
             if (isStopped()) {
@@ -274,39 +275,44 @@ public class ProducerCache extends Servi
             }
         }
 
-        StopWatch watch = null;
-        if (exchange != null) {
-            // record timing for sending the exchange using the producer
-            watch = new StopWatch();
-        }
+        // record timing for sending the exchange using the producer
+        final StopWatch watch = exchange != null ? new StopWatch() : null;
 
         try {
             // invoke the callback
             AsyncProcessor asyncProcessor = 
AsyncProcessorTypeConverter.convert(producer);
-            sync = producerCallback.doInAsyncProducer(producer, 
asyncProcessor, exchange, pattern, callback);
+            sync = producerCallback.doInAsyncProducer(producer, 
asyncProcessor, exchange, pattern, new AsyncCallback() {
+                @Override
+                public void done(boolean doneSync) {
+                    try {
+                        if (watch != null) {
+                            long timeTaken = watch.stop();
+                            // emit event that the exchange was sent to the 
endpoint
+                            
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, 
timeTaken);
+                        }
+
+                        if (producer instanceof ServicePoolAware) {
+                            // release back to the pool
+                            pool.release(endpoint, producer);
+                        } else if (!producer.isSingleton()) {
+                            // stop non singleton producers as we should not 
leak resources
+                            try {
+                                ServiceHelper.stopService(producer);
+                            } catch (Exception e) {
+                                // ignore and continue
+                                LOG.warn("Error stopping producer: " + 
producer, e);
+                            }
+                        }
+                    } finally {
+                        callback.done(doneSync);
+                    }
+                }
+            });
         } catch (Throwable e) {
             // ensure exceptions is caught and set on the exchange
             if (exchange != null) {
                 exchange.setException(e);
             }
-        } finally {
-            if (exchange != null && exchange.getException() == null) {
-                long timeTaken = watch.stop();
-                // emit event that the exchange was sent to the endpoint
-                EventHelper.notifyExchangeSent(exchange.getContext(), 
exchange, endpoint, timeTaken);
-            }
-            if (producer instanceof ServicePoolAware) {
-                // release back to the pool
-                pool.release(endpoint, producer);
-            } else if (!producer.isSingleton()) {
-                // stop non singleton producers as we should not leak resources
-                try {
-                    ServiceHelper.stopService(producer);
-                } catch (Exception e) {
-                    // ignore and continue
-                    LOG.warn("Error stopping producer: " + producer, e);
-                }
-            }
         }
 
         return sync;

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java?rev=1159682&r1=1159681&r2=1159682&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
 Fri Aug 19 16:53:51 2011
@@ -94,17 +94,19 @@ public class EventNotifierFailureHandled
         assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
         assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
         assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
 
-        ExchangeFailureHandledEvent e = 
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
+        ExchangeFailureHandledEvent e = 
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
         assertEquals("should be DLC", true, e.isDeadLetterChannel());
         SendProcessor send = assertIsInstanceOf(SendProcessor.class, 
e.getFailureHandler());
         assertEquals("mock://dead", send.getDestination().getEndpointUri());
 
         // dead letter channel will mark the exchange as completed
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
-        // and the sent will be logged after they are complete sending as it 
record the time taken as well
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
+        // and the last event should be the direct:start
         assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+        ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
+        assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
     }
 
     public void testExchangeOnException() throws Exception {
@@ -127,14 +129,17 @@ public class EventNotifierFailureHandled
         assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
         assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
         assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
 
-        ExchangeFailureHandledEvent e = 
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
+        ExchangeFailureHandledEvent e = 
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
         assertEquals("should NOT be DLC", false, e.isDeadLetterChannel());
 
         // onException will handle the exception
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
+        // and the last event should be the direct:start
         assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+        ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
+        assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
     }
 
 }

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java?rev=1159682&r1=1159681&r2=1159682&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
 Fri Aug 19 16:53:51 2011
@@ -100,9 +100,9 @@ public class EventNotifierRedeliveryEven
         assertEquals(3, e.getAttempt());
         e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
         assertEquals(4, e.getAttempt());
-        assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
-        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
-        assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+        assertIsInstanceOf(ExchangeSentEvent.class, events.get(5));
+        assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(6));
+        assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(7));
         assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
     }
 
@@ -134,7 +134,6 @@ public class EventNotifierRedeliveryEven
         assertEquals(3, e.getAttempt());
         e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
         assertEquals(4, e.getAttempt());
-        assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
 
         // since its async the ordering of the rest can be different depending 
per OS and timing
     }

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
 (from r1159650, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java&r1=1159650&r2=1159682&rev=1159682&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
 Fri Aug 19 16:53:51 2011
@@ -16,22 +16,28 @@
  */
 package org.apache.camel.processor.async;
 
+import java.util.EventObject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.management.EventNotifierSupport;
+import org.apache.camel.management.event.ExchangeSentEvent;
 
 /**
  * @version 
  */
-public class AsyncEndpointEnricherTest extends ContextTestSupport {
+public class AsyncEndpointEventNotifierTest extends ContextTestSupport {
 
-    private static String beforeThreadName;
-    private static String afterThreadName;
+    private final CountDownLatch latch = new CountDownLatch(1);
+    private final AtomicLong time = new AtomicLong();
 
-    public void testAsyncEndpoint() throws Exception {
+    public void testAsyncEndpointEventNotifer() throws Exception {
         getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
-        getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
         getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
 
         String reply = template.requestBody("direct:start", "Hello Camel", 
String.class);
@@ -39,7 +45,43 @@ public class AsyncEndpointEnricherTest e
 
         assertMockEndpointsSatisfied();
 
-        assertFalse("Should use different threads", 
beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertTrue("Should count down", latch.await(10, TimeUnit.SECONDS));
+
+        long delta = time.get();
+        assertTrue("Should take about 1000 millis sec, was: " + delta, delta > 
800);
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        DefaultCamelContext context = new 
DefaultCamelContext(createRegistry());
+        context.getManagementStrategy().addEventNotifier(new 
EventNotifierSupport() {
+            public void notify(EventObject event) throws Exception {
+                try {
+                    ExchangeSentEvent sent = (ExchangeSentEvent) event;
+                    time.set(sent.getTimeTaken());
+                } finally {
+                    latch.countDown();
+                }
+            }
+
+            public boolean isEnabled(EventObject event) {
+                // we only want the async endpoint
+                if (event instanceof ExchangeSentEvent) {
+                    ExchangeSentEvent sent = (ExchangeSentEvent) event;
+                    return 
sent.getEndpoint().getEndpointUri().startsWith("async");
+                }
+                return false;
+            }
+
+            @Override
+            protected void doStart() throws Exception {
+            }
+
+            @Override
+            protected void doStop() throws Exception {
+            }
+        });
+        return context;
     }
 
     @Override
@@ -51,20 +93,7 @@ public class AsyncEndpointEnricherTest e
 
                 from("direct:start")
                         .to("mock:before")
-                        .to("log:before")
-                        .process(new Processor() {
-                            public void process(Exchange exchange) throws 
Exception {
-                                beforeThreadName = 
Thread.currentThread().getName();
-                            }
-                        })
-                        .enrich("async:Bye Camel")
-                        .process(new Processor() {
-                            public void process(Exchange exchange) throws 
Exception {
-                                afterThreadName = 
Thread.currentThread().getName();
-                            }
-                        })
-                        .to("log:after")
-                        .to("mock:after")
+                        .to("async:Bye Camel?delay=1000")
                         .to("mock:result");
             }
         };

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to