Hey..

So I prototyped an initial implementation of this.  I'm going to attach
that patch for review and comments.  It's still kind of rough and I
broke a few other test cases.  I'll work on fixing that later today,
but this does solve the async processing from a file:// endpoint
problem that this thread originally described.  Just run the included
FileAsyncRouteTest case.
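
For anyone who wants the gist before applying the patch, here is a
minimal, self-contained sketch of the callback contract it relies on.
It is not code from the patch: AsyncContractSketch, queued() and
consume() are made-up names, a plain String stands in for the Exchange,
and the two interfaces only mirror the process(exchange, callback) and
done(sync) shapes that appear in the diff below.  The assumption is that
process() returns true when the work finished before the call returned,
and false when completion will be reported later through the callback.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncContractSketch {
    /** Hypothetical stand-in for the patch's AsyncCallback. */
    interface AsyncCallback {
        void done(boolean sync);
    }

    /** Hypothetical stand-in for the patch's AsyncProcessor; a String plays the Exchange. */
    interface AsyncProcessor {
        // Returns true if processing completed before this call returned.
        boolean process(String exchange, AsyncCallback callback);
    }

    /** A seda-like hop: hands the exchange to a worker thread and returns at once. */
    static AsyncProcessor queued(ExecutorService worker, List<String> log) {
        return (exchange, callback) -> {
            worker.execute(() -> {
                log.add("processed " + exchange);
                callback.done(false); // completion is reported from the worker thread
            });
            return false; // not finished yet; the callback will fire later
        };
    }

    /** A file-like consumer: the "delete" happens only in the completion callback. */
    static List<String> consume(String file) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch finished = new CountDownLatch(1);
        try {
            boolean sync = queued(worker, log).process(file, doneSync -> {
                log.add("deleted " + file); // stands in for the commit/delete step
                finished.countDown();
            });
            // Only the demo waits here; a real consumer could simply return.
            if (!finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
            log.add("sync=" + sync);
            return log;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(consume("/tmp/foo/a.txt"));
    }
}
```

The point of the sketch is the ordering: the "deleted" entry can only
be logged after the "processed" entry, even though process() itself
returned immediately with sync=false.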

Regards,
Hiram

On 8/23/07, James Strachan <[EMAIL PROTECTED]> wrote:
> On 8/23/07, Hiram Chirino <[EMAIL PROTECTED]> wrote:
> > On 8/23/07, James Strachan <[EMAIL PROTECTED]> wrote:
> > > On 8/22/07, Hiram Chirino <[EMAIL PROTECTED]> wrote:
> > > > Hi,
> > > >
> > > > Most of our components currently depend on synchronous processing of
> > > > the Exchange or bad things can happen.  For example the following does
> > > > not work:
> > > >
> > > > from("file:/tmp/foo").to("seda:test");
> > > > from("seda:test").process( myProcessor );
> > > >
> > > > Why? Because the file component deletes the file as soon as the
> > > > exchange returns from being sent to seda:test.  What would have been
> > > > nice is that file deletion did not occur until after the exchange is
> > > > processed by myProcessor.  But that's occurring in an asynchronous
> > > > thread.
> > > >
> > > > Here's an idea that might help solve this problem.
> > > > Have the seda component call something like
> > > >    exchange.getExchangeFuture().done()
> > > > when the message is processed in its async thread.
> > > >
> > > > and in the file component, have it call
> > > >    exchange.getExchangeFuture().get();
> > > >    // then the code that deletes the file
> > > > or
> > > >    exchange.getExchangeFuture().setCallback( new Callback() {
> > > >      public void done( Exchange exch ) {
> > > >         // then the code that deletes the file
> > > >      }
> > > > })
> > >
> > > I was pondering about this with relation to this thread the other day...
> > > http://www.nabble.com/Consuming-FTP-file-and-deleting-after-processing-tf4300515s22882.html
> > >
> > > I definitely think we need a standard way to register
> > > post-commit/rollback hooks. i.e. on completion of processing (either
> > > on a commit/completed or rollback/failed) allow a
> > > processor/consumer/producer to register some logic such as to delete a
> > > file, flush some cache etc. Note this is mostly required for
> > > non-transactional things. e.g. in JPA and JMS we can just use
> > > transactions for this.
> >
> > Actually transaction things are easy since they require all processing
> > in the transaction to be done synchronously.
>
> You could suspend the transaction and resume it in another thread; it's
> rarely done but it is a possible approach.
>
>
> > The hard bit is
> > processing the exchanges async.
>
> Yeah - async is hard full stop I think :)
>
>
> > > I'm kinda wondering; should we just try to make things like files, FTP
> > > and the like transactional; that is to say, we implement transaction
> > > hooks so that we can do a file 'delete/rename' which is registered as
> > > a transaction commit status listener? Just registering some kind of
> > > onCommit/onRollback callbacks would do the trick though as you
> > > suggest.
> > >
> >
> > I don't like the idea of making this look like transaction semantics
> > when it's not.  Traditional transaction semantics force you to do
> > processing synchronously.  And the point of this is exactly the
> > opposite.
>
> I could counter that by saying I don't like 2 different mechanisms to
> describe a 'unit of work' with callback mechanisms for knowing when it
> completes successfully or fails. i.e. having transaction callbacks and
> async callbacks; we should have just one really.
>
> e.g. using synchronous processing, I might want to process a file and
> do a JDBC insert; only if the transaction commits do I want to delete
> the file.
>
> So I think being able to have file operations work nicely with
> transactions (whether in sync or async mode) is a pretty common
> requirement.
>
> The issue though is; should we treat async processing as
> suspending/resuming a transaction or not (as async processing of
> transactions is tricky).
>
>
> > > There's a second issue which is asynchronous processing; such as a
> > > producer invoking an asynchronous processor then wanting some kind of
> > > callback that the processing has completed. I wanted to make the easy
> > > things really easy with Camel; so was a bit reluctant to add
> > > asynchronous processing explicitly from the start for fear of making
> > > the API very complex; most components after all tend to be synchronous
> > > (which makes transactions very easy to do too btw).
> > >
> >
> > I agree with this..  and this is my greatest fear.  We need to make
> > sure that the synchronous components stay as simple as they are today.
> >  But allow async-aware components to have their exchanges
> > processed async.
>
> Yeah. I think there's gonna be few of 'em that are truly async too btw
> - so only a few component ninjas will have to worry about that.
>
>
> > > I was thinking we could add some optional API for AsyncProcessor which
> > > is-a Processor but adds an asynchronous invocation API style; rather
> > > like the Channel does in the ServiceMix 4 API...
> > >
> > > // sync API
> > > interface Processor {
> > >         void    process(Exchange exchange);
> > > }
> > >
> > > interface AsyncProcessor extends Processor {
> > >   // async methods
> > >   Future<Exchange>      processAsync(Exchange exchange);
> > >   Future<Exchange>      processAsync(Exchange exchange, AsyncHandler handler);
> > > }
> > >
> > > Then rather than adding a kinda done() method to the Exchange and
> > > calling it throughout every single producer/consumer/Processor
> > > implementation; we could just use the Future object to know when a
> > > particular asynchronous operation has completed. i.e. keep the async
> > > API to the side, for those rare cases folks really wanna use it -
> > > otherwise we can all stick to the simple sync API that works easily
> > > with transactions.
> > >
> >
> > This might be a good option.  I think that we don't need the
> > "Future<Exchange>      processAsync(Exchange exchange)" call since to
> > make an exchange async you just need to route it through a seda:
> > component.
> >
> > so perhaps we just add:
> > Future<Exchange>      process(Exchange exchange, AsyncHandler handler)
> >
> > If the path of the exchange is sync, then it's a blocking call and by
> > the time it returns the Future will be done.  But if it reaches an async
> > component like seda: then it will return without the Future being
> > completed.
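
The two outcomes described just above can be illustrated with a small
sketch.  This is only an illustration under assumptions, not the
proposed API: FutureReturnSketch, processSync() and processAsync() are
hypothetical names, a StringBuilder stands in for the Exchange, the
AsyncHandler argument is left out, and CompletableFuture (Java 8 and
later) is used purely for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

class FutureReturnSketch {
    /** Hypothetical sync processor; a StringBuilder plays the Exchange. */
    interface Processor {
        void process(StringBuilder exchange);
    }

    /** Sync path: the work runs on the caller's thread, so the Future is already done. */
    static Future<StringBuilder> processSync(Processor p, StringBuilder exchange) {
        p.process(exchange);
        return CompletableFuture.completedFuture(exchange);
    }

    /** Async path: the work is handed to an executor, so the Future may still be pending. */
    static Future<StringBuilder> processAsync(Processor p, StringBuilder exchange, Executor executor) {
        return CompletableFuture.supplyAsync(() -> {
            p.process(exchange);
            return exchange;
        }, executor);
    }

    public static void main(String[] args) {
        // An executor that only records tasks, so the demo controls when they run.
        List<Runnable> pending = new ArrayList<>();
        StringBuilder exchange = new StringBuilder("hello");

        Future<StringBuilder> f = processAsync(e -> e.append(" world"), exchange, pending::add);
        System.out.println(f.isDone());                  // prints "false"

        pending.get(0).run();                            // the "seda thread" finally runs
        System.out.println(f.isDone() + " " + exchange); // prints "true hello world"
    }
}
```

A caller that needs the old blocking behaviour can call get() on the
returned Future; a caller such as the file consumer could instead
defer its cleanup until the Future completes.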
> >
> > > Thoughts?
> >
> > Sounds like a good approach...  Perhaps I'll prototype it..
>
> Go for it! :)
>
> --
> James
> -------
> http://macstrac.blogspot.com/
>


-- 
Regards,
Hiram

Blog: http://hiramchirino.com
Index: camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/processor/Pipeline.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/processor/Pipeline.java	(working copy)
@@ -17,13 +17,18 @@
 package org.apache.camel.processor;
 
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.Message;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -33,7 +38,7 @@
  * 
  * @version $Revision$
  */
-public class Pipeline extends MulticastProcessor implements Processor {
+public class Pipeline extends MulticastProcessor implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 
     public Pipeline(Collection<Processor> processors) {
@@ -50,18 +55,67 @@
     }
 
     public void process(Exchange exchange) throws Exception {
+        // This could become a base class method for an AsyncProcessor
+        final CountDownLatch latch = new CountDownLatch(1);
+        if (!process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                if (sync) {
+                    return;
+                }
+                latch.countDown();
+            }
+        })) {
+            latch.await();
+        }
+        // If there was an exception associated with the exchange, throw it.
+        exchange.throwException();
+    }
+    
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Iterator<Processor> processors = getProcessors().iterator();
         Exchange nextExchange = exchange;
-        boolean first = true;
-        for (Processor producer : getProcessors()) {
-            if (first) {
-                first = false;
+        while (processors.hasNext()) {
+            AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+            boolean sync = process(nextExchange, callback, processors, processor);
+            // Continue processing the pipeline synchronously ...
+            if (sync) {
+                nextExchange = createNextExchange(processor, exchange);
             } else {
-                nextExchange = createNextExchange(producer, nextExchange);
+                // The pipeline will be completed async...
+                return false;
             }
-            producer.process(nextExchange);
         }
+        // If we get here then the pipeline was processed entirely
+        // synchronously.
+        callback.done(true);
+        return true;
     }
 
+    private boolean process(final Exchange exchange, final AsyncCallback callback, final Iterator<Processor> processors, AsyncProcessor processor) {
+        return processor.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                
+                // We only have to handle async completion of
+                // the pipeline..  
+                if( sync ) {
+                    return;
+                }
+                
+                // Continue processing the pipeline... 
+                Exchange nextExchange = exchange;
+                while( processors.hasNext() ) {
+                    AsyncProcessor processor = AsyncProcessorTypeConverter.convert(processors.next());
+                    nextExchange = createNextExchange(processor, exchange);
+                    sync = process( nextExchange, callback, processors, processor);
+                    if( !sync ) {
+                        return;
+                    }
+                }
+                callback.done(true);
+            }
+        });
+    }
+
     /**
      * Strategy method to create the next exchange from the
      * 
@@ -111,4 +165,5 @@
     public String toString() {
         return "Pipeline" + getProcessors();
     }
+
 }
Index: camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java	(working copy)
@@ -16,22 +16,25 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.Service;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
  * @version $Revision$
  */
-public class SendProcessor extends ServiceSupport implements Processor, Service {
+public class SendProcessor extends ServiceSupport implements AsyncProcessor, Service {
     private static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
     private Endpoint destination;
     private Producer producer;
+    private AsyncProcessor processor;
 
     public SendProcessor(Endpoint destination) {
         if (destination == null) {
@@ -57,6 +60,21 @@
         }
     }
 
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        if (producer == null) {
+            if (isStopped()) {
+                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
+            } else {
+                exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
+            }
+            callback.done(true);
+            return true;
+        } else {
+            return processor.process(exchange, callback);
+        }
+    }
+
+    
     public Endpoint getDestination() {
         return destination;
     }
@@ -64,6 +82,7 @@
     protected void doStart() throws Exception {
         this.producer = destination.createProducer();
         this.producer.start();
+        this.processor = AsyncProcessorTypeConverter.convert(producer);
     }
 
     protected void doStop() throws Exception {
@@ -72,6 +91,7 @@
                 producer.stop();
             } finally {
                 producer = null;
+                processor = null;
             }
         }
     }
Index: camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java	(working copy)
@@ -16,11 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.ExceptionType;
-import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,13 +35,22 @@
  * 
  * @version $Revision$
  */
-public class DeadLetterChannel extends ErrorHandlerSupport {
+public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
     public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
     public static final String REDELIVERED = "org.apache.camel.Redelivered";
 
+    private class RedeliveryData {
+        int redeliveryCounter;
+        long redeliveryDelay;
+
+        // default behaviour which can be overloaded on a per exception basis
+        RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
+        Processor failureProcessor = deadLetter;
+    }
+
     private static final transient Log LOG = LogFactory.getLog(DeadLetterChannel.class);
-    private Processor output;
-    private Processor deadLetter;
+    private AsyncProcessor output;
+    private AsyncProcessor deadLetter;
     private RedeliveryPolicy redeliveryPolicy;
     private Logger logger;
 
@@ -47,14 +58,13 @@
         this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger());
     }
 
-    public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
-                             Logger logger) {
-        this.deadLetter = deadLetter;
-        this.output = output;
+    public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy, Logger logger) {
+        this.deadLetter = AsyncProcessorTypeConverter.convert(deadLetter);
+        this.output = AsyncProcessorTypeConverter.convert(output);
         this.redeliveryPolicy = redeliveryPolicy;
         this.logger = logger;
     }
-    
+
     public static <E extends Exchange> Logger createDefaultLogger() {
         return new Logger(LOG, LoggingLevel.ERROR);
     }
@@ -64,6 +74,67 @@
         return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
     }
 
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        return process(exchange, callback, new RedeliveryData());
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+
+        while (true) {
+            if (exchange.getException() != null) {
+                Throwable e = exchange.getException();
+                exchange.setException(null); // Reset it since we are handling it.
+                
+                logger.log("On delivery attempt: " + data.redeliveryCounter + " caught: " + e, e);
+                data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+
+                ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
+                if (exceptionPolicy != null) {
+                    data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
+                    Processor processor = exceptionPolicy.getErrorHandler();
+                    if (processor != null) {
+                        data.failureProcessor = processor;
+                    }
+                }
+            }
+
+            if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+                AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
+                return afp.process(exchange, callback);
+            }
+
+            if (data.redeliveryCounter > 0) {
+                // Figure out how long we should wait to resend this message.
+                data.redeliveryDelay = data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+                sleep(data.redeliveryDelay);
+            }
+
+            boolean sync = output.process(exchange, new AsyncCallback() {
+                public void done(boolean sync) {
+                    // Only handle the async case...
+                    if (sync) {
+                        return;
+                    }
+                    if (exchange.getException() != null) {
+                        process(exchange, callback, data);
+                    } else {
+                        callback.done(sync);
+                    }
+                }
+            });
+            if (!sync) {
+                // It is going to be processed async..
+                return false;
+            }
+            if (exchange.getException() == null) {
+                // If everything went well.. then we exit here..
+                return true;
+            }
+            // error occurred so loop back around.....
+        }
+
+    }
+    
     public void process(Exchange exchange) throws Exception {
         int redeliveryCounter = 0;
         long redeliveryDelay = 0;
@@ -86,7 +157,6 @@
                 logger.log("On delivery attempt: " + redeliveryCounter + " caught: " + e, e);
                 redeliveryCounter = incrementRedeliveryCounter(exchange, e);
 
-
                 ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
                 if (exceptionPolicy != null) {
                     currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(currentRedeliveryPolicy);
@@ -185,4 +255,5 @@
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(deadLetter, output);
     }
+
 }
Index: camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java	(working copy)
@@ -47,6 +47,7 @@
     public DefaultTypeConverter(Injector injector) {
         typeConverterLoaders.add(new AnnotationTypeConverterLoader());
         this.injector = injector;
+        addFallbackConverter(new AsyncProcessorTypeConverter());
         addFallbackConverter(new PropertyEditorTypeConverter());
         addFallbackConverter(new ToStringTypeConverter());
         addFallbackConverter(new ArrayTypeConverter());
Index: camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java	(working copy)
@@ -16,10 +16,15 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.Future;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
 
@@ -29,6 +34,7 @@
 public class DefaultConsumer<E extends Exchange> extends ServiceSupport implements Consumer<E> {
     private Endpoint<E> endpoint;
     private Processor processor;
+    private AsyncProcessor asyncProcessor;
     private ExceptionHandler exceptionHandler;
 
     public DefaultConsumer(Endpoint<E> endpoint, Processor processor) {
@@ -49,6 +55,20 @@
         return processor;
     }
 
+    /**
+     * Provides an {@link AsyncProcessor} interface to the configured
+     * processor on the consumer.  If the processor does not implement
+     * the interface, it will be adapted so that it does.  
+     * 
+     * @return
+     */
+    public AsyncProcessor getAsyncProcessor() {
+        if (asyncProcessor == null) {
+            asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
+        }
+        return asyncProcessor;
+    }
+
     public ExceptionHandler getExceptionHandler() {
         if (exceptionHandler == null) {
             exceptionHandler = new LoggingExceptionHandler(getClass());
Index: camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java	(working copy)
@@ -22,6 +22,7 @@
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.util.UuidGenerator;
 
 /**
@@ -182,6 +183,19 @@
         this.exception = exception;
     }
 
+    public void throwException() throws Exception {
+        if (exception == null) {
+            return;
+        }
+        if (exception instanceof Exception) {
+            throw (Exception)exception;
+        }
+        if (exception instanceof RuntimeException) {
+            throw (RuntimeException)exception;
+        }
+        throw new RuntimeCamelException(exception);
+    }
+
     public Message getFault() {
         return fault;
     }
@@ -222,4 +236,5 @@
             messageSupport.setExchange(this);
         }
     }
+
 }
Index: camel-core/src/main/java/org/apache/camel/Exchange.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/Exchange.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/Exchange.java	(working copy)
@@ -133,6 +133,11 @@
     void setException(Throwable e);
 
     /**
+     * Throws the exception associated with this exchange.
+     */
+    void throwException() throws Exception;
+
+    /**
      * Returns the container so that a processor can resolve endpoints from URIs
      * 
      * @return the container which owns this exchange
@@ -152,4 +157,5 @@
      * copied
      */
     void copyFrom(Exchange source);
+
 }
Index: camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java	(working copy)
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.file;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.file.strategy.FileProcessStrategy;
 import org.apache.camel.impl.ScheduledPollConsumer;
@@ -23,6 +25,9 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.File;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * @version $Revision: 523016 $
@@ -62,8 +67,12 @@
             LOG.debug("Skipping directory " + fileOrDirectory);
         }
     }
+    
+    ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
 
     protected void pollFile(final File file) {
+        
+
         if (!file.exists()) {
             return;
         }
@@ -77,10 +86,15 @@
                     }
                     return;
                 }
+            } else {
+                if (filesBeingProcessed.contains(file)) {
+                    return;
+                }
+                filesBeingProcessed.put(file, file);
             }
 
-            FileProcessStrategy processStrategy = endpoint.getFileStrategy();
-            FileExchange exchange = endpoint.createExchange(file);
+            final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
+            final FileExchange exchange = endpoint.createExchange(file);
 
             if (isPreserveFileName()) {
                 String relativePath = file.getPath().substring(endpoint.getFile().getPath().length());
@@ -95,8 +109,25 @@
                     LOG.debug("About to process file:  " + file + " using exchange: " + exchange);
                 }
                 if (processStrategy.begin(endpoint, exchange, file)) {
-                    getProcessor().process(exchange);
-                    processStrategy.commit(endpoint, exchange, file);
+                    
+                    // Use the async processor interface so that processing of
+                    // the
+                    // exchange can happen asynchronously
+                    getAsyncProcessor().process(exchange, new AsyncCallback() {
+                        public void done(boolean sync) {
+                            if (exchange.getException() == null) {
+                                try {
+                                    processStrategy.commit(endpoint, (FileExchange)exchange, file);
+                                } catch (Exception e) {
+                                    handleException(e);
+                                }
+                            } else {
+                                handleException(exchange.getException());
+                            }
+                            filesBeingProcessed.remove(file);
+                        }
+                    });
+                    
                 }
                 else {
                     if (LOG.isDebugEnabled()) {
Index: camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/vm/VmComponent.java	(working copy)
@@ -24,6 +24,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.component.seda.SedaComponent;
 import org.apache.camel.component.seda.SedaEndpoint;
+import org.apache.camel.component.seda.SedaEndpoint.Entry;
 
 /**
  * An implementation of the <a href="http://activemq.apache.org/camel/vm.html">VM components</a>
@@ -34,19 +35,19 @@
  * @version $Revision: 1.1 $
  */
 public class VmComponent<E extends Exchange> extends SedaComponent<E> {
-    protected static Map<String, BlockingQueue<Exchange>> queues = new HashMap<String, BlockingQueue<Exchange>>();
+    protected static Map<String, BlockingQueue> queues = new HashMap<String, BlockingQueue>();
 
     @Override
     protected Endpoint<E> createEndpoint(String uri, String remaining, Map parameters) throws Exception {
-        BlockingQueue<E> blockingQueue = (BlockingQueue<E>) getBlockingQueue(uri);
+        BlockingQueue<SedaEndpoint.Entry<E>> blockingQueue = (BlockingQueue<SedaEndpoint.Entry<E>>) getBlockingQueue(uri);
         return new SedaEndpoint<E>(uri, this, blockingQueue);
     }
 
-    protected BlockingQueue<Exchange> getBlockingQueue(String uri) {
+    protected BlockingQueue<Entry<E>> getBlockingQueue(String uri) {
         synchronized (queues) {
-            BlockingQueue<Exchange> answer = queues.get(uri);
+            BlockingQueue<Entry<E>> answer = queues.get(uri);
             if (answer == null) {
-                answer = (BlockingQueue<Exchange>) createQueue();
+                answer = createQueue();
                 queues.put(uri, answer);
             }
             return answer;
Index: camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java	(working copy)
@@ -31,8 +31,8 @@
  * @version $Revision: 1.1 $
  */
 public class SedaComponent<E extends Exchange> extends DefaultComponent<E> {
-    public BlockingQueue<E> createQueue() {
-        return new LinkedBlockingQueue<E>(1000);
+    public BlockingQueue<SedaEndpoint.Entry<E>> createQueue() {
+        return new LinkedBlockingQueue<SedaEndpoint.Entry<E>>(1000);
     }
 
     @Override
Index: camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java	(working copy)
@@ -19,10 +19,13 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.AlreadyStoppedException;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -33,12 +36,12 @@
     private static final Log LOG = LogFactory.getLog(SedaConsumer.class);
 
     private SedaEndpoint<E> endpoint;
-    private Processor processor;
+    private AsyncProcessor processor;
     private Thread thread;
 
     public SedaConsumer(SedaEndpoint<E> endpoint, Processor processor) {
         this.endpoint = endpoint;
-        this.processor = processor;
+        this.processor = AsyncProcessorTypeConverter.convert(processor);
     }
 
     @Override
@@ -48,21 +51,30 @@
 
     public void run() {
         while (!isStopping()) {
-            E exchange;
+            final SedaEndpoint.Entry<E> entry;
             try {
-                exchange = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
+                entry = endpoint.getQueue().poll(1000, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 break;
             }
-            if (exchange != null && !isStopping()) {
-                try {
-                    processor.process(exchange);
-                } catch (AlreadyStoppedException e) {
-                    LOG.debug("Ignoring failed message due to shutdown: " + e, e);
-                    break;
-                } catch (Throwable e) {
-                    LOG.error(e);
-                }
+            if (entry != null && !isStopping()) {
+                processor.process(entry.getExchange(), new AsyncCallback() {
+                    public void done(boolean sync) {
+                        if (entry.getCallback() != null) {
+                            entry.getCallback().done(false);
+                        } else {
+                            Throwable e = entry.getExchange().getException();
+                            if (e != null) {
+                                if (e instanceof AlreadyStoppedException) {
+                                    LOG.debug("Ignoring failed message due to shutdown: " + e, e);
+                                } else {
+                                    LOG.error(e);
+                                }
+                            }
+                        }
+                    }
+                });
+
             }
         }
     }
Index: camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
===================================================================
--- camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java	(revision 570270)
+++ camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java	(working copy)
@@ -18,8 +18,11 @@
 
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -35,9 +38,47 @@
  * @version $Revision: 519973 $
  */
 public class SedaEndpoint<E extends Exchange> extends DefaultEndpoint<E> {
-    private BlockingQueue<E> queue;
+    
+    static public class Entry<E extends Exchange> {
+        E exchange;
+        AsyncCallback callback;
+        
+        public Entry(E exchange, AsyncCallback callback) {
+            this.exchange = exchange;
+            this.callback = callback;
+        }
+        
+        public E getExchange() {
+            return exchange;
+        }
+        public void setExchange(E exchange) {
+            this.exchange = exchange;
+        }
+        public AsyncCallback getCallback() {
+            return callback;
+        }
+        public void setCallback(AsyncCallback callback) {
+            this.callback = callback;
+        }
+        
+    }
+    
+    private final class SedaProducer extends DefaultProducer implements AsyncProcessor {
+        private SedaProducer(Endpoint endpoint) {
+            super(endpoint);
+        }
+        public void process(Exchange exchange) {
+            queue.add(new Entry<E>(toExchangeType(exchange), null));
+        }
+        public boolean process(Exchange exchange, AsyncCallback callback) {
+            queue.add(new Entry<E>(toExchangeType(exchange), callback));
+            return false;
+        }
+    }
 
-    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<E> queue) {
+    private BlockingQueue<Entry<E>> queue;
+
+    public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Entry<E>> queue) {
         super(endpointUri, component);
         this.queue = queue;
     }
@@ -47,11 +88,7 @@
     }
 
     public Producer<E> createProducer() throws Exception {
-        return new DefaultProducer(this) {
-            public void process(Exchange exchange) {
-                queue.add(toExchangeType(exchange));
-            }
-        };
+        return new SedaProducer(this);
     }
 
     public Consumer<E> createConsumer(Processor processor) throws Exception {
@@ -64,7 +101,7 @@
         return (E)new DefaultExchange(getContext());
     }
 
-    public BlockingQueue<E> getQueue() {
+    public BlockingQueue<Entry<E>> getQueue() {
         return queue;
     }
 

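For anyone who wants to see the idea without applying the patch: below is a minimal, standalone sketch of the pattern the seda changes above rely on, i.e. queueing the message together with a callback and firing that callback with done(false) from the consumer thread once processing has finished. It does not use the Camel API. The names SedaCallbackSketch, Callback, Entry and runOnce are made up for illustration, with Callback only standing in for org.apache.camel.AsyncCallback, and a plain String stands in for the Exchange.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SedaCallbackSketch {

    /** Hypothetical stand-in for org.apache.camel.AsyncCallback. */
    public interface Callback {
        void done(boolean sync);
    }

    /** Pairs a queued message with the callback to fire after processing. */
    static final class Entry {
        final String body;
        final Callback callback;

        Entry(String body, Callback callback) {
            this.body = body;
            this.callback = callback;
        }
    }

    /** Sends one message through the queue and returns the recorded events. */
    public static List<String> runOnce(final String body) throws InterruptedException {
        final BlockingQueue<Entry> queue = new LinkedBlockingQueue<Entry>(1000);
        final List<String> events = new CopyOnWriteArrayList<String>();
        final CountDownLatch finished = new CountDownLatch(1);

        // Consumer side: take one entry, process it, then notify the sender.
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    Entry entry = queue.poll(1000, TimeUnit.MILLISECONDS);
                    if (entry != null) {
                        events.add("processed " + entry.body);
                        if (entry.callback != null) {
                            // false: completion happened on another thread
                            entry.callback.done(false);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        // Producer side: enqueue and return at once; the cleanup (for the
        // file component, deleting the file) is deferred to the callback.
        queue.add(new Entry(body, new Callback() {
            public void done(boolean sync) {
                events.add("cleanup after " + body + " sync=" + sync);
                finished.countDown();
            }
        }));

        finished.await(5, TimeUnit.SECONDS);
        consumer.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runOnce("file-1"));
    }
}
```

The point of the sketch is the ordering: the cleanup event is only recorded after the consumer thread has processed the entry, which is what the file endpoint needs before it deletes the file.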