Author: davsclaus
Date: Wed Jun 16 16:40:37 2010
New Revision: 955294
URL: http://svn.apache.org/viewvc?rev=955294&view=rev
Log:
CAMEL-2723: Interceptors is now support async routing.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java
- copied, changed from r955211,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java
- copied, changed from r955211,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
Wed Jun 16 16:40:37 2010
@@ -29,10 +29,8 @@ import org.apache.camel.FailedToCreatePr
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ServicePoolAware;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
-import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.spi.ServicePool;
import org.apache.camel.util.CamelContextHelper;
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/AsyncProcessorTypeConverter.java
Wed Jun 16 16:40:37 2010
@@ -41,6 +41,11 @@ public class AsyncProcessorTypeConverter
}
public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (processor == null) {
+ // no processor then we are done
+ callback.done(true);
+ return true;
+ }
try {
processor.process(exchange);
} catch (Throwable e) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
Wed Jun 16 16:40:37 2010
@@ -16,9 +16,10 @@
*/
package org.apache.camel.management;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.management.mbean.ManagedPerformanceCounter;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,7 +29,7 @@ import org.apache.commons.logging.LogFac
*
* @version $Revision$
*/
-public class InstrumentationProcessor extends DelegateProcessor {
+public class InstrumentationProcessor extends DelegateAsyncProcessor {
private static final transient Log LOG =
LogFactory.getLog(InstrumentationProcessor.class);
private PerformanceCounter counter;
@@ -54,26 +55,30 @@ public class InstrumentationProcessor ex
}
}
- public void process(Exchange exchange) throws Exception {
- if (processor != null) {
-
- // use nano time as its more accurate
- long startTime = -1;
- if (counter != null && counter.isStatisticsEnabled()) {
- startTime = System.nanoTime();
- }
-
- try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
+ @Override
+ public boolean process(final Exchange exchange, final AsyncCallback
callback) {
+ // use nano time as its more accurate
+ // and only record time if stats is enabled
+ long start = -1;
+ if (counter != null && counter.isStatisticsEnabled()) {
+ start = System.nanoTime();
+ }
+ final long startTime = start;
- if (startTime != -1) {
- long diff = (System.nanoTime() - startTime) / 1000000;
- recordTime(exchange, diff);
+ return super.process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ try {
+ // record end time
+ if (startTime > -1) {
+ long diff = (System.nanoTime() - startTime) / 1000000;
+ recordTime(exchange, diff);
+ }
+ } finally {
+ // and let the original callback know we are done as well
+ callback.done(doneSync);
+ }
}
- }
+ });
}
protected void recordTime(Exchange exchange, long duration) {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Wed Jun 16 16:40:37 2010
@@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLat
import java.util.concurrent.TimeUnit;
import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.commons.logging.Log;
@@ -33,7 +34,7 @@ import org.apache.commons.logging.LogFac
*
* @version $Revision$
*/
-public abstract class DelayProcessorSupport extends DelegateProcessor {
+public abstract class DelayProcessorSupport extends DelegateAsyncProcessor {
protected final transient Log log = LogFactory.getLog(getClass());
private final CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean fastStop = true;
@@ -42,9 +43,14 @@ public abstract class DelayProcessorSupp
super(processor);
}
- public void process(Exchange exchange) throws Exception {
- delay(exchange);
- super.process(exchange);
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ delay(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
+ return super.process(exchange, callback);
}
public boolean isFastStop() {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
Wed Jun 16 16:40:37 2010
@@ -25,6 +25,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
@@ -48,6 +49,10 @@ public class DelegateAsyncProcessor exte
this.processor = processor;
}
+ public DelegateAsyncProcessor(Processor processor) {
+ this(AsyncProcessorTypeConverter.convert(processor));
+ }
+
@Override
public String toString() {
return "DelegateAsync[" + processor + "]";
@@ -57,6 +62,14 @@ public class DelegateAsyncProcessor exte
return processor;
}
+ public void setProcessor(AsyncProcessor processor) {
+ this.processor = processor;
+ }
+
+ public void setProcessor(Processor processor) {
+ this.processor = AsyncProcessorTypeConverter.convert(processor);
+ }
+
protected void doStart() throws Exception {
ServiceHelper.startServices(processor);
}
@@ -66,6 +79,11 @@ public class DelegateAsyncProcessor exte
}
public boolean process(final Exchange exchange, final AsyncCallback
callback) {
+ if (processor == null) {
+ // no processor then we are done
+ callback.done(true);
+ return true;
+ }
return processor.process(exchange, callback);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
Wed Jun 16 16:40:37 2010
@@ -131,6 +131,9 @@ public abstract class RedeliveryErrorHan
// okay we want to continue then prepare the exchange for
that as well
prepareExchangeForContinue(exchange, data);
}
+
+ // we are breaking out so invoke the callback
+ callback.done(data.sync);
// and then return
return data.sync;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
Wed Jun 16 16:40:37 2010
@@ -21,7 +21,6 @@ import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultUnitOfWork;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.spi.RouteContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +37,7 @@ public final class UnitOfWorkProcessor e
private final RouteContext routeContext;
public UnitOfWorkProcessor(Processor processor) {
- this(null, AsyncProcessorTypeConverter.convert(processor));
+ this(null, processor);
}
public UnitOfWorkProcessor(AsyncProcessor processor) {
@@ -46,7 +45,8 @@ public final class UnitOfWorkProcessor e
}
public UnitOfWorkProcessor(RouteContext routeContext, Processor processor)
{
- this(routeContext, AsyncProcessorTypeConverter.convert(processor));
+ super(processor);
+ this.routeContext = routeContext;
}
public UnitOfWorkProcessor(RouteContext routeContext, AsyncProcessor
processor) {
@@ -81,8 +81,11 @@ public final class UnitOfWorkProcessor e
public void done(boolean doneSync) {
// Order here matters. We need to complete the callbacks
// since they will likely update the exchange with some
final results.
- callback.done(doneSync);
- doneUow(uow, exchange);
+ try {
+ callback.done(doneSync);
+ } finally {
+ doneUow(uow, exchange);
+ }
}
});
} else {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/Delayer.java
Wed Jun 16 16:40:37 2010
@@ -51,9 +51,8 @@ public class Delayer implements Intercep
return null;
}
- public Processor wrapProcessorInInterceptors(CamelContext context,
- ProcessorDefinition<?> definition, Processor target, Processor
nextTarget) throws Exception {
-
+ public Processor wrapProcessorInInterceptors(CamelContext context,
ProcessorDefinition<?> definition,
+ Processor target, Processor
nextTarget) throws Exception {
return new DelayInterceptor(definition, target, this);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
Wed Jun 16 16:40:37 2010
@@ -16,20 +16,20 @@
*/
package org.apache.camel.processor.interceptor;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
-public class HandleFaultInterceptor extends DelegateProcessor {
+public class HandleFaultInterceptor extends DelegateAsyncProcessor {
public HandleFaultInterceptor() {
super();
}
public HandleFaultInterceptor(Processor processor) {
- this();
- setProcessor(processor);
+ super(processor);
}
@Override
@@ -38,18 +38,18 @@ public class HandleFaultInterceptor exte
}
@Override
- public void process(Exchange exchange) throws Exception {
- if (processor == null) {
- return;
- }
-
- try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
-
- handleFault(exchange);
+ public boolean process(final Exchange exchange, final AsyncCallback
callback) {
+ return getProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean doneSync) {
+ try {
+ // handle fault after we are done
+ handleFault(exchange);
+ } finally {
+ // and let the original callback know we are done as well
+ callback.done(doneSync);
+ }
+ }
+ });
}
/**
@@ -62,8 +62,8 @@ public class HandleFaultInterceptor exte
if (faultBody != null && exchange.getException() == null) {
// remove fault as we are converting it to an exception
exchange.setOut(null);
- if (faultBody instanceof Exception) {
- exchange.setException((Exception) faultBody);
+ if (faultBody instanceof Throwable) {
+ exchange.setException((Throwable) faultBody);
} else {
// wrap it in an exception
exchange.setException(new
CamelException(faultBody.toString()));
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
Wed Jun 16 16:40:37 2010
@@ -16,24 +16,25 @@
*/
package org.apache.camel.processor.interceptor;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.StreamCache;
-import org.apache.camel.processor.DelegateProcessor;
+import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.util.MessageHelper;
/**
- * {...@link DelegateProcessor} that converts a message into a re-readable
format
+ * An interceptor that converts streams messages into a re-readable format
+ * by wrapping the stream into a {...@link StreamCache}.
*/
-public class StreamCachingInterceptor extends DelegateProcessor {
+public class StreamCachingInterceptor extends DelegateAsyncProcessor {
public StreamCachingInterceptor() {
super();
}
public StreamCachingInterceptor(Processor processor) {
- this();
- setProcessor(processor);
+ super(processor);
}
@Override
@@ -42,14 +43,14 @@ public class StreamCachingInterceptor ex
}
@Override
- public void process(Exchange exchange) throws Exception {
+ public boolean process(Exchange exchange, AsyncCallback callback) {
StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
if (newBody != null) {
exchange.getIn().setBody(newBody);
}
MessageHelper.resetStreamCache(exchange.getIn());
- getProcessor().process(exchange);
+ return getProcessor().process(exchange, callback);
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
Wed Jun 16 16:40:37 2010
@@ -32,7 +32,6 @@ import org.apache.camel.impl.DoCatchRout
import org.apache.camel.impl.DoFinallyRouteNode;
import org.apache.camel.impl.OnCompletionRouteNode;
import org.apache.camel.impl.OnExceptionRouteNode;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.AggregateDefinition;
import org.apache.camel.model.CatchDefinition;
import org.apache.camel.model.FinallyDefinition;
@@ -47,7 +46,6 @@ import org.apache.camel.spi.ExchangeForm
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
-import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
@@ -73,7 +71,7 @@ public class TraceInterceptor extends De
private String jpaTraceEventMessageClassName;
public TraceInterceptor(ProcessorDefinition node, Processor target,
TraceFormatter formatter, Tracer tracer) {
- super(AsyncProcessorTypeConverter.convert(target));
+ super(target);
this.tracer = tracer;
this.node = node;
this.formatter = formatter;
@@ -94,10 +92,6 @@ public class TraceInterceptor extends De
this.routeContext = routeContext;
}
- public void process(final Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// do not trace if tracing is disabled
@@ -178,7 +172,7 @@ public class TraceInterceptor extends De
// process the exchange
try {
sync = super.process(exchange, callback);
- } catch (Exception e) {
+ } catch (Throwable e) {
exchange.setException(e);
}
} finally {
@@ -188,7 +182,7 @@ public class TraceInterceptor extends De
traceExchangeOut(exchange, traceState);
}
}
- } catch (Exception e) {
+ } catch (Throwable e) {
// some exception occurred in trace logic
if (shouldLogException(exchange)) {
logException(exchange, e);
@@ -196,7 +190,6 @@ public class TraceInterceptor extends De
exchange.setException(e);
}
- callback.done(sync);
return sync;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java?rev=955294&r1=955293&r2=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InterceptStrategy.java
Wed Jun 16 16:40:37 2010
@@ -29,19 +29,24 @@ import org.apache.camel.model.ProcessorD
*/
public interface InterceptStrategy {
+ // TODO: We should force this strategy to return AsyncProcessor so custom
interceptors work nicely with async
+
/**
* This method is invoked by
* {...@link ProcessorDefinition#wrapProcessor(RouteContext, Processor)}
* to give the implementor an opportunity to wrap the target processor
* in a route.
+ * <p/>
+ * Its adviced to use an {...@link org.apache.camel.AsyncProcessor} as the
returned wrapped
+ * {...@link Processor} which ensures the interceptor works well with the
asynchronous routing engine.
*
* @param context Camel context
* @param definition the model this interceptor represents
* @param target the processor to be wrapped
* @param nextTarget the next processor to be routed to
- * @return processor wrapped with an interceptor or not wrapped
+ * @return processor wrapped with an interceptor or not wrapped.
* @throws Exception can be thrown
*/
- Processor wrapProcessorInInterceptors(CamelContext context,
- ProcessorDefinition<?> definition, Processor target, Processor
nextTarget) throws Exception;
+ Processor wrapProcessorInInterceptors(CamelContext context,
ProcessorDefinition<?> definition,
+ Processor target, Processor
nextTarget) throws Exception;
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java
(from r955211,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java&r1=955211&r2=955294&rev=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleDirectTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/VerySimpleDirectTest.java
Wed Jun 16 16:40:37 2010
@@ -22,11 +22,9 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class SimpleDirectTest extends ContextTestSupport {
+public class VerySimpleDirectTest extends ContextTestSupport {
- public void testSimpleDirect() throws Exception {
- getMockEndpoint("mock:foo").expectedMessageCount(1);
- getMockEndpoint("mock:bar").expectedMessageCount(1);
+ public void testVerySimpleDirect() throws Exception {
getMockEndpoint("mock:result").expectedMessageCount(1);
template.sendBody("direct:start", "Hello World");
@@ -39,11 +37,8 @@ public class SimpleDirectTest extends Co
return new RouteBuilder() {
@Override
public void configure() throws Exception {
-
from("direct:start").to("direct:foo").to("direct:bar").to("mock:result");
-
- from("direct:foo").to("mock:foo");
- from("direct:bar").to("mock:bar");
+ from("direct:start").to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java?rev=955294&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
Wed Jun 16 16:40:37 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithDelayerTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ String reply = template.requestBody("direct:start", "Hello Camel",
String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads",
beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ // enable delayer to ensure it works using async API
+ from("direct:start").delayer(100)
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ beforeThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("async:foo")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ afterThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithDelayerTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java?rev=955294&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
Wed Jun 16 16:40:37 2010
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithHandleFaultTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedMessageCount(0);
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ Exchange reply = template.request("direct:start", new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setBody("Hello Camel");
+ }
+ });
+ assertNotNull(reply);
+ assertTrue(reply.isFailed());
+ assertNotNull(reply.getException());
+ assertEquals("Faulty Bye Camel", reply.getException().getMessage());
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads",
beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ // enable handle fault to ensure it works using async API
+ from("direct:start").handleFault()
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ beforeThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("async:foo")
+ .to("log:after")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ afterThreadName =
Thread.currentThread().getName();
+ exchange.getOut().setFault(true);
+ exchange.getOut().setBody("Faulty Bye Camel");
+ }
+ })
+ .to("mock:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithHandleFaultTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java?rev=955294&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
Wed Jun 16 16:40:37 2010
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithJMXTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ @Override
+ protected boolean useJmx() {
+ return true;
+ }
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ String reply = template.requestBody("direct:start", "Hello Camel",
String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads",
beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ from("direct:start")
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ beforeThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("async:foo")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ afterThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithJMXTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java
(from r955211,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java&r1=955211&r2=955294&rev=955294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithStreamCachingTest.java
Wed Jun 16 16:40:37 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
/**
* @version $Revision$
*/
-public class AsyncEndpointTest extends ContextTestSupport {
+public class AsyncEndpointWithStreamCachingTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -49,7 +49,8 @@ public class AsyncEndpointTest extends C
public void configure() throws Exception {
context.addComponent("async", new MyAsyncComponent());
- from("direct:start")
+ // enable stream caching to ensure it works using async API
+ from("direct:start").streamCaching().tracing()
.to("mock:before")
.to("log:before")
.process(new Processor() {
@@ -70,4 +71,4 @@ public class AsyncEndpointTest extends C
};
}
-}
+}
\ No newline at end of file
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java?rev=955294&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
Wed Jun 16 16:40:37 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.async;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class AsyncEndpointWithTracingTest extends ContextTestSupport {
+
+ private static String beforeThreadName;
+ private static String afterThreadName;
+
+ public void testAsyncEndpoint() throws Exception {
+ getMockEndpoint("mock:before").expectedBodiesReceived("Hello Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Bye Camel");
+
+ String reply = template.requestBody("direct:start", "Hello Camel",
String.class);
+ assertEquals("Bye Camel", reply);
+
+ assertMockEndpointsSatisfied();
+
+ assertFalse("Should use different threads",
beforeThreadName.equalsIgnoreCase(afterThreadName));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ context.addComponent("async", new MyAsyncComponent());
+
+ // enable tracing to ensure it works using async API
+ from("direct:start").tracing()
+ .to("mock:before")
+ .to("log:before")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ beforeThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("async:foo")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ afterThreadName =
Thread.currentThread().getName();
+ }
+ })
+ .to("log:after")
+ .to("mock:after")
+ .to("mock:result");
+ }
+ };
+ }
+
+}
\ No newline at end of file
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointWithTracingTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date