Author: davsclaus
Date: Fri Jul 22 11:32:58 2011
New Revision: 1149570
URL: http://svn.apache.org/viewvc?rev=1149570&view=rev
Log:
CAMEL-4246: Fixed tracer to better leverage the async routing engine.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncTraceHandlerTest.java
- copied, changed from r1149512,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=1149570&r1=1149569&r2=1149570&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
Fri Jul 22 11:32:58 2011
@@ -93,7 +93,7 @@ public class TraceInterceptor extends De
}
@Override
- public boolean process(Exchange exchange, AsyncCallback callback) {
+ public boolean process(final Exchange exchange, final AsyncCallback
callback) {
// do not trace if tracing is disabled
if (!tracer.isEnabled() || (routeContext != null &&
!routeContext.isTracing())) {
return super.process(exchange, callback);
@@ -106,7 +106,7 @@ public class TraceInterceptor extends De
return super.process(exchange, callback);
}
- boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
+ final boolean shouldLog = shouldLogNode(node) &&
shouldLogExchange(exchange);
// whether we should trace it or not, some nodes should be skipped as
they are abstract
// intermediate steps for instance related to on completion
@@ -149,37 +149,47 @@ public class TraceInterceptor extends De
}
// log and trace the processor
- Object traceState = null;
+ Object state = null;
if (shouldLog && trace) {
logExchange(exchange);
// either call the in or generic trace method depending on OUT
has been enabled or not
if (tracer.isTraceOutExchanges()) {
- traceState = traceExchangeIn(exchange);
+ state = traceExchangeIn(exchange);
} else {
traceExchange(exchange);
}
}
+ final Object traceState = state;
- try {
- // special for interceptor where we need to keep booking how
far we have routed in the intercepted processors
- if (node.getParent() instanceof InterceptDefinition &&
exchange.getUnitOfWork() != null) {
- TracedRouteNodes traced =
exchange.getUnitOfWork().getTracedRouteNodes();
- traceIntercept((InterceptDefinition) node.getParent(),
traced, exchange);
- }
+ // special for interceptor where we need to keep booking how far
we have routed in the intercepted processors
+ if (node.getParent() instanceof InterceptDefinition &&
exchange.getUnitOfWork() != null) {
+ TracedRouteNodes traced =
exchange.getUnitOfWork().getTracedRouteNodes();
+ traceIntercept((InterceptDefinition) node.getParent(), traced,
exchange);
+ }
- // process the exchange
- try {
- sync = super.process(exchange, callback);
- } catch (Throwable e) {
- exchange.setException(e);
- }
- } finally {
- // after (trace out)
- if (shouldLog && tracer.isTraceOutExchanges()) {
- logExchange(exchange);
- traceExchangeOut(exchange, traceState);
+ // process the exchange
+ sync = super.process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ try {
+ // after (trace out)
+ if (shouldLog && tracer.isTraceOutExchanges()) {
+ logExchange(exchange);
+ traceExchangeOut(exchange, traceState);
+ }
+ } catch (Throwable e) {
+ // some exception occurred in trace logic
+ if (shouldLogException(exchange)) {
+ logException(exchange, e);
+ }
+ exchange.setException(e);
+ } finally {
+ // ensure callback is always invoked
+ callback.done(doneSync);
+ }
}
- }
+ });
+
} catch (Throwable e) {
// some exception occurred in trace logic
if (shouldLogException(exchange)) {
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncTraceHandlerTest.java
(from r1149512,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncTraceHandlerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncTraceHandlerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java&r1=1149512&r2=1149570&rev=1149570&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointCBRTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncTraceHandlerTest.java
Fri Jul 22 11:32:58 2011
@@ -16,22 +16,25 @@
*/
package org.apache.camel.processor.async;
+import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.interceptor.TraceEventHandler;
+import org.apache.camel.processor.interceptor.TraceInterceptor;
+import org.apache.camel.processor.interceptor.Tracer;
/**
* @version
*/
-public class AsyncEndpointCBRTest extends ContextTestSupport {
+public class AsyncTraceHandlerTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
- public void testAsyncEndpoint() throws Exception {
- getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
- getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ public void testAsyncTraceHandler() throws Exception {
getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
String reply = template.requestBody("direct:start", "Hello Camel",
String.class);
@@ -43,6 +46,18 @@ public class AsyncEndpointCBRTest extend
}
@Override
+ protected CamelContext createCamelContext() throws Exception {
+ CamelContext contextLocal = super.createCamelContext();
+
+ Tracer tracer = (Tracer) contextLocal.getDefaultTracer();
+ tracer.setEnabled(true);
+ tracer.setTraceHandler(new MyTraceHandler());
+ tracer.setTraceOutExchanges(true);
+
+ return contextLocal;
+ }
+
+ @Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
@@ -50,27 +65,35 @@ public class AsyncEndpointCBRTest extend
context.addComponent("async", new MyAsyncComponent());
from("direct:start")
- .to("mock:before")
+ .tracing()
.to("log:before")
- .choice()
- .when(body().contains("Camel"))
- .process(new Processor() {
- public void process(Exchange exchange)
throws Exception {
- beforeThreadName =
Thread.currentThread().getName();
- }
- })
- .to("async:Bye Camel")
- .process(new Processor() {
- public void process(Exchange exchange)
throws Exception {
- afterThreadName =
Thread.currentThread().getName();
- }
- })
- .to("log:after")
- .to("mock:after")
- .end()
+ .to("async:Bye Camel").id("async")
+ .to("log:after")
.to("mock:result");
}
};
}
+ private class MyTraceHandler implements TraceEventHandler {
+
+ @Override
+ public void traceExchange(ProcessorDefinition node, Processor target,
TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
+ // noop
+ }
+
+ @Override
+ public Object traceExchangeIn(ProcessorDefinition node, Processor
target, TraceInterceptor traceInterceptor, Exchange exchange) throws Exception {
+ if (node.getId().equals("async")) {
+ beforeThreadName = Thread.currentThread().getName();
+ }
+ return null;
+ }
+
+ @Override
+ public void traceExchangeOut(ProcessorDefinition node, Processor
target, TraceInterceptor traceInterceptor, Exchange exchange, Object
traceState) throws Exception {
+ if (node.getId().equals("async")) {
+ afterThreadName = Thread.currentThread().getName();
+ }
+ }
+ }
}
\ No newline at end of file