CAMEL-10308
Provide a way to use async engine from ProducerTemplate

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/51a80684
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/51a80684
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/51a80684

Branch: refs/heads/master
Commit: 51a8068424cc9a265036220d70ef6d2af0312c3a
Parents: 5104c16
Author: Vitalii Tymchyshyn <v...@tym.im>
Authored: Sun Sep 11 19:25:42 2016 -0400
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Sat Sep 17 09:34:52 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/ProducerTemplate.java |  76 +++--
 ...AsyncCallbackToCompletableFutureAdapter.java |  55 ++++
 .../camel/impl/DefaultProducerTemplate.java     | 303 ++++++++-----------
 .../camel/impl/EventNotifierCallback.java       |  52 ++++
 .../org/apache/camel/impl/ProducerCache.java    | 138 ++++++++-
 .../org/apache/camel/util/ExchangeHelper.java   |   4 +-
 .../impl/DefaultProducerTemplateAsyncTest.java  |   4 +-
 ...ultProducerTemplateNonBlockingAsyncTest.java |  45 +++
 8 files changed, 453 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
index 9589072..6461e4e 100644
--- a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
+++ b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
@@ -16,14 +16,15 @@
  */
 package org.apache.camel;
 
+import org.apache.camel.spi.Synchronization;
+
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.camel.spi.Synchronization;
-
 /**
  * Template for working with Camel and sending {@link Message} instances in an
  * {@link Exchange} to an {@link Endpoint}.
@@ -91,6 +92,21 @@ public interface ProducerTemplate extends Service {
      * @return the size of current cached resources
      */
     int getCurrentCacheSize();
+
+    /**
+     * Reports if async* methods will dispath processing from the calling 
thread (true) or through executor (false).
+     * They will still employ asynchronous engine, so this mode can be useful 
for high-speed non-blocking processing.
+     * @return if async* methods will run in the calling thread
+     */
+    boolean isSynchronous();
+
+    /**
+     * Reports if async* methods will dispath processing from the calling 
thread (true) or through executor (false).
+     * In any case they would still employ asynchronous engine, so setting to 
true can be useful
+     * for high-speed non-blocking processing.
+     * @param synchronous if async* methods will run in the calling thread
+     */
+    void setSynchronous(boolean synchronous);
     
     /**
      * Get the default endpoint to use if none is specified
@@ -895,7 +911,7 @@ public interface ProducerTemplate extends Service {
      * @param exchange    the exchange to send
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncSend(String endpointUri, Exchange exchange);
+    CompletableFuture<Exchange> asyncSend(String endpointUri, Exchange 
exchange);
 
     /**
      * Sends an asynchronous exchange to the given endpoint.
@@ -904,7 +920,7 @@ public interface ProducerTemplate extends Service {
      * @param processor   the transformer used to populate the new exchange
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncSend(String endpointUri, Processor processor);
+    CompletableFuture<Exchange> asyncSend(String endpointUri, Processor 
processor);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -914,7 +930,7 @@ public interface ProducerTemplate extends Service {
      * @param body        the body to send
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncSendBody(String endpointUri, Object body);
+    CompletableFuture<Object> asyncSendBody(String endpointUri, Object body);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -924,7 +940,7 @@ public interface ProducerTemplate extends Service {
      * @param body        the body to send
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncRequestBody(String endpointUri, Object body);
+    CompletableFuture<Object> asyncRequestBody(String endpointUri, Object 
body);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -936,7 +952,7 @@ public interface ProducerTemplate extends Service {
      * @param headerValue the header value
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncRequestBodyAndHeader(String endpointUri, Object body, 
String header, Object headerValue);
+    CompletableFuture<Object> asyncRequestBodyAndHeader(String endpointUri, 
Object body, String header, Object headerValue);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -947,7 +963,7 @@ public interface ProducerTemplate extends Service {
      * @param headers     headers
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncRequestBodyAndHeaders(String endpointUri, Object body, 
Map<String, Object> headers);
+    CompletableFuture<Object> asyncRequestBodyAndHeaders(String endpointUri, 
Object body, Map<String, Object> headers);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -958,7 +974,7 @@ public interface ProducerTemplate extends Service {
      * @param type        the expected response type
      * @return a handle to be used to get the response in the future
      */
-    <T> Future<T> asyncRequestBody(String endpointUri, Object body, Class<T> 
type);
+    <T> CompletableFuture<T> asyncRequestBody(String endpointUri, Object body, 
Class<T> type);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -971,7 +987,7 @@ public interface ProducerTemplate extends Service {
      * @param type        the expected response type
      * @return a handle to be used to get the response in the future
      */
-    <T> Future<T> asyncRequestBodyAndHeader(String endpointUri, Object body, 
String header, Object headerValue, Class<T> type);
+    <T> CompletableFuture<T> asyncRequestBodyAndHeader(String endpointUri, 
Object body, String header, Object headerValue, Class<T> type);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -983,7 +999,7 @@ public interface ProducerTemplate extends Service {
      * @param type        the expected response type
      * @return a handle to be used to get the response in the future
      */
-    <T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, 
Map<String, Object> headers, Class<T> type);
+    <T> CompletableFuture<T> asyncRequestBodyAndHeaders(String endpointUri, 
Object body, Map<String, Object> headers, Class<T> type);
 
     /**
      * Sends an asynchronous exchange to the given endpoint.
@@ -992,7 +1008,7 @@ public interface ProducerTemplate extends Service {
      * @param exchange    the exchange to send
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange);
+    CompletableFuture<Exchange> asyncSend(Endpoint endpoint, Exchange 
exchange);
 
     /**
      * Sends an asynchronous exchange to the given endpoint.
@@ -1001,7 +1017,7 @@ public interface ProducerTemplate extends Service {
      * @param processor   the transformer used to populate the new exchange
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncSend(Endpoint endpoint, Processor processor);
+    CompletableFuture<Exchange> asyncSend(Endpoint endpoint, Processor 
processor);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1011,7 +1027,7 @@ public interface ProducerTemplate extends Service {
      * @param body        the body to send
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncSendBody(Endpoint endpoint, Object body);
+    CompletableFuture<Object> asyncSendBody(Endpoint endpoint, Object body);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1021,7 +1037,7 @@ public interface ProducerTemplate extends Service {
      * @param body        the body to send
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncRequestBody(Endpoint endpoint, Object body);
+    CompletableFuture<Object> asyncRequestBody(Endpoint endpoint, Object body);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1033,7 +1049,7 @@ public interface ProducerTemplate extends Service {
      * @param headerValue the header value
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, 
String header, Object headerValue);
+    CompletableFuture<Object> asyncRequestBodyAndHeader(Endpoint endpoint, 
Object body, String header, Object headerValue);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1044,7 +1060,7 @@ public interface ProducerTemplate extends Service {
      * @param headers     headers
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, 
Map<String, Object> headers);
+    CompletableFuture<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, 
Object body, Map<String, Object> headers);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1055,7 +1071,7 @@ public interface ProducerTemplate extends Service {
      * @param type        the expected response type
      * @return a handle to be used to get the response in the future
      */
-    <T> Future<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> 
type);
+    <T> CompletableFuture<T> asyncRequestBody(Endpoint endpoint, Object body, 
Class<T> type);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1068,7 +1084,7 @@ public interface ProducerTemplate extends Service {
      * @param type        the expected response type
      * @return a handle to be used to get the response in the future
      */
-    <T> Future<T> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, 
String header, Object headerValue, Class<T> type);
+    <T> CompletableFuture<T> asyncRequestBodyAndHeader(Endpoint endpoint, 
Object body, String header, Object headerValue, Class<T> type);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1080,7 +1096,7 @@ public interface ProducerTemplate extends Service {
      * @param type        the expected response type
      * @return a handle to be used to get the response in the future
      */
-    <T> Future<T> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, 
Map<String, Object> headers, Class<T> type);
+    <T> CompletableFuture<T> asyncRequestBodyAndHeaders(Endpoint endpoint, 
Object body, Map<String, Object> headers, Class<T> type);
 
     /**
      * Gets the response body from the future handle, will wait until the 
response is ready.
@@ -1093,7 +1109,7 @@ public interface ProducerTemplate extends Service {
      * @return the result (see class javadoc)
      * @throws CamelExecutionException if the processing of the exchange failed
      */
-    <T> T extractFutureBody(Future<Object> future, Class<T> type) throws 
CamelExecutionException;
+    <T> T extractFutureBody(Future<?> future, Class<T> type) throws 
CamelExecutionException;
 
     /**
      * Gets the response body from the future handle, will wait at most the 
given time for the response to be ready.
@@ -1109,7 +1125,7 @@ public interface ProducerTemplate extends Service {
      * @throws java.util.concurrent.TimeoutException if the wait timed out
      * @throws CamelExecutionException if the processing of the exchange failed
      */
-    <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit 
unit, Class<T> type) throws TimeoutException, CamelExecutionException;
+    <T> T extractFutureBody(Future<?> future, long timeout, TimeUnit unit, 
Class<T> type) throws TimeoutException, CamelExecutionException;
 
     // Asynchronous methods with callback
     // -----------------------------------------------------------------------
@@ -1122,7 +1138,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncCallback(String endpointUri, Exchange exchange, 
Synchronization onCompletion);
+    CompletableFuture<Exchange> asyncCallback(String endpointUri, Exchange 
exchange, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous exchange to the given endpoint.
@@ -1132,7 +1148,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, 
Synchronization onCompletion);
+    CompletableFuture<Exchange> asyncCallback(Endpoint endpoint, Exchange 
exchange, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous exchange to the given endpoint using a supplied 
processor.
@@ -1143,7 +1159,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncCallback(String endpointUri, Processor processor, 
Synchronization onCompletion);
+    CompletableFuture<Exchange> asyncCallback(String endpointUri, Processor 
processor, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous exchange to the given endpoint using a supplied 
processor.
@@ -1154,7 +1170,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Exchange> asyncCallback(Endpoint endpoint, Processor processor, 
Synchronization onCompletion);
+    CompletableFuture<Exchange> asyncCallback(Endpoint endpoint, Processor 
processor, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1165,7 +1181,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncCallbackSendBody(String endpointUri, Object body, 
Synchronization onCompletion);
+    CompletableFuture<Object> asyncCallbackSendBody(String endpointUri, Object 
body, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1176,7 +1192,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, 
Synchronization onCompletion);
+    CompletableFuture<Object> asyncCallbackSendBody(Endpoint endpoint, Object 
body, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1187,7 +1203,7 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncCallbackRequestBody(String endpointUri, Object body, 
Synchronization onCompletion);
+    CompletableFuture<Object> asyncCallbackRequestBody(String endpointUri, 
Object body, Synchronization onCompletion);
 
     /**
      * Sends an asynchronous body to the given endpoint.
@@ -1198,6 +1214,6 @@ public interface ProducerTemplate extends Service {
      * @param onCompletion  callback invoked when exchange has been completed
      * @return a handle to be used to get the response in the future
      */
-    Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, 
Synchronization onCompletion);
+    CompletableFuture<Object> asyncCallbackRequestBody(Endpoint endpoint, 
Object body, Synchronization onCompletion);
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
 
b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
new file mode 100644
index 0000000..0baa510
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * AsyncCallback that provides a CompletableFuture when async action is done
+ */
+public class AsyncCallbackToCompletableFutureAdapter<T> implements 
AsyncCallback{
+    private final CompletableFuture<T> future;
+    private volatile T result;
+
+    public AsyncCallbackToCompletableFutureAdapter() {
+        this(null);
+    }
+
+    public AsyncCallbackToCompletableFutureAdapter(T result) {
+        this(null, result);
+    }
+
+    public AsyncCallbackToCompletableFutureAdapter(CompletableFuture<T> 
future, T result) {
+        this.future = future != null ? future : new CompletableFuture<>();
+        this.result = result;
+    }
+
+    public void setResult(T result) {
+        this.result = result;
+    }
+
+    public CompletableFuture<T> getFuture() {
+        return future;
+    }
+
+    @Override
+    public void done(boolean doneSync) {
+        future.complete(result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
index 6eff970..19958e4 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
@@ -17,11 +17,12 @@
 package org.apache.camel.impl;
 
 import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
@@ -39,6 +40,7 @@ import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.camel.util.concurrent.SynchronousExecutorService;
 
 /**
  * Template (named like Spring's TransactionTemplate & JmsTemplate
@@ -54,6 +56,7 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
     private Endpoint defaultEndpoint;
     private int maximumCacheSize;
     private boolean eventNotifierEnabled = true;
+    private volatile boolean synchronous;
 
     public DefaultProducerTemplate(CamelContext camelContext) {
         this.camelContext = camelContext;
@@ -82,6 +85,16 @@ public class DefaultProducerTemplate extends ServiceSupport 
implements ProducerT
         this.maximumCacheSize = maximumCacheSize;
     }
 
+    @Override
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    @Override
+    public void setSynchronous(boolean synchronous) {
+        this.synchronous = synchronous;
+    }
+
     public int getCurrentCacheSize() {
         if (producerCache == null) {
             return 0;
@@ -241,17 +254,7 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
     }
 
     public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final 
Map<String, Object> headers) throws CamelExecutionException {
-        Exchange result = send(endpoint, new Processor() {
-            public void process(Exchange exchange) {
-                Message in = exchange.getIn();
-                if (headers != null) {
-                    for (Map.Entry<String, Object> header : 
headers.entrySet()) {
-                        in.setHeader(header.getKey(), header.getValue());
-                    }
-                }
-                in.setBody(body);
-            }
-        });
+        Exchange result = send(endpoint, createBodyAndHeaders(body, headers));
         // must invoke extract result body in case of exception to be rethrown
         extractResultBody(result);
     }
@@ -261,17 +264,7 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
     }
 
     public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern 
pattern, final Object body, final Map<String, Object> headers) throws 
CamelExecutionException {
-        Exchange exchange = send(endpoint, pattern, new Processor() {
-            public void process(Exchange exchange) throws Exception {
-                Message in = exchange.getIn();
-                if (headers != null) {
-                    for (Map.Entry<String, Object> header : 
headers.entrySet()) {
-                        in.setHeader(header.getKey(), header.getValue());
-                    }
-                }
-                in.setBody(body);
-            }
-        });
+        Exchange exchange = send(endpoint, pattern, createBodyAndHeaders(body, 
headers));
         Object result = extractResultBody(exchange, pattern);
         if (pattern.isOutCapable()) {
             return result;
@@ -485,6 +478,21 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
         return new ConvertBodyProcessor(type);
     }
 
+    protected Function<Exchange, Exchange> 
createCompletionFunction(Synchronization onCompletion) {
+        return (answer) -> {
+            // invoke callback before returning answer
+            // as it allows callback to be used without unit of work invoking 
it
+            // and thus it works directly from a producer template as well, as 
opposed
+            // to the unit of work that is injected in routes
+            if (answer.isFailed()) {
+                onCompletion.onFailure(answer);
+            } else {
+                onCompletion.onComplete(answer);
+            }
+            return answer;
+        };
+    }
+
     protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
         Endpoint endpoint = camelContext.getEndpoint(endpointUri);
         if (endpoint == null) {
@@ -511,228 +519,169 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
         this.executor = executorService;
     }
 
-    public Future<Exchange> asyncSend(final String uri, final Exchange 
exchange) {
+    public CompletableFuture<Exchange> asyncSend(final String uri, final 
Exchange exchange) {
         return asyncSend(resolveMandatoryEndpoint(uri), exchange);
     }
 
-    public Future<Exchange> asyncSend(final String uri, final Processor 
processor) {
+    public CompletableFuture<Exchange> asyncSend(final String uri, final 
Processor processor) {
         return asyncSend(resolveMandatoryEndpoint(uri), processor);
     }
 
-    public Future<Object> asyncSendBody(final String uri, final Object body) {
+    public CompletableFuture<Object> asyncSendBody(final String uri, final 
Object body) {
         return asyncSendBody(resolveMandatoryEndpoint(uri), body);
     }
 
-    public Future<Object> asyncRequestBody(final String uri, final Object 
body) {
+    public CompletableFuture<Object> asyncRequestBody(final String uri, final 
Object body) {
         return asyncRequestBody(resolveMandatoryEndpoint(uri), body);
     }
 
-    public <T> Future<T> asyncRequestBody(final String uri, final Object body, 
final Class<T> type) {
-        return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type);
+    public <T> CompletableFuture<T> asyncRequestBody(final String uri, final 
Object body, final Class<T> type) {
+        return asyncRequestBody(resolveMandatoryEndpoint(uri), 
createSetBodyProcessor(body), type);
     }
 
-    public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, 
final Object body, final String header, final Object headerValue) {
+    public CompletableFuture<Object> asyncRequestBodyAndHeader(final String 
endpointUri, final Object body, final String header, final Object headerValue) {
         return 
asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, 
headerValue);
     }
 
-    public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, 
final Object body, final String header, final Object headerValue, final 
Class<T> type) {
+    public <T> CompletableFuture<T> asyncRequestBodyAndHeader(final String 
endpointUri, final Object body, final String header, final Object headerValue, 
final Class<T> type) {
         return 
asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, 
headerValue, type);
     }
 
-    public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, 
final Object body, final Map<String, Object> headers) {
+    public CompletableFuture<Object> asyncRequestBodyAndHeaders(final String 
endpointUri, final Object body, final Map<String, Object> headers) {
         return 
asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, 
headers);
     }
 
-    public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, 
final Object body, final Map<String, Object> headers, final Class<T> type) {
+    public <T> CompletableFuture<T> asyncRequestBodyAndHeaders(final String 
endpointUri, final Object body, final Map<String, Object> headers, final 
Class<T> type) {
         return 
asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, 
headers, type);
     }
 
-    public <T> T extractFutureBody(Future<Object> future, Class<T> type) {
+    public <T> T extractFutureBody(Future<?> future, Class<T> type) {
         return ExchangeHelper.extractFutureBody(camelContext, future, type);
     }
 
-    public <T> T extractFutureBody(Future<Object> future, long timeout, 
TimeUnit unit, Class<T> type) throws TimeoutException {
+    public <T> T extractFutureBody(Future<?> future, long timeout, TimeUnit 
unit, Class<T> type) throws TimeoutException {
         return ExchangeHelper.extractFutureBody(camelContext, future, timeout, 
unit, type);
     }
 
-    public Future<Object> asyncCallbackSendBody(String uri, Object body, 
Synchronization onCompletion) {
+    public CompletableFuture<Object> asyncCallbackSendBody(String uri, Object 
body, Synchronization onCompletion) {
         return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, 
onCompletion);
     }
 
-    public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object 
body, Synchronization onCompletion) {
+    public CompletableFuture<Object> asyncCallbackSendBody(Endpoint endpoint, 
Object body, Synchronization onCompletion) {
         return asyncCallback(endpoint, ExchangePattern.InOnly, body, 
onCompletion);
     }
 
-    public Future<Object> asyncCallbackRequestBody(String uri, Object body, 
Synchronization onCompletion) {
+    public CompletableFuture<Object> asyncCallbackRequestBody(String uri, 
Object body, Synchronization onCompletion) {
         return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, 
onCompletion);
     }
 
-    public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object 
body, Synchronization onCompletion) {
+    public CompletableFuture<Object> asyncCallbackRequestBody(Endpoint 
endpoint, Object body, Synchronization onCompletion) {
         return asyncCallback(endpoint, ExchangePattern.InOut, body, 
onCompletion);
     }
 
-    public Future<Exchange> asyncCallback(String uri, Exchange exchange, 
Synchronization onCompletion) {
+    public CompletableFuture<Exchange> asyncCallback(String uri, Exchange 
exchange, Synchronization onCompletion) {
         return asyncCallback(resolveMandatoryEndpoint(uri), exchange, 
onCompletion);
     }
 
-    public Future<Exchange> asyncCallback(String uri, Processor processor, 
Synchronization onCompletion) {
+    public CompletableFuture<Exchange> asyncCallback(String uri, Processor 
processor, Synchronization onCompletion) {
         return asyncCallback(resolveMandatoryEndpoint(uri), processor, 
onCompletion);
     }
 
-    public Future<Object> asyncRequestBody(final Endpoint endpoint, final 
Object body) {
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                return requestBody(endpoint, body);
-            }
-        };
-        return getExecutorService().submit(task);
+    public CompletableFuture<Object> asyncRequestBody(final Endpoint endpoint, 
final Object body) {
+        return asyncRequestBody(endpoint, createSetBodyProcessor(body));
     }
 
-    public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final 
Object body, final Class<T> type) {
-        Callable<T> task = new Callable<T>() {
-            public T call() throws Exception {
-                return requestBody(endpoint, body, type);
-            }
-        };
-        return getExecutorService().submit(task);
+    public <T> CompletableFuture<T> asyncRequestBody(Endpoint endpoint, Object 
body, Class<T> type) {
+        return asyncRequestBody(endpoint, createSetBodyProcessor(body), type);
     }
 
-    public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, 
final Object body, final String header,
+    public CompletableFuture<Object> asyncRequestBodyAndHeader(final Endpoint 
endpoint, final Object body, final String header,
                                                     final Object headerValue) {
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                return requestBodyAndHeader(endpoint, body, header, 
headerValue);
-            }
-        };
-        return getExecutorService().submit(task);
+        return asyncRequestBody(endpoint, createBodyAndHeaderProcessor(body, 
header, headerValue));
     }
 
-    public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, 
final Object body, final String header,
+    protected  <T> CompletableFuture<T> asyncRequestBody(final Endpoint 
endpoint, Processor processor, final Class<T> type) {
+        return asyncRequestBody(endpoint, processor, 
createConvertBodyProcessor(type))
+                .thenApply(answer -> 
camelContext.getTypeConverter().convertTo(type, answer));
+    }
+
+    public <T> CompletableFuture<T> asyncRequestBodyAndHeader(final Endpoint 
endpoint, final Object body, final String header,
                                                    final Object headerValue, 
final Class<T> type) {
-        Callable<T> task = new Callable<T>() {
-            public T call() throws Exception {
-                return requestBodyAndHeader(endpoint, body, header, 
headerValue, type);
-            }
-        };
-        return getExecutorService().submit(task);
+        return asyncRequestBody(endpoint, createBodyAndHeaderProcessor(body, 
header, headerValue), type);
     }
 
-    public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, 
final Object body,
+    public CompletableFuture<Object> asyncRequestBodyAndHeaders(final Endpoint 
endpoint, final Object body,
                                                      final Map<String, Object> 
headers) {
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                return requestBodyAndHeaders(endpoint, body, headers);
-            }
-        };
-        return getExecutorService().submit(task);
+        return asyncRequestBody(endpoint, createBodyAndHeaders(body, headers));
     }
 
-    public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, 
final Object body,
+    public <T> CompletableFuture<T> asyncRequestBodyAndHeaders(final Endpoint 
endpoint, final Object body,
                                                     final Map<String, Object> 
headers, final Class<T> type) {
-        Callable<T> task = new Callable<T>() {
-            public T call() throws Exception {
-                return requestBodyAndHeaders(endpoint, body, headers, type);
-            }
-        };
-        return getExecutorService().submit(task);
+        return asyncRequestBody(endpoint, createBodyAndHeaders(body, headers), 
type);
     }
 
-    public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange 
exchange) {
-        Callable<Exchange> task = new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                return send(endpoint, exchange);
-            }
-        };
-        return getExecutorService().submit(task);
+    public CompletableFuture<Exchange> asyncSend(final Endpoint endpoint, 
final Exchange exchange) {
+        return asyncSendExchange(endpoint, null, null, null, exchange);
     }
 
-    public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor 
processor) {
-        Callable<Exchange> task = new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                return send(endpoint, processor);
-            }
-        };
-        return getExecutorService().submit(task);
+    public CompletableFuture<Exchange> asyncSend(final Endpoint endpoint, 
final Processor processor) {
+        return asyncSend(endpoint, null, processor, null);
     }
 
-    public Future<Object> asyncSendBody(final Endpoint endpoint, final Object 
body) {
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                sendBody(endpoint, body);
-                // its InOnly, so no body to return
-                return null;
-            }
-        };
-        return getExecutorService().submit(task);
-    }
-
-    private Future<Object> asyncCallback(final Endpoint endpoint, final 
ExchangePattern pattern, final Object body, final Synchronization onCompletion) 
{
-        Callable<Object> task = new Callable<Object>() {
-            public Object call() throws Exception {
-                Exchange answer = send(endpoint, pattern, 
createSetBodyProcessor(body));
-
-                // invoke callback before returning answer
-                // as it allows callback to be used without unit of work 
invoking it
-                // and thus it works directly from a producer template as 
well, as opposed
-                // to the unit of work that is injected in routes
-                if (answer.isFailed()) {
-                    onCompletion.onFailure(answer);
-                } else {
-                    onCompletion.onComplete(answer);
-                }
+    public CompletableFuture<Object> asyncSendBody(final Endpoint endpoint, 
final Object body) {
+        return asyncSend(endpoint, createSetBodyProcessor(body))
+                .thenApply(this::extractResultBody);
+    }
 
-                Object result = extractResultBody(answer, pattern);
-                if (pattern.isOutCapable()) {
-                    return result;
-                } else {
-                    // return null if not OUT capable
-                    return null;
-                }
-            }
-        };
-        return getExecutorService().submit(task);
-    }
-
-    public Future<Exchange> asyncCallback(final Endpoint endpoint, final 
Exchange exchange, final Synchronization onCompletion) {
-        Callable<Exchange> task = new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                // process the exchange, any exception occurring will be 
caught and set on the exchange
-                send(endpoint, exchange);
-
-                // invoke callback before returning answer
-                // as it allows callback to be used without unit of work 
invoking it
-                // and thus it works directly from a producer template as 
well, as opposed
-                // to the unit of work that is injected in routes
-                if (exchange.isFailed()) {
-                    onCompletion.onFailure(exchange);
-                } else {
-                    onCompletion.onComplete(exchange);
-                }
-                return exchange;
-            }
-        };
-        return getExecutorService().submit(task);
-    }
-
-    public Future<Exchange> asyncCallback(final Endpoint endpoint, final 
Processor processor, final Synchronization onCompletion) {
-        Callable<Exchange> task = new Callable<Exchange>() {
-            public Exchange call() throws Exception {
-                // process the exchange, any exception occurring will be 
caught and set on the exchange
-                Exchange answer = send(endpoint, processor);
-
-                // invoke callback before returning answer
-                // as it allows callback to be used without unit of work 
invoking it
-                // and thus it works directly from a producer template as 
well, as opposed
-                // to the unit of work that is injected in routes
-                if (answer.isFailed()) {
-                    onCompletion.onFailure(answer);
-                } else {
-                    onCompletion.onComplete(answer);
-                }
-                return answer;
-            }
-        };
-        return getExecutorService().submit(task);
+    public CompletableFuture<Exchange> asyncCallback(final Endpoint endpoint, 
final Exchange exchange, final Synchronization onCompletion) {
+        return asyncSend(endpoint, 
exchange).thenApply(createCompletionFunction(onCompletion));
+    }
+
+    public CompletableFuture<Exchange> asyncCallback(final Endpoint endpoint, 
final Processor processor, final Synchronization onCompletion) {
+        return asyncSend(endpoint, 
processor).thenApply(createCompletionFunction(onCompletion));
+    }
+
+    protected CompletableFuture<Object> asyncRequestBody(final Endpoint 
endpoint, Processor processor) {
+        return asyncRequestBody(endpoint, processor, (Processor) null);
+    }
+
+    protected CompletableFuture<Object> asyncRequestBody(final Endpoint 
endpoint, Processor processor, Processor resultProcessor) {
+        return asyncRequest(endpoint, processor, resultProcessor)
+                .thenApply(e -> extractResultBody(e, ExchangePattern.InOut));
+    }
+
+    protected CompletableFuture<Exchange> asyncRequest(Endpoint endpoint, 
Processor processor,
+            Processor resultProcessor) {
+        return asyncSend(endpoint, ExchangePattern.InOut, processor, 
resultProcessor);
+    }
+
+    protected CompletableFuture<Exchange> asyncSend(
+            Endpoint endpoint, ExchangePattern pattern, Processor processor, 
Processor resultProcessor) {
+        return asyncSendExchange(endpoint, pattern, processor, 
resultProcessor, null);
+    }
+
+    protected CompletableFuture<Exchange> asyncSendExchange(
+            Endpoint endpoint, ExchangePattern pattern, Processor processor, 
Processor resultProcessor,
+            Exchange inExchange) {
+        CompletableFuture<Exchange> exchangeFuture = new CompletableFuture<>();
+        getExecutorService().submit(() -> 
getProducerCache().asyncSendExchange(endpoint, pattern, processor,
+                resultProcessor, inExchange, exchangeFuture));
+        return exchangeFuture;
+    }
+
+    protected CompletableFuture<Object> asyncCallback(final Endpoint endpoint, 
final ExchangePattern pattern,
+            final Object body, final Synchronization onCompletion) {
+        return asyncSend(endpoint, pattern, createSetBodyProcessor(body), null)
+                .thenApply(createCompletionFunction(onCompletion))
+                .thenApply(answer -> {
+                    Object result = extractResultBody(answer, pattern);
+                    if (pattern.isOutCapable()) {
+                        return result;
+                    } else {
+                        // return null if not OUT capable
+                        return null;
+                    }
+                });
     }
 
     private ProducerCache getProducerCache() {
@@ -756,7 +705,11 @@ public class DefaultProducerTemplate extends 
ServiceSupport implements ProducerT
             if (executor != null) {
                 return executor;
             }
-            executor = 
camelContext.getExecutorServiceManager().newDefaultThreadPool(this, 
"ProducerTemplate");
+            if (synchronous) {
+                executor = new SynchronousExecutorService();
+            } else {
+                executor = 
camelContext.getExecutorServiceManager().newDefaultThreadPool(this, 
"ProducerTemplate");
+            }
         }
 
         ObjectHelper.notNull(executor, "ExecutorService");

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java 
b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
new file mode 100644
index 0000000..734551f
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.util.EventHelper;
+import org.apache.camel.util.StopWatch;
+
+/**
+ * Helper class to notify on exchange sending events in async engine
+ */
+class EventNotifierCallback implements AsyncCallback {
+    private final AsyncCallback originalCallback;
+    private final StopWatch watch;
+    private final Exchange exchange;
+    private final Endpoint endpoint;
+
+    public EventNotifierCallback(AsyncCallback originalCallback, Exchange 
exchange,
+            Endpoint endpoint) {
+        this.originalCallback = originalCallback;
+        this.watch = new StopWatch();
+        this.exchange = exchange;
+        this.endpoint = endpoint;
+        EventHelper.notifyExchangeSending(exchange.getContext(), exchange, 
endpoint);
+    }
+
+    @Override
+    public void done(boolean doneSync) {
+        try {
+            originalCallback.done(doneSync);
+        } finally {
+            long timeTaken = watch.stop();
+            EventHelper.notifyExchangeSent(exchange.getContext(), exchange, 
endpoint, timeTaken);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java 
b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
index ebf641b..b475d29 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
@@ -19,6 +19,7 @@ package org.apache.camel.impl;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -260,6 +261,83 @@ public class ProducerCache extends ServiceSupport {
     }
 
     /**
+     * Asynchronously sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     * <p>
+     * This method will <b>neither</b> throw an exception <b>nor</b> complete 
future exceptionally.
+     * If processing of the given Exchange failed then the exception is stored 
on the return Exchange
+     *
+     * @param endpoint        the endpoint to send the exchange to
+     * @param pattern         the message {@link ExchangePattern} such as
+     *                        {@link ExchangePattern#InOnly} or {@link 
ExchangePattern#InOut}
+     * @param processor       the transformer used to populate the new exchange
+     * @param resultProcessor a processor to process the exchange when the 
send is complete.
+     * @param future          the preexisting future to complete when 
processing is done or null if to create new one
+     * @return future that completes with exchange when processing is done. 
Either passed into future parameter
+     *              or new one if parameter was null
+     */
+    public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, 
ExchangePattern pattern, Processor processor, Processor resultProcessor,
+            CompletableFuture<Exchange> future) {
+        return asyncSendExchange(endpoint, pattern, processor, 
resultProcessor, null, future);
+    }
+
+    /**
+     * Asynchronously sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     * <p>
+     * This method will <b>neither</b> throw an exception <b>nor</b> complete 
future exceptionally.
+     * If processing of the given Exchange failed then the exception is stored 
on the return Exchange
+     *
+     * @param endpoint        the endpoint to send the exchange to
+     * @param pattern         the message {@link ExchangePattern} such as
+     *                        {@link ExchangePattern#InOnly} or {@link 
ExchangePattern#InOut}
+     * @param processor       the transformer used to populate the new exchange
+     * @param resultProcessor a processor to process the exchange when the 
send is complete.
+     * @param exchange        an exchange to use in processing. Exchange will 
be created if parameter is null.
+     * @param future          the preexisting future to complete when 
processing is done or null if to create new one
+     * @return future that completes with exchange when processing is done. 
Either passed into future parameter
+     *              or new one if parameter was null
+     */
+    public CompletableFuture<Exchange> asyncSendExchange(final Endpoint 
endpoint, ExchangePattern pattern,
+            final Processor processor, final Processor resultProcessor, 
Exchange exchange,
+            CompletableFuture<Exchange> future) {
+        AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new 
AsyncCallbackToCompletableFutureAdapter<>(future, exchange);
+        doInAsyncProducer(endpoint, exchange, pattern, futureAdapter,
+                (producer, asyncProducer, innerExchange, exchangePattern, 
producerCallback) -> {
+                    if (innerExchange == null) {
+                        innerExchange = pattern != null ?
+                                producer.getEndpoint().createExchange(pattern) 
:
+                                producer.getEndpoint().createExchange();
+                        futureAdapter.setResult(innerExchange);
+                    }
+
+                    if (processor != null) {
+                        // lets populate using the processor callback
+                        AsyncProcessor asyncProcessor = 
AsyncProcessorConverterHelper.convert(processor);
+                        try {
+                            final Exchange finalExchange = innerExchange;
+                            asyncProcessor.process(innerExchange, new 
AsyncCallback() {
+                                @Override
+                                public void done(boolean doneSync) {
+                                    asyncDispatchExchange(endpoint, producer, 
resultProcessor, finalExchange,
+                                            producerCallback);
+                                }
+                            });
+                            return false;
+                        } catch (Exception e) {
+                            // populate failed so return
+                            innerExchange.setException(e);
+                            producerCallback.done(true);
+                            return true;
+                        }
+                    }
+
+                    return asyncDispatchExchange(endpoint, producer, 
resultProcessor, innerExchange, producerCallback);
+                });
+        return futureAdapter.getFuture();
+    }
+
+    /**
      * Sends an exchange to an endpoint using a supplied callback, using the 
synchronous processing.
      * <p/>
      * If an exception was thrown during processing, it would be set on the 
given Exchange
@@ -396,6 +474,31 @@ public class ProducerCache extends ServiceSupport {
         }
     }
 
+    protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer 
producer,
+            final Processor resultProcessor, Exchange exchange, AsyncCallback 
callback) {
+        // now lets dispatch
+        LOG.debug(">>>> {} {}", endpoint, exchange);
+
+        // set property which endpoint we send to
+        exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
+
+        // send the exchange using the processor
+        try {
+            if (eventNotifierEnabled) {
+                callback = new EventNotifierCallback(callback, exchange, 
endpoint);
+            }
+            CamelInternalProcessor internal = 
prepareInternalProcessor(producer, resultProcessor);
+
+            return internal.process(exchange, callback);
+        } catch (Throwable e) {
+            // ensure exceptions is caught and set on the exchange
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+    }
+
     protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern 
pattern,
                                     final Processor processor, final Processor 
resultProcessor, Exchange exchange) {
         return doInProducer(endpoint, exchange, pattern, new 
ProducerCallback<Exchange>() {
@@ -429,21 +532,7 @@ public class ProducerCache extends ServiceSupport {
                         
EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint);
                     }
 
-                    // if we have a result processor then wrap in pipeline to 
execute both of them in sequence
-                    Processor target;
-                    if (resultProcessor != null) {
-                        List<Processor> processors = new 
ArrayList<Processor>(2);
-                        processors.add(producer);
-                        processors.add(resultProcessor);
-                        target = Pipeline.newInstance(getCamelContext(), 
processors);
-                    } else {
-                        target = producer;
-                    }
-
-                    // wrap in unit of work
-                    CamelInternalProcessor internal = new 
CamelInternalProcessor(target);
-                    internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
-
+                    CamelInternalProcessor internal = 
prepareInternalProcessor(producer, resultProcessor);
                     internal.process(exchange);
                 } catch (Throwable e) {
                     // ensure exceptions is caught and set on the exchange
@@ -460,6 +549,24 @@ public class ProducerCache extends ServiceSupport {
         });
     }
 
+    protected CamelInternalProcessor prepareInternalProcessor(Producer 
producer, Processor resultProcessor) {
+        // if we have a result processor then wrap in pipeline to execute both 
of them in sequence
+        Processor target;
+        if (resultProcessor != null) {
+            List<Processor> processors = new ArrayList<Processor>(2);
+            processors.add(producer);
+            processors.add(resultProcessor);
+            target = Pipeline.newInstance(getCamelContext(), processors);
+        } else {
+            target = producer;
+        }
+
+        // wrap in unit of work
+        CamelInternalProcessor internal = new CamelInternalProcessor(target);
+        internal.addAdvice(new 
CamelInternalProcessor.UnitOfWorkProcessorAdvice(null));
+        return internal;
+    }
+
     protected synchronized Producer doGetProducer(Endpoint endpoint, boolean 
pooled) {
         String key = endpoint.getEndpointUri();
         Producer answer = producers.get(key);
@@ -648,4 +755,5 @@ public class ProducerCache extends ServiceSupport {
     public String toString() {
         return "ProducerCache for source: " + source + ", capacity: " + 
getCapacity();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 2c92cc5..eca7f0f 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -735,7 +735,7 @@ public final class ExchangeHelper {
      * @return the result body, can be <tt>null</tt>.
      * @throws CamelExecutionException is thrown if the processing of the 
exchange failed
      */
-    public static <T> T extractFutureBody(CamelContext context, Future<Object> 
future, Class<T> type) {
+    public static <T> T extractFutureBody(CamelContext context, Future<?> 
future, Class<T> type) {
         try {
             return doExtractFutureBody(context, future.get(), type);
         } catch (InterruptedException e) {
@@ -765,7 +765,7 @@ public final class ExchangeHelper {
      * @throws CamelExecutionException is thrown if the processing of the 
exchange failed
      * @throws java.util.concurrent.TimeoutException is thrown if a timeout 
triggered
      */
-    public static <T> T extractFutureBody(CamelContext context, Future<Object> 
future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
+    public static <T> T extractFutureBody(CamelContext context, Future<?> 
future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
         try {
             if (timeout > 0) {
                 return doExtractFutureBody(context, future.get(timeout, unit), 
type);

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
index 24f560e..44e921f 100644
--- 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
@@ -570,11 +570,11 @@ public class DefaultProducerTemplateAsyncTest extends 
ContextTestSupport {
             @Override
             public void configure() throws Exception {
                 from("direct:start")
-                        .delay(400)
+                        .delay(400).asyncDelayed()
                         .transform(body().append(" World")).to("mock:result");
 
                 from("direct:error")
-                        .delay(400)
+                        .delay(400).asyncDelayed()
                         .process(new Processor() {
                             public void process(Exchange exchange) throws 
Exception {
                                 throw new IllegalArgumentException("Damn 
forced by unit test");

http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
new file mode 100644
index 0000000..f218a6e
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl;
+
+import org.apache.camel.Exchange;
+import org.junit.Assert;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * @version
+ */
+public class DefaultProducerTemplateNonBlockingAsyncTest extends 
DefaultProducerTemplateAsyncTest{
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        template.stop();
+        template.setSynchronous(true);
+        template.start();
+    }
+
+    public void testRunningInSameThread() throws ExecutionException, 
InterruptedException {
+        Thread originalThread = Thread.currentThread();
+        CompletableFuture<Exchange> future = template.asyncSend("direct:echo", 
e -> {
+            Assert.assertSame(originalThread, Thread.currentThread());
+            e.getIn().setBody("Hi");
+        });
+        Assert.assertEquals("HiHi", template.extractFutureBody(future, 
String.class));
+    }
+}

Reply via email to