Hey..
so I prototype an initial implementation of this. I'm going to attach
that patch for review and comments. It's still kinda of rough and I
broke a few other test cases. I'll work on fixing that later today
but this does solve the async processing from a file:// endpoint
problem that this thread originally described. Just run the included
FileAsyncRouteTest case.
Regards,
Hiram
On 8/23/07, James Strachan <[EMAIL PROTECTED]> wrote:
> On 8/23/07, Hiram Chirino <[EMAIL PROTECTED]> wrote:
> > On 8/23/07, James Strachan <[EMAIL PROTECTED]> wrote:
> > > On 8/22/07, Hiram Chirino <[EMAIL PROTECTED]> wrote:
> > > > Hi,
> > > >
> > > > Most of our components currently depend on synchronous processing of
> > > > the Exchange or bad things can happen. For example the following does
> > > > not work:
> > > >
> > > > from("file:/tmp/foo").to("seda:test");
> > > > from("seda:test").process( myProcessor );
> > > >
> > > > Why? because the file component delete the file as soon as the
> > > > exchange returns from being sent to seda:test. What would have been
> > > > nice is that file deletion did not occur until after the exchange is
> > > > processed by myProcessor. But that's occuring in an asynchronous
> > > > thread.
> > > >
> > > > Here's an idea that might help solve this problem.
> > > > Have the seda component call something like
> > > > exchange.getExchangeFuture().done()
> > > > when the message is processed in it's async thread.
> > > >
> > > > and in the file component, have it call
> > > > exchange.getExchangeFuture().get();
> > > > // then the code that deletes the file
> > > > or
> > > > exchange.getExchangeFuture().setCallback( new Callback() {
> > > > public void done( Exchange exch ) {
> > > > // then the code that deletes the file
> > > > }
> > > > })
> > >
> > > I was pondering about this with relation to this thread the other day...
> > > http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
> > >
> > > I definitely think we need a standard way to register
> > > post-commit/rollback hooks. i.e. on completion of processing (either
> > > on a commit/completed or rollback/failed) allow a
> > > processor/consumer/producer to register some logic such as to delete a
> > > file, flush some cache etc. Note this is mostly required for
> > > non-transactional things. e.g. in JPA and JMS we can just use
> > > transactions for this.
> >
> > Actually transaction things are easy since they require all processing
> > in the transaction to be done synchronously.
>
> You could suspend the transaction and resume it in another thread; its
> rarely done but it is a possible approach.
>
>
> > The hard bit is
> > processing the exchanges async.
>
> Yeah - async is hard full stop I think :)
>
>
> > > I'm kinda wondering; should we just try make things like files, FTP
> > > and the like transactional; that is to say, we implement transaction
> > > hooks so that we can do a file 'delete/rename' which is registered as
> > > a transaction commit status listener? Just registering some kind of
> > > onCommit/onRollback callbacks would do the trick though as you
> > > suggest.
> > >
> >
> > I don't like the idea of making this looks like transaction semantics
> > when it's not. Traditional transaction semantics force you to do
> > processing synchronously. And the point of this is exactly the
> > opposite.
>
> I could counter that by saying I don't like 2 different mechanisms to
> describe a 'unit of work' with callback mechanisms for knowing when it
> completes successfully or fails. i.e. having transaction callbacks and
> async callbacks; we should have just one really.
>
> e.g. using synchronous processing, I might want to process a file and
> do a JDBC insert; only if the transaction commits do I want to delete
> the file.
>
> So I think being able to have file operations work nicely with
> transactions (whether in sync or async mode) is a pretty common
> requirement.
>
> The issue though is; should we treat async processing as
> suspending/resuming a transaction or not (as async processing of
> transactions is tricky).
>
>
> > > There's a second issue which is asynchronous processing; such as a
> > > producer invoking an asynchronous processor then wanting some kind of
> > > callback that the processing has completed. I wanted to make the easy
> > > things really easy with Camel; so was a bit reluctant to add
> > > asynchronous processing explicitly from the start for fear of making
> > > the API very complex; most components afterall tend to be synchronous
> > > (which makes transactions very easy to do too btw).
> > >
> >
> > I agree with this.. and this is my greatest fear. We need to make
> > sure that the synchronous components stay as simple as they are today.
> > But allow async aware components support having their exchanges be
> > processed async.
>
> Yeah. I think there's gonna be few of 'em that are truly async too btw
> - so only a few component ninja's will have to worry about that.
>
>
> > > I was thinking we could add some optional API for AsyncProcessor which
> > > is-a Processor but adds an asynchronous invocation API style; rather
> > > like the Channel does in the ServiceMix 4 API...
> > >
> > > // sync API
> > > interface Processor {
> > > void process(Exchange exchange);
> > > }
> > >
> > > interface AsyncProcessor extends Processor {
> > > // async methods
> > > Future<Exchange> processAsync(Exchange exchange)
> > > Future<Exchange> processsync(Exchange exchange, AsyncHandler
> > > handler)
> > > }
> > >
> > > Then rather than adding a kinda done() method to the Exchange and
> > > calling it throughout every single producer/consumer/Processor
> > > implementation; we could just use the Future object to know when a
> > > particular asynchronous operation has completed. i.e. keep the async
> > > API to the side, for those rare cases folks really wanna use it -
> > > otherwise we can all stick to the simple sync API that works easily
> > > with transactions.
> > >
> >
> > This might be a good option. I think that we don't need the
> > "Future<Exchange> processAsync(Exchange exchange)" call since to
> > make an exchange async you just need to route it through a seda:
> > component.
> >
> > so perhaps we just add:
> > Future<Exchange> processs(Exchange exchange, AsyncHandler handler)
> >
> > If the path of the exchange is sync, the it's a blocking call and by
> > the time it returns the Future will be done. But it reached an async
> > component like seda: then it will return without the Future being
> > completed.
> >
> > > Thoughts?
> >
> > Sounds like like a good approach... Perhaps I'll prototype it..
>
> Go for it! :)
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>
--
Regards,
Hiram
Blog: http://hiramchirino.com
Index: camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (working copy)
@@ -17,13 +17,18 @@
package org.apache.camel.processor;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
-import org.apache.camel.Message;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,7 +38,7 @@
*
* @version $Revision$
*/
-public class Pipeline extends MulticastProcessor implements Processor {
+public class Pipeline extends MulticastProcessor implements AsyncProcessor {
private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
public Pipeline(Collection<Processor> processors) {
@@ -50,18 +55,67 @@
}
public void process(Exchange exchange) throws Exception {
+ // This could become a base class method for an AsyncProcessor
+ final CountDownLatch latch = new CountDownLatch(1);
+ if (!process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ if (sync) {
+ return;
+ }
+ latch.countDown();
+ }
+ })) {
+ latch.await();
+ }
+ // If there was an exception associated with the exchange, throw it.
+ exchange.throwException();
+ }
+
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ Iterator<Processor> processors = getProcessors().iterator();
Exchange nextExchange = exchange;
- boolean first = true;
- for (Processor producer : getProcessors()) {
- if (first) {
- first = false;
+ while (processors.hasNext()) {
+ AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+ boolean sync = process(nextExchange, callback, processors, processor);
+ // Continue processing the pipeline synchronously ...
+ if (sync) {
+ nextExchange = createNextExchange(processor, exchange);
} else {
- nextExchange = createNextExchange(producer, nextExchange);
+ // The pipeline will be completed async...
+ return true;
}
- producer.process(nextExchange);
}
+ // If we get here then the pipeline was processed entirely
+ // synchronously.
+ callback.done(true);
+ return true;
}
+ private boolean process(final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
+ return processor.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+
+ // We only have to handle async completion of
+ // the pipeline..
+ if( sync ) {
+ return;
+ }
+
+ // Continue processing the pipeline...
+ Exchange nextExchange = exchange;
+ while( processors.hasNext() ) {
+ AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+ nextExchange = createNextExchange(processor, exchange);
+ sync = process( nextExchange, callback, processors, processor);
+ if( !sync ) {
+ return;
+ }
+ }
+ callback.done(true);
+ }
+ });
+ }
+
/**
* Strategy method to create the next exchange from the
*
@@ -111,4 +165,5 @@
public String toString() {
return "Pipeline" + getProcessors();
}
+
}
Index: camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (working copy)
@@ -16,22 +16,25 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Service;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision$
*/
-public class SendProcessor extends ServiceSupport implements Processor, Service {
+public class SendProcessor extends ServiceSupport implements AsyncProcessor, Service {
private static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
private Endpoint destination;
private Producer producer;
+ private AsyncProcessor processor;
public SendProcessor(Endpoint destination) {
if (destination == null) {
@@ -57,6 +60,21 @@
}
}
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ if (producer == null) {
+ if (isStopped()) {
+ LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
+ } else {
+ exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
+ }
+ callback.done(true);
+ return true;
+ } else {
+ return processor.process(exchange, callback);
+ }
+ }
+
+
public Endpoint getDestination() {
return destination;
}
@@ -64,6 +82,7 @@
protected void doStart() throws Exception {
this.producer = destination.createProducer();
this.producer.start();
+ this.processor = AsyncProcessorTypeConverter.convert(producer);
}
protected void doStop() throws Exception {
@@ -72,6 +91,7 @@
producer.stop();
} finally {
producer = null;
+ processor = null;
}
}
}
Index: camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (working copy)
@@ -16,11 +16,13 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ExceptionType;
-import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,13 +35,22 @@
*
* @version $Revision$
*/
-public class DeadLetterChannel extends ErrorHandlerSupport {
+public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
public static final String REDELIVERED = "org.apache.camel.Redelivered";
+ private class RedeliveryData {
+ int redeliveryCounter;
+ long redeliveryDelay;
+
+ // default behaviour which can be overloaded on a per exception basis
+ RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
+ Processor failureProcessor = deadLetter;
+ }
+
private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
- private Processor output;
- private Processor deadLetter;
+ private AsyncProcessor output;
+ private AsyncProcessor deadLetter;
private RedeliveryPolicy redeliveryPolicy;
private Logger logger;
@@ -47,14 +58,13 @@
this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
}
- public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
- Logger logger) {
- this.deadLetter = deadLetter;
- this.output = output;
+ public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger) {
+ this.deadLetter = AsyncProcessorTypeConverter.convert(deadLetter);
+ this.output = AsyncProcessorTypeConverter.convert(output);
this.redeliveryPolicy = redeliveryPolicy;
this.logger = logger;
}
-
+
public static <E extends Exchange> Logger createDefaultLogger() {
return new Logger(LOG, LoggingLevel.ERROR);
}
@@ -64,6 +74,67 @@
return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
}
+ public boolean process(Exchange exchange, final AsyncCallback callback) {
+ return process(exchange, callback, new RedeliveryData());
+ }
+
+ public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+
+ while (true) {
+ if (exchange.getException() != null) {
+ Throwable e = exchange.getException();
+ exchange.setException(null); // Reset it since we are handling it.
+
+ logger.log("On delivery attempt: " + data.redeliveryCounter + " caught: " + e, e);
+ data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+
+ ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
+ if (exceptionPolicy != null) {
+ data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
+ Processor processor = exceptionPolicy.getErrorHandler();
+ if (processor != null) {
+ data.failureProcessor = processor;
+ }
+ }
+ }
+
+ if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
+ return afp.process(exchange, callback);
+ }
+
+ if (data.redeliveryCounter > 0) {
+ // Figure out how long we should wait to resend this message.
+ data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+ sleep(data.redeliveryDelay);
+ }
+
+ boolean sync = output.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ // Only handle the async case...
+ if (sync) {
+ return;
+ }
+ if (exchange.getException() != null) {
+ process(exchange, callback, data);
+ } else {
+ callback.done(sync);
+ }
+ }
+ });
+ if (!sync) {
+ // It is going to be processed async..
+ return false;
+ }
+ if (exchange.getException() == null) {
+ // If everything went well.. then we exit here..
+ return true;
+ }
+ // error occured so loop back around.....
+ }
+
+ }
+
public void process(Exchange exchange) throws Exception {
int redeliveryCounter = 0;
long redeliveryDelay = 0;
@@ -86,7 +157,6 @@
logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-
ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
if (exceptionPolicy != null) {
currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(currentRedeliveryPolicy);
@@ -185,4 +255,5 @@
protected void doStop() throws Exception {
ServiceHelper.stopServices(deadLetter, output);
}
+
}
Index: camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (working copy)
@@ -47,6 +47,7 @@
public DefaultTypeConverter(Injector injector) {
typeConverterLoaders.add(new AnnotationTypeConverterLoader());
this.injector = injector;
+ addFallbackConverter(new AsyncProcessorTypeConverter());
addFallbackConverter(new PropertyEditorTypeConverter());
addFallbackConverter(new ToStringTypeConverter());
addFallbackConverter(new ArrayTypeConverter());
Index: camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (working copy)
@@ -16,10 +16,15 @@
*/
package org.apache.camel.impl;
+import java.util.concurrent.Future;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.util.ServiceHelper;
@@ -29,6 +34,7 @@
public class DefaultConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E> {
private Endpoint<E> endpoint;
private Processor processor;
+ private AsyncProcessor asyncProcessor;
private ExceptionHandler exceptionHandler;
public DefaultConsumer(Endpoint<E> endpoint, Processor processor) {
@@ -49,6 +55,20 @@
return processor;
}
+ /**
+ * Provides an [EMAIL PROTECTED] AsyncProcessor} interface to the configured
+ * processor on the consumer. If the processor does not implement
+ * the interface, it will be adapted so that it does.
+ *
+ * @return
+ */
+ public AsyncProcessor getAsyncProcessor() {
+ if (asyncProcessor == null) {
+ asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+ }
+ return asyncProcessor;
+ }
+
public ExceptionHandler getExceptionHandler() {
if (exceptionHandler == null) {
exceptionHandler = new LoggingExceptionHandler(getClass());
Index: camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (working copy)
@@ -22,6 +22,7 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.util.UuidGenerator;
/**
@@ -182,6 +183,19 @@
this.exception = exception;
}
+ public void throwException() throws Exception {
+ if (exception == null) {
+ return;
+ }
+ if (exception instanceof Exception) {
+ throw (Exception)exception;
+ }
+ if (exception instanceof RuntimeException) {
+ throw (RuntimeException)exception;
+ }
+ throw new RuntimeCamelException(exception);
+ }
+
public Message getFault() {
return fault;
}
@@ -222,4 +236,5 @@
messageSupport.setExchange(this);
}
}
+
}
Index: camel-core/src/main/java/org/apache/camel/Exchange.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/Exchange.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/Exchange.java (working copy)
@@ -133,6 +133,11 @@
void setException(Throwable e);
/**
+ * Throws the exception associated with this exchange.
+ */
+ void throwException() throws Exception;
+
+ /**
* Returns the container so that a processor can resolve endpoints from URIs
*
* @return the container which owns this exchange
@@ -152,4 +157,5 @@
* copied
*/
void copyFrom(Exchange source);
+
}
Index: camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java (working copy)
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.file;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.strategy.FileProcessStrategy;
import org.apache.camel.impl.ScheduledPollConsumer;
@@ -23,6 +25,9 @@
import org.apache.commons.logging.LogFactory;
import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
* @version $Revision: 523016 $
@@ -62,8 +67,12 @@
LOG.debug("Skipping directory " + fileOrDirectory);
}
}
+
+ ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
protected void pollFile(final File file) {
+
+
if (!file.exists()) {
return;
}
@@ -77,10 +86,15 @@
}
return;
}
+ } else {
+ if (filesBeingProcessed.contains(file)) {
+ return;
+ }
+ filesBeingProcessed.put(file, file);
}
- FileProcessStrategy processStrategy = endpoint.getFileStrategy();
- FileExchange exchange = endpoint.createExchange(file);
+ final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
+ final FileExchange exchange = endpoint.createExchange(file);
if (isPreserveFileName()) {
String relativePath = file.getPath().substring(endpoint.getFile().getPath().length());
@@ -95,8 +109,25 @@
LOG.debug("About to process file: " + file + " using exchange: " + exchange);
}
if (processStrategy.begin(endpoint, exchange, file)) {
- getProcessor().process(exchange);
- processStrategy.commit(endpoint, exchange, file);
+
+ // Use the async processor interface so that processing of
+ // the
+ // exchange can happen asynchronously
+ getAsyncProcessor().process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ if (exchange.getException() == null) {
+ try {
+ processStrategy.commit(endpoint, (FileExchange)exchange, file);
+ } catch (Exception e) {
+ handleException(e);
+ }
+ } else {
+ handleException(exchange.getException());
+ }
+ filesBeingProcessed.remove(file);
+ }
+ });
+
}
else {
if (LOG.isDebugEnabled()) {
Index: camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java (working copy)
@@ -24,6 +24,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.component.seda.SedaComponent;
import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.component.seda.SedaEndpoint.Entry;
/**
* An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
@@ -34,19 +35,19 @@
* @version $Revision: 1.1 $
*/
public class VmComponent<E extends Exchange> extends SedaComponent<E> {
- protected static Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
+ protected static Map<String, BlockingQueue> queues = new HashMap<String, BlockingQueue>();
@Override
protected Endpoint<E> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
- BlockingQueue<E> blockingQueue = (BlockingQueue<E>) getBlockingQueue(uri);
+ BlockingQueue<SedaEndpoint.Entry<E>> blockingQueue = (BlockingQueue<SedaEndpoint.Entry<E>>) getBlockingQueue(uri);
return new SedaEndpoint<E>(uri, this, blockingQueue);
}
- protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
+ protected BlockingQueue<Entry<E>> getBlockingQueue(String uri) {
synchronized (queues) {
- BlockingQueue<Exchange> answer = queues.get(uri);
+ BlockingQueue<Entry<E>> answer = queues.get(uri);
if (answer == null) {
- answer = (BlockingQueue<Exchange>) createQueue();
+ answer = createQueue();
queues.put(uri, answer);
}
return answer;
Index: camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java (working copy)
@@ -31,8 +31,8 @@
* @version $Revision: 1.1 $
*/
public class SedaComponent<E extends Exchange> extends DefaultComponent<E> {
- public BlockingQueue<E> createQueue() {
- return new LinkedBlockingQueue<E>(1000);
+ public BlockingQueue<SedaEndpoint.Entry<E>> createQueue() {
+ return new LinkedBlockingQueue<SedaEndpoint.Entry<E>>(1000);
}
@Override
Index: camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (working copy)
@@ -19,10 +19,13 @@
import java.util.concurrent.TimeUnit;
import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,12 +36,12 @@
private static final Log LOG = LogFactory.getLog(SedaConsumer.class);
private SedaEndpoint<E> endpoint;
- private Processor processor;
+ private AsyncProcessor processor;
private Thread thread;
public SedaConsumer(SedaEndpoint<E> endpoint, Processor processor) {
this.endpoint = endpoint;
- this.processor = processor;
+ this.processor = AsyncProcessorTypeConverter.convert(processor);
}
@Override
@@ -48,21 +51,30 @@
public void run() {
while (!isStopping()) {
- E exchange;
+ final SedaEndpoint.Entry<E> entry;
try {
- exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+ entry = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
break;
}
- if (exchange != null && !isStopping()) {
- try {
- processor.process(exchange);
- } catch (AlreadyStoppedException e) {
- LOG.debug("Ignoring failed message due to shutdown: " + e, e);
- break;
- } catch (Throwable e) {
- LOG.error(e);
- }
+ if (entry != null && !isStopping()) {
+ processor.process(entry.getExchange(), new AsyncCallback() {
+ public void done(boolean sync) {
+ if (entry.getCallback() != null) {
+ entry.getCallback().done(false);
+ } else {
+ Throwable e = entry.getExchange().getException();
+ if (e != null) {
+ if (e instanceof AlreadyStoppedException) {
+ LOG.debug("Ignoring failed message due to shutdown: " + e, e);
+ } else {
+ LOG.error(e);
+ }
+ }
+ }
+ }
+ });
+
}
}
}
Index: camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (working copy)
@@ -18,8 +18,11 @@
import java.util.concurrent.BlockingQueue;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -35,9 +38,47 @@
* @version $Revision: 519973 $
*/
public class SedaEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
- private BlockingQueue<E> queue;
+
+ static public class Entry<E extends Exchange> {
+ E exchange;
+ AsyncCallback callback;
+
+ public Entry(E exchange, AsyncCallback callback) {
+ this.exchange = exchange;
+ this.callback = callback;
+ }
+
+ public E getExchange() {
+ return exchange;
+ }
+ public void setExchange(E exchange) {
+ this.exchange = exchange;
+ }
+ public AsyncCallback getCallback() {
+ return callback;
+ }
+ public void setCallback(AsyncCallback callback) {
+ this.callback = callback;
+ }
+
+ }
+
+ private final class SedaProducer extends DefaultProducer implements AsyncProcessor {
+ private SedaProducer(Endpoint endpoint) {
+ super(endpoint);
+ }
+ public void process(Exchange exchange) {
+ queue.add(new Entry<E>(toExchangeType(exchange), null));
+ }
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ queue.add(new Entry<E>(toExchangeType(exchange), callback));
+ return false;
+ }
+ }
- public SedaEndpoint(String endpointUri, Component component, BlockingQueue<E> queue) {
+ private BlockingQueue<Entry<E>> queue;
+
+ public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Entry<E>> queue) {
super(endpointUri, component);
this.queue = queue;
}
@@ -47,11 +88,7 @@
}
public Producer<E> createProducer() throws Exception {
- return new DefaultProducer(this) {
- public void process(Exchange exchange) {
- queue.add(toExchangeType(exchange));
- }
- };
+ return new SedaProducer(this);
}
public Consumer<E> createConsumer(Processor processor) throws Exception {
@@ -64,7 +101,7 @@
return (E)new DefaultExchange(getContext());
}
- public BlockingQueue<E> getQueue() {
+ public BlockingQueue<Entry<E>> getQueue() {
return queue;
}