Author: davsclaus
Date: Fri Aug 19 16:53:51 2011
New Revision: 1159682
URL: http://svn.apache.org/viewvc?rev=1159682&view=rev
Log:
CAMEL-4354: Fixed issue with ExchangeSentEvent being emitted to soon when using
async routing engine.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
(contents, props changed)
- copied, changed from r1159650,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=1159682&r1=1159681&r2=1159682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Fri Aug 19 16:53:51 2011
@@ -259,11 +259,12 @@ public class ProducerCache extends Servi
* @param producerCallback the producer template callback to be executed
* @return (doneSync) <tt>true</tt> to continue execute synchronously,
<tt>false</tt> to continue being executed asynchronously
*/
- public boolean doInAsyncProducer(Endpoint endpoint, Exchange exchange,
ExchangePattern pattern, AsyncCallback callback, AsyncProducerCallback
producerCallback) {
+ public boolean doInAsyncProducer(final Endpoint endpoint, final Exchange
exchange, final ExchangePattern pattern,
+ final AsyncCallback callback, final
AsyncProducerCallback producerCallback) {
boolean sync = true;
// get the producer and we do not mind if its pooled as we can handle
returning it back to the pool
- Producer producer = doGetProducer(endpoint, true);
+ final Producer producer = doGetProducer(endpoint, true);
if (producer == null) {
if (isStopped()) {
@@ -274,39 +275,44 @@ public class ProducerCache extends Servi
}
}
- StopWatch watch = null;
- if (exchange != null) {
- // record timing for sending the exchange using the producer
- watch = new StopWatch();
- }
+ // record timing for sending the exchange using the producer
+ final StopWatch watch = exchange != null ? new StopWatch() : null;
try {
// invoke the callback
AsyncProcessor asyncProcessor =
AsyncProcessorTypeConverter.convert(producer);
- sync = producerCallback.doInAsyncProducer(producer,
asyncProcessor, exchange, pattern, callback);
+ sync = producerCallback.doInAsyncProducer(producer,
asyncProcessor, exchange, pattern, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ try {
+ if (watch != null) {
+ long timeTaken = watch.stop();
+ // emit event that the exchange was sent to the
endpoint
+
EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint,
timeTaken);
+ }
+
+ if (producer instanceof ServicePoolAware) {
+ // release back to the pool
+ pool.release(endpoint, producer);
+ } else if (!producer.isSingleton()) {
+ // stop non singleton producers as we should not
leak resources
+ try {
+ ServiceHelper.stopService(producer);
+ } catch (Exception e) {
+ // ignore and continue
+ LOG.warn("Error stopping producer: " +
producer, e);
+ }
+ }
+ } finally {
+ callback.done(doneSync);
+ }
+ }
+ });
} catch (Throwable e) {
// ensure exceptions is caught and set on the exchange
if (exchange != null) {
exchange.setException(e);
}
- } finally {
- if (exchange != null && exchange.getException() == null) {
- long timeTaken = watch.stop();
- // emit event that the exchange was sent to the endpoint
- EventHelper.notifyExchangeSent(exchange.getContext(),
exchange, endpoint, timeTaken);
- }
- if (producer instanceof ServicePoolAware) {
- // release back to the pool
- pool.release(endpoint, producer);
- } else if (!producer.isSingleton()) {
- // stop non singleton producers as we should not leak resources
- try {
- ServiceHelper.stopService(producer);
- } catch (Exception e) {
- // ignore and continue
- LOG.warn("Error stopping producer: " + producer, e);
- }
- }
}
return sync;
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java?rev=1159682&r1=1159681&r2=1159682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierFailureHandledEventsTest.java
Fri Aug 19 16:53:51 2011
@@ -94,17 +94,19 @@ public class EventNotifierFailureHandled
assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
- ExchangeFailureHandledEvent e =
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
+ ExchangeFailureHandledEvent e =
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
assertEquals("should be DLC", true, e.isDeadLetterChannel());
SendProcessor send = assertIsInstanceOf(SendProcessor.class,
e.getFailureHandler());
assertEquals("mock://dead", send.getDestination().getEndpointUri());
// dead letter channel will mark the exchange as completed
- assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
- // and the sent will be logged after they are complete sending as it
record the time taken as well
- assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+ assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
+ // and the last event should be the direct:start
assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+ ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
+ assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
}
public void testExchangeOnException() throws Exception {
@@ -127,14 +129,17 @@ public class EventNotifierFailureHandled
assertIsInstanceOf(RouteStartedEvent.class, events.get(1));
assertIsInstanceOf(CamelContextStartedEvent.class, events.get(2));
assertIsInstanceOf(ExchangeCreatedEvent.class, events.get(3));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(4));
- ExchangeFailureHandledEvent e =
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(4));
+ ExchangeFailureHandledEvent e =
assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
assertEquals("should NOT be DLC", false, e.isDeadLetterChannel());
// onException will handle the exception
- assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(5));
- assertIsInstanceOf(ExchangeSentEvent.class, events.get(6));
+ assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
+ // and the last event should be the direct:start
assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+ ExchangeSentEvent sent = (ExchangeSentEvent) events.get(7);
+ assertEquals("direct://start", sent.getEndpoint().getEndpointUri());
}
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java?rev=1159682&r1=1159681&r2=1159682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/management/EventNotifierRedeliveryEventsTest.java
Fri Aug 19 16:53:51 2011
@@ -100,9 +100,9 @@ public class EventNotifierRedeliveryEven
assertEquals(3, e.getAttempt());
e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
assertEquals(4, e.getAttempt());
- assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
- assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(6));
- assertIsInstanceOf(ExchangeSentEvent.class, events.get(7));
+ assertIsInstanceOf(ExchangeSentEvent.class, events.get(5));
+ assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(6));
+ assertIsInstanceOf(ExchangeCompletedEvent.class, events.get(7));
assertIsInstanceOf(ExchangeSentEvent.class, events.get(8));
}
@@ -134,7 +134,6 @@ public class EventNotifierRedeliveryEven
assertEquals(3, e.getAttempt());
e = assertIsInstanceOf(ExchangeRedeliveryEvent.class, events.get(4));
assertEquals(4, e.getAttempt());
- assertIsInstanceOf(ExchangeFailureHandledEvent.class, events.get(5));
// since its async the ordering of the rest can be different depending
per OS and timing
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
(from r1159650,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java&r1=1159650&r2=1159682&rev=1159682&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEnricherTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
Fri Aug 19 16:53:51 2011
@@ -16,22 +16,28 @@
*/
package org.apache.camel.processor.async;
+import java.util.EventObject;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.management.EventNotifierSupport;
+import org.apache.camel.management.event.ExchangeSentEvent;
/**
* @version
*/
-public class AsyncEndpointEnricherTest extends ContextTestSupport {
+public class AsyncEndpointEventNotifierTest extends ContextTestSupport {
- private static String beforeThreadName;
- private static String afterThreadName;
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private final AtomicLong time = new AtomicLong();
- public void testAsyncEndpoint() throws Exception {
+ public void testAsyncEndpointEventNotifer() throws Exception {
getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
String reply = template.requestBody("direct:start", "Hello Camel",
String.class);
@@ -39,7 +45,43 @@ public class AsyncEndpointEnricherTest e
assertMockEndpointsSatisfied();
- assertFalse("Should use different threads",
beforeThreadName.equalsIgnoreCase(afterThreadName));
+ assertTrue("Should count down", latch.await(10, TimeUnit.SECONDS));
+
+ long delta = time.get();
+ assertTrue("Should take about 1000 millis sec, was: " + delta, delta >
800);
+ }
+
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ DefaultCamelContext context = new
DefaultCamelContext(createRegistry());
+ context.getManagementStrategy().addEventNotifier(new
EventNotifierSupport() {
+ public void notify(EventObject event) throws Exception {
+ try {
+ ExchangeSentEvent sent = (ExchangeSentEvent) event;
+ time.set(sent.getTimeTaken());
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ public boolean isEnabled(EventObject event) {
+ // we only want the async endpoint
+ if (event instanceof ExchangeSentEvent) {
+ ExchangeSentEvent sent = (ExchangeSentEvent) event;
+ return
sent.getEndpoint().getEndpointUri().startsWith("async");
+ }
+ return false;
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ }
+ });
+ return context;
}
@Override
@@ -51,20 +93,7 @@ public class AsyncEndpointEnricherTest e
from("direct:start")
.to("mock:before")
- .to("log:before")
- .process(new Processor() {
- public void process(Exchange exchange) throws
Exception {
- beforeThreadName =
Thread.currentThread().getName();
- }
- })
- .enrich("async:Bye Camel")
- .process(new Processor() {
- public void process(Exchange exchange) throws
Exception {
- afterThreadName =
Thread.currentThread().getName();
- }
- })
- .to("log:after")
- .to("mock:after")
+ .to("async:Bye Camel?delay=1000")
.to("mock:result");
}
};
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointEventNotifierTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date