CAMEL-10308 Provide a way to use async engine from ProducerTemplate
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/51a80684 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/51a80684 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/51a80684 Branch: refs/heads/master Commit: 51a8068424cc9a265036220d70ef6d2af0312c3a Parents: 5104c16 Author: Vitalii Tymchyshyn <v...@tym.im> Authored: Sun Sep 11 19:25:42 2016 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Sat Sep 17 09:34:52 2016 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/ProducerTemplate.java | 76 +++-- ...AsyncCallbackToCompletableFutureAdapter.java | 55 ++++ .../camel/impl/DefaultProducerTemplate.java | 303 ++++++++----------- .../camel/impl/EventNotifierCallback.java | 52 ++++ .../org/apache/camel/impl/ProducerCache.java | 138 ++++++++- .../org/apache/camel/util/ExchangeHelper.java | 4 +- .../impl/DefaultProducerTemplateAsyncTest.java | 4 +- ...ultProducerTemplateNonBlockingAsyncTest.java | 45 +++ 8 files changed, 453 insertions(+), 224 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java index 9589072..6461e4e 100644 --- a/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java @@ -16,14 +16,15 @@ */ package org.apache.camel; +import org.apache.camel.spi.Synchronization; + import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; -import org.apache.camel.spi.Synchronization; - /** * Template for working with Camel and sending {@link Message} instances in an * {@link Exchange} to an {@link Endpoint}. @@ -91,6 +92,21 @@ public interface ProducerTemplate extends Service { * @return the size of current cached resources */ int getCurrentCacheSize(); + + /** + * Reports if async* methods will dispath processing from the calling thread (true) or through executor (false). + * They will still employ asynchronous engine, so this mode can be useful for high-speed non-blocking processing. + * @return if async* methods will run in the calling thread + */ + boolean isSynchronous(); + + /** + * Reports if async* methods will dispath processing from the calling thread (true) or through executor (false). + * In any case they would still employ asynchronous engine, so setting to true can be useful + * for high-speed non-blocking processing. + * @param synchronous if async* methods will run in the calling thread + */ + void setSynchronous(boolean synchronous); /** * Get the default endpoint to use if none is specified @@ -895,7 +911,7 @@ public interface ProducerTemplate extends Service { * @param exchange the exchange to send * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncSend(String endpointUri, Exchange exchange); + CompletableFuture<Exchange> asyncSend(String endpointUri, Exchange exchange); /** * Sends an asynchronous exchange to the given endpoint. @@ -904,7 +920,7 @@ public interface ProducerTemplate extends Service { * @param processor the transformer used to populate the new exchange * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncSend(String endpointUri, Processor processor); + CompletableFuture<Exchange> asyncSend(String endpointUri, Processor processor); /** * Sends an asynchronous body to the given endpoint. 
@@ -914,7 +930,7 @@ public interface ProducerTemplate extends Service { * @param body the body to send * @return a handle to be used to get the response in the future */ - Future<Object> asyncSendBody(String endpointUri, Object body); + CompletableFuture<Object> asyncSendBody(String endpointUri, Object body); /** * Sends an asynchronous body to the given endpoint. @@ -924,7 +940,7 @@ public interface ProducerTemplate extends Service { * @param body the body to send * @return a handle to be used to get the response in the future */ - Future<Object> asyncRequestBody(String endpointUri, Object body); + CompletableFuture<Object> asyncRequestBody(String endpointUri, Object body); /** * Sends an asynchronous body to the given endpoint. @@ -936,7 +952,7 @@ public interface ProducerTemplate extends Service { * @param headerValue the header value * @return a handle to be used to get the response in the future */ - Future<Object> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue); + CompletableFuture<Object> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue); /** * Sends an asynchronous body to the given endpoint. @@ -947,7 +963,7 @@ public interface ProducerTemplate extends Service { * @param headers headers * @return a handle to be used to get the response in the future */ - Future<Object> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers); + CompletableFuture<Object> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers); /** * Sends an asynchronous body to the given endpoint. 
@@ -958,7 +974,7 @@ public interface ProducerTemplate extends Service { * @param type the expected response type * @return a handle to be used to get the response in the future */ - <T> Future<T> asyncRequestBody(String endpointUri, Object body, Class<T> type); + <T> CompletableFuture<T> asyncRequestBody(String endpointUri, Object body, Class<T> type); /** * Sends an asynchronous body to the given endpoint. @@ -971,7 +987,7 @@ public interface ProducerTemplate extends Service { * @param type the expected response type * @return a handle to be used to get the response in the future */ - <T> Future<T> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type); + <T> CompletableFuture<T> asyncRequestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class<T> type); /** * Sends an asynchronous body to the given endpoint. @@ -983,7 +999,7 @@ public interface ProducerTemplate extends Service { * @param type the expected response type * @return a handle to be used to get the response in the future */ - <T> Future<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type); + <T> CompletableFuture<T> asyncRequestBodyAndHeaders(String endpointUri, Object body, Map<String, Object> headers, Class<T> type); /** * Sends an asynchronous exchange to the given endpoint. @@ -992,7 +1008,7 @@ public interface ProducerTemplate extends Service { * @param exchange the exchange to send * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncSend(Endpoint endpoint, Exchange exchange); + CompletableFuture<Exchange> asyncSend(Endpoint endpoint, Exchange exchange); /** * Sends an asynchronous exchange to the given endpoint. 
@@ -1001,7 +1017,7 @@ public interface ProducerTemplate extends Service { * @param processor the transformer used to populate the new exchange * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncSend(Endpoint endpoint, Processor processor); + CompletableFuture<Exchange> asyncSend(Endpoint endpoint, Processor processor); /** * Sends an asynchronous body to the given endpoint. @@ -1011,7 +1027,7 @@ public interface ProducerTemplate extends Service { * @param body the body to send * @return a handle to be used to get the response in the future */ - Future<Object> asyncSendBody(Endpoint endpoint, Object body); + CompletableFuture<Object> asyncSendBody(Endpoint endpoint, Object body); /** * Sends an asynchronous body to the given endpoint. @@ -1021,7 +1037,7 @@ public interface ProducerTemplate extends Service { * @param body the body to send * @return a handle to be used to get the response in the future */ - Future<Object> asyncRequestBody(Endpoint endpoint, Object body); + CompletableFuture<Object> asyncRequestBody(Endpoint endpoint, Object body); /** * Sends an asynchronous body to the given endpoint. @@ -1033,7 +1049,7 @@ public interface ProducerTemplate extends Service { * @param headerValue the header value * @return a handle to be used to get the response in the future */ - Future<Object> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue); + CompletableFuture<Object> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue); /** * Sends an asynchronous body to the given endpoint. 
@@ -1044,7 +1060,7 @@ public interface ProducerTemplate extends Service { * @param headers headers * @return a handle to be used to get the response in the future */ - Future<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers); + CompletableFuture<Object> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers); /** * Sends an asynchronous body to the given endpoint. @@ -1055,7 +1071,7 @@ public interface ProducerTemplate extends Service { * @param type the expected response type * @return a handle to be used to get the response in the future */ - <T> Future<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type); + <T> CompletableFuture<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type); /** * Sends an asynchronous body to the given endpoint. @@ -1068,7 +1084,7 @@ public interface ProducerTemplate extends Service { * @param type the expected response type * @return a handle to be used to get the response in the future */ - <T> Future<T> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type); + <T> CompletableFuture<T> asyncRequestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class<T> type); /** * Sends an asynchronous body to the given endpoint. @@ -1080,7 +1096,7 @@ public interface ProducerTemplate extends Service { * @param type the expected response type * @return a handle to be used to get the response in the future */ - <T> Future<T> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type); + <T> CompletableFuture<T> asyncRequestBodyAndHeaders(Endpoint endpoint, Object body, Map<String, Object> headers, Class<T> type); /** * Gets the response body from the future handle, will wait until the response is ready. 
@@ -1093,7 +1109,7 @@ public interface ProducerTemplate extends Service { * @return the result (see class javadoc) * @throws CamelExecutionException if the processing of the exchange failed */ - <T> T extractFutureBody(Future<Object> future, Class<T> type) throws CamelExecutionException; + <T> T extractFutureBody(Future<?> future, Class<T> type) throws CamelExecutionException; /** * Gets the response body from the future handle, will wait at most the given time for the response to be ready. @@ -1109,7 +1125,7 @@ public interface ProducerTemplate extends Service { * @throws java.util.concurrent.TimeoutException if the wait timed out * @throws CamelExecutionException if the processing of the exchange failed */ - <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException, CamelExecutionException; + <T> T extractFutureBody(Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException, CamelExecutionException; // Asynchronous methods with callback // ----------------------------------------------------------------------- @@ -1122,7 +1138,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncCallback(String endpointUri, Exchange exchange, Synchronization onCompletion); + CompletableFuture<Exchange> asyncCallback(String endpointUri, Exchange exchange, Synchronization onCompletion); /** * Sends an asynchronous exchange to the given endpoint. 
@@ -1132,7 +1148,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization onCompletion); + CompletableFuture<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization onCompletion); /** * Sends an asynchronous exchange to the given endpoint using a supplied processor. @@ -1143,7 +1159,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncCallback(String endpointUri, Processor processor, Synchronization onCompletion); + CompletableFuture<Exchange> asyncCallback(String endpointUri, Processor processor, Synchronization onCompletion); /** * Sends an asynchronous exchange to the given endpoint using a supplied processor. @@ -1154,7 +1170,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Exchange> asyncCallback(Endpoint endpoint, Processor processor, Synchronization onCompletion); + CompletableFuture<Exchange> asyncCallback(Endpoint endpoint, Processor processor, Synchronization onCompletion); /** * Sends an asynchronous body to the given endpoint. 
@@ -1165,7 +1181,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Object> asyncCallbackSendBody(String endpointUri, Object body, Synchronization onCompletion); + CompletableFuture<Object> asyncCallbackSendBody(String endpointUri, Object body, Synchronization onCompletion); /** * Sends an asynchronous body to the given endpoint. @@ -1176,7 +1192,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion); + CompletableFuture<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion); /** * Sends an asynchronous body to the given endpoint. @@ -1187,7 +1203,7 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Object> asyncCallbackRequestBody(String endpointUri, Object body, Synchronization onCompletion); + CompletableFuture<Object> asyncCallbackRequestBody(String endpointUri, Object body, Synchronization onCompletion); /** * Sends an asynchronous body to the given endpoint. 
@@ -1198,6 +1214,6 @@ public interface ProducerTemplate extends Service { * @param onCompletion callback invoked when exchange has been completed * @return a handle to be used to get the response in the future */ - Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion); + CompletableFuture<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion); } http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java new file mode 100644 index 0000000..0baa510 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/AsyncCallbackToCompletableFutureAdapter.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl; + +import org.apache.camel.AsyncCallback; + +import java.util.concurrent.CompletableFuture; + +/** + * AsyncCallback that provides a CompletableFuture when async action is done + */ +public class AsyncCallbackToCompletableFutureAdapter<T> implements AsyncCallback{ + private final CompletableFuture<T> future; + private volatile T result; + + public AsyncCallbackToCompletableFutureAdapter() { + this(null); + } + + public AsyncCallbackToCompletableFutureAdapter(T result) { + this(null, result); + } + + public AsyncCallbackToCompletableFutureAdapter(CompletableFuture<T> future, T result) { + this.future = future != null ? future : new CompletableFuture<>(); + this.result = result; + } + + public void setResult(T result) { + this.result = result; + } + + public CompletableFuture<T> getFuture() { + return future; + } + + @Override + public void done(boolean doneSync) { + future.complete(result); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java index 6eff970..19958e4 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java @@ -17,11 +17,12 @@ package org.apache.camel.impl; import java.util.Map; -import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import org.apache.camel.CamelContext; import org.apache.camel.CamelExecutionException; @@ -39,6 +40,7 @@ import 
org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; +import org.apache.camel.util.concurrent.SynchronousExecutorService; /** * Template (named like Spring's TransactionTemplate & JmsTemplate @@ -54,6 +56,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT private Endpoint defaultEndpoint; private int maximumCacheSize; private boolean eventNotifierEnabled = true; + private volatile boolean synchronous; public DefaultProducerTemplate(CamelContext camelContext) { this.camelContext = camelContext; @@ -82,6 +85,16 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT this.maximumCacheSize = maximumCacheSize; } + @Override + public boolean isSynchronous() { + return synchronous; + } + + @Override + public void setSynchronous(boolean synchronous) { + this.synchronous = synchronous; + } + public int getCurrentCacheSize() { if (producerCache == null) { return 0; @@ -241,17 +254,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT } public void sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) throws CamelExecutionException { - Exchange result = send(endpoint, new Processor() { - public void process(Exchange exchange) { - Message in = exchange.getIn(); - if (headers != null) { - for (Map.Entry<String, Object> header : headers.entrySet()) { - in.setHeader(header.getKey(), header.getValue()); - } - } - in.setBody(body); - } - }); + Exchange result = send(endpoint, createBodyAndHeaders(body, headers)); // must invoke extract result body in case of exception to be rethrown extractResultBody(result); } @@ -261,17 +264,7 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT } public Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> 
headers) throws CamelExecutionException { - Exchange exchange = send(endpoint, pattern, new Processor() { - public void process(Exchange exchange) throws Exception { - Message in = exchange.getIn(); - if (headers != null) { - for (Map.Entry<String, Object> header : headers.entrySet()) { - in.setHeader(header.getKey(), header.getValue()); - } - } - in.setBody(body); - } - }); + Exchange exchange = send(endpoint, pattern, createBodyAndHeaders(body, headers)); Object result = extractResultBody(exchange, pattern); if (pattern.isOutCapable()) { return result; @@ -485,6 +478,21 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT return new ConvertBodyProcessor(type); } + protected Function<Exchange, Exchange> createCompletionFunction(Synchronization onCompletion) { + return (answer) -> { + // invoke callback before returning answer + // as it allows callback to be used without unit of work invoking it + // and thus it works directly from a producer template as well, as opposed + // to the unit of work that is injected in routes + if (answer.isFailed()) { + onCompletion.onFailure(answer); + } else { + onCompletion.onComplete(answer); + } + return answer; + }; + } + protected Endpoint resolveMandatoryEndpoint(String endpointUri) { Endpoint endpoint = camelContext.getEndpoint(endpointUri); if (endpoint == null) { @@ -511,228 +519,169 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT this.executor = executorService; } - public Future<Exchange> asyncSend(final String uri, final Exchange exchange) { + public CompletableFuture<Exchange> asyncSend(final String uri, final Exchange exchange) { return asyncSend(resolveMandatoryEndpoint(uri), exchange); } - public Future<Exchange> asyncSend(final String uri, final Processor processor) { + public CompletableFuture<Exchange> asyncSend(final String uri, final Processor processor) { return asyncSend(resolveMandatoryEndpoint(uri), processor); } - public Future<Object> 
asyncSendBody(final String uri, final Object body) { + public CompletableFuture<Object> asyncSendBody(final String uri, final Object body) { return asyncSendBody(resolveMandatoryEndpoint(uri), body); } - public Future<Object> asyncRequestBody(final String uri, final Object body) { + public CompletableFuture<Object> asyncRequestBody(final String uri, final Object body) { return asyncRequestBody(resolveMandatoryEndpoint(uri), body); } - public <T> Future<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) { - return asyncRequestBody(resolveMandatoryEndpoint(uri), body, type); + public <T> CompletableFuture<T> asyncRequestBody(final String uri, final Object body, final Class<T> type) { + return asyncRequestBody(resolveMandatoryEndpoint(uri), createSetBodyProcessor(body), type); } - public Future<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) { + public CompletableFuture<Object> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue) { return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue); } - public <T> Future<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) { + public <T> CompletableFuture<T> asyncRequestBodyAndHeader(final String endpointUri, final Object body, final String header, final Object headerValue, final Class<T> type) { return asyncRequestBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue, type); } - public Future<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) { + public CompletableFuture<Object> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers) { return 
asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers); } - public <T> Future<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) { + public <T> CompletableFuture<T> asyncRequestBodyAndHeaders(final String endpointUri, final Object body, final Map<String, Object> headers, final Class<T> type) { return asyncRequestBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers, type); } - public <T> T extractFutureBody(Future<Object> future, Class<T> type) { + public <T> T extractFutureBody(Future<?> future, Class<T> type) { return ExchangeHelper.extractFutureBody(camelContext, future, type); } - public <T> T extractFutureBody(Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { + public <T> T extractFutureBody(Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { return ExchangeHelper.extractFutureBody(camelContext, future, timeout, unit, type); } - public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) { + public CompletableFuture<Object> asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) { return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion); } - public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) { + public CompletableFuture<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) { return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion); } - public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) { + public CompletableFuture<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) { return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion); } - public Future<Object> 
asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) { + public CompletableFuture<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) { return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion); } - public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) { + public CompletableFuture<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) { return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion); } - public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) { + public CompletableFuture<Exchange> asyncCallback(String uri, Processor processor, Synchronization onCompletion) { return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion); } - public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - return requestBody(endpoint, body); - } - }; - return getExecutorService().submit(task); + public CompletableFuture<Object> asyncRequestBody(final Endpoint endpoint, final Object body) { + return asyncRequestBody(endpoint, createSetBodyProcessor(body)); } - public <T> Future<T> asyncRequestBody(final Endpoint endpoint, final Object body, final Class<T> type) { - Callable<T> task = new Callable<T>() { - public T call() throws Exception { - return requestBody(endpoint, body, type); - } - }; - return getExecutorService().submit(task); + public <T> CompletableFuture<T> asyncRequestBody(Endpoint endpoint, Object body, Class<T> type) { + return asyncRequestBody(endpoint, createSetBodyProcessor(body), type); } - public Future<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, + public CompletableFuture<Object> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object 
body, final String header, final Object headerValue) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - return requestBodyAndHeader(endpoint, body, header, headerValue); - } - }; - return getExecutorService().submit(task); + return asyncRequestBody(endpoint, createBodyAndHeaderProcessor(body, header, headerValue)); } - public <T> Future<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, + protected <T> CompletableFuture<T> asyncRequestBody(final Endpoint endpoint, Processor processor, final Class<T> type) { + return asyncRequestBody(endpoint, processor, createConvertBodyProcessor(type)) + .thenApply(answer -> camelContext.getTypeConverter().convertTo(type, answer)); + } + + public <T> CompletableFuture<T> asyncRequestBodyAndHeader(final Endpoint endpoint, final Object body, final String header, final Object headerValue, final Class<T> type) { - Callable<T> task = new Callable<T>() { - public T call() throws Exception { - return requestBodyAndHeader(endpoint, body, header, headerValue, type); - } - }; - return getExecutorService().submit(task); + return asyncRequestBody(endpoint, createBodyAndHeaderProcessor(body, header, headerValue), type); } - public Future<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, + public CompletableFuture<Object> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, final Map<String, Object> headers) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - return requestBodyAndHeaders(endpoint, body, headers); - } - }; - return getExecutorService().submit(task); + return asyncRequestBody(endpoint, createBodyAndHeaders(body, headers)); } - public <T> Future<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, + public <T> CompletableFuture<T> asyncRequestBodyAndHeaders(final Endpoint endpoint, final Object body, final Map<String, Object> 
headers, final Class<T> type) { - Callable<T> task = new Callable<T>() { - public T call() throws Exception { - return requestBodyAndHeaders(endpoint, body, headers, type); - } - }; - return getExecutorService().submit(task); + return asyncRequestBody(endpoint, createBodyAndHeaders(body, headers), type); } - public Future<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) { - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - return send(endpoint, exchange); - } - }; - return getExecutorService().submit(task); + public CompletableFuture<Exchange> asyncSend(final Endpoint endpoint, final Exchange exchange) { + return asyncSendExchange(endpoint, null, null, null, exchange); } - public Future<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) { - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - return send(endpoint, processor); - } - }; - return getExecutorService().submit(task); + public CompletableFuture<Exchange> asyncSend(final Endpoint endpoint, final Processor processor) { + return asyncSend(endpoint, null, processor, null); } - public Future<Object> asyncSendBody(final Endpoint endpoint, final Object body) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - sendBody(endpoint, body); - // its InOnly, so no body to return - return null; - } - }; - return getExecutorService().submit(task); - } - - private Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) { - Callable<Object> task = new Callable<Object>() { - public Object call() throws Exception { - Exchange answer = send(endpoint, pattern, createSetBodyProcessor(body)); - - // invoke callback before returning answer - // as it allows callback to be used without unit of work invoking it - // and thus it works directly from a producer template as 
well, as opposed - // to the unit of work that is injected in routes - if (answer.isFailed()) { - onCompletion.onFailure(answer); - } else { - onCompletion.onComplete(answer); - } + public CompletableFuture<Object> asyncSendBody(final Endpoint endpoint, final Object body) { + return asyncSend(endpoint, createSetBodyProcessor(body)) + .thenApply(this::extractResultBody); + } - Object result = extractResultBody(answer, pattern); - if (pattern.isOutCapable()) { - return result; - } else { - // return null if not OUT capable - return null; - } - } - }; - return getExecutorService().submit(task); - } - - public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) { - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - // process the exchange, any exception occurring will be caught and set on the exchange - send(endpoint, exchange); - - // invoke callback before returning answer - // as it allows callback to be used without unit of work invoking it - // and thus it works directly from a producer template as well, as opposed - // to the unit of work that is injected in routes - if (exchange.isFailed()) { - onCompletion.onFailure(exchange); - } else { - onCompletion.onComplete(exchange); - } - return exchange; - } - }; - return getExecutorService().submit(task); - } - - public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) { - Callable<Exchange> task = new Callable<Exchange>() { - public Exchange call() throws Exception { - // process the exchange, any exception occurring will be caught and set on the exchange - Exchange answer = send(endpoint, processor); - - // invoke callback before returning answer - // as it allows callback to be used without unit of work invoking it - // and thus it works directly from a producer template as well, as opposed - // to the unit of work that is injected in 
routes - if (answer.isFailed()) { - onCompletion.onFailure(answer); - } else { - onCompletion.onComplete(answer); - } - return answer; - } - }; - return getExecutorService().submit(task); + public CompletableFuture<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) { + return asyncSend(endpoint, exchange).thenApply(createCompletionFunction(onCompletion)); + } + + public CompletableFuture<Exchange> asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) { + return asyncSend(endpoint, processor).thenApply(createCompletionFunction(onCompletion)); + } + + protected CompletableFuture<Object> asyncRequestBody(final Endpoint endpoint, Processor processor) { + return asyncRequestBody(endpoint, processor, (Processor) null); + } + + protected CompletableFuture<Object> asyncRequestBody(final Endpoint endpoint, Processor processor, Processor resultProcessor) { + return asyncRequest(endpoint, processor, resultProcessor) + .thenApply(e -> extractResultBody(e, ExchangePattern.InOut)); + } + + protected CompletableFuture<Exchange> asyncRequest(Endpoint endpoint, Processor processor, + Processor resultProcessor) { + return asyncSend(endpoint, ExchangePattern.InOut, processor, resultProcessor); + } + + protected CompletableFuture<Exchange> asyncSend( + Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor) { + return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null); + } + + protected CompletableFuture<Exchange> asyncSendExchange( + Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor, + Exchange inExchange) { + CompletableFuture<Exchange> exchangeFuture = new CompletableFuture<>(); + getExecutorService().submit(() -> getProducerCache().asyncSendExchange(endpoint, pattern, processor, + resultProcessor, inExchange, exchangeFuture)); + return exchangeFuture; + } + + protected 
CompletableFuture<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, + final Object body, final Synchronization onCompletion) { + return asyncSend(endpoint, pattern, createSetBodyProcessor(body), null) + .thenApply(createCompletionFunction(onCompletion)) + .thenApply(answer -> { + Object result = extractResultBody(answer, pattern); + if (pattern.isOutCapable()) { + return result; + } else { + // return null if not OUT capable + return null; + } + }); } private ProducerCache getProducerCache() { @@ -756,7 +705,11 @@ public class DefaultProducerTemplate extends ServiceSupport implements ProducerT if (executor != null) { return executor; } - executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate"); + if (synchronous) { + executor = new SynchronousExecutorService(); + } else { + executor = camelContext.getExecutorServiceManager().newDefaultThreadPool(this, "ProducerTemplate"); + } } ObjectHelper.notNull(executor, "ExecutorService"); http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java new file mode 100644 index 0000000..734551f --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/EventNotifierCallback.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.util.EventHelper; +import org.apache.camel.util.StopWatch; + +/** + * Helper class to notify on exchange sending events in async engine + */ +class EventNotifierCallback implements AsyncCallback { + private final AsyncCallback originalCallback; + private final StopWatch watch; + private final Exchange exchange; + private final Endpoint endpoint; + + public EventNotifierCallback(AsyncCallback originalCallback, Exchange exchange, + Endpoint endpoint) { + this.originalCallback = originalCallback; + this.watch = new StopWatch(); + this.exchange = exchange; + this.endpoint = endpoint; + EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); + } + + @Override + public void done(boolean doneSync) { + try { + originalCallback.done(doneSync); + } finally { + long timeTaken = watch.stop(); + EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java index ebf641b..b475d29 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java +++ 
b/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java @@ -19,6 +19,7 @@ package org.apache.camel.impl; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; @@ -260,6 +261,83 @@ public class ProducerCache extends ServiceSupport { } /** + * Asynchronously sends an exchange to an endpoint using a supplied + * {@link Processor} to populate the exchange + * <p> + * This method will <b>neither</b> throw an exception <b>nor</b> complete the future exceptionally. + * If processing of the given Exchange failed then the exception is stored on the returned Exchange + * + * @param endpoint the endpoint to send the exchange to + * @param pattern the message {@link ExchangePattern} such as + * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} + * @param processor the transformer used to populate the new exchange + * @param resultProcessor a processor to process the exchange when the send is complete. + * @param future the preexisting future to complete when processing is done, or null to create a new one + * @return future that completes with the exchange when processing is done: either the one passed in the future parameter + * or a new one if that parameter was null + */ + public CompletableFuture<Exchange> asyncSend(Endpoint endpoint, ExchangePattern pattern, Processor processor, Processor resultProcessor, + CompletableFuture<Exchange> future) { + return asyncSendExchange(endpoint, pattern, processor, resultProcessor, null, future); + } + + /** + * Asynchronously sends an exchange to an endpoint using a supplied + * {@link Processor} to populate the exchange + * <p> + * This method will <b>neither</b> throw an exception <b>nor</b> complete the future exceptionally.
+ * If processing of the given Exchange failed then the exception is stored on the returned Exchange + * + * @param endpoint the endpoint to send the exchange to + * @param pattern the message {@link ExchangePattern} such as + * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} + * @param processor the transformer used to populate the new exchange + * @param resultProcessor a processor to process the exchange when the send is complete. + * @param exchange an exchange to use in processing. An exchange will be created if this parameter is null. + * @param future the preexisting future to complete when processing is done, or null to create a new one + * @return future that completes with the exchange when processing is done: either the one passed in the future parameter + * or a new one if that parameter was null + */ + public CompletableFuture<Exchange> asyncSendExchange(final Endpoint endpoint, ExchangePattern pattern, + final Processor processor, final Processor resultProcessor, Exchange exchange, + CompletableFuture<Exchange> future) { + AsyncCallbackToCompletableFutureAdapter<Exchange> futureAdapter = new AsyncCallbackToCompletableFutureAdapter<>(future, exchange); + doInAsyncProducer(endpoint, exchange, pattern, futureAdapter, + (producer, asyncProducer, innerExchange, exchangePattern, producerCallback) -> { + if (innerExchange == null) { + innerExchange = pattern != null ?
+ producer.getEndpoint().createExchange(pattern) : + producer.getEndpoint().createExchange(); + futureAdapter.setResult(innerExchange); + } + + if (processor != null) { + // lets populate using the processor callback + AsyncProcessor asyncProcessor = AsyncProcessorConverterHelper.convert(processor); + try { + final Exchange finalExchange = innerExchange; + asyncProcessor.process(innerExchange, new AsyncCallback() { + @Override + public void done(boolean doneSync) { + asyncDispatchExchange(endpoint, producer, resultProcessor, finalExchange, + producerCallback); + } + }); + return false; + } catch (Exception e) { + // populate failed so return + innerExchange.setException(e); + producerCallback.done(true); + return true; + } + } + + return asyncDispatchExchange(endpoint, producer, resultProcessor, innerExchange, producerCallback); + }); + return futureAdapter.getFuture(); + } + + /** * Sends an exchange to an endpoint using a supplied callback, using the synchronous processing. * <p/> * If an exception was thrown during processing, it would be set on the given Exchange @@ -396,6 +474,31 @@ public class ProducerCache extends ServiceSupport { } } + protected boolean asyncDispatchExchange(final Endpoint endpoint, Producer producer, + final Processor resultProcessor, Exchange exchange, AsyncCallback callback) { + // now lets dispatch + LOG.debug(">>>> {} {}", endpoint, exchange); + + // set property which endpoint we send to + exchange.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri()); + + // send the exchange using the processor + try { + if (eventNotifierEnabled) { + callback = new EventNotifierCallback(callback, exchange, endpoint); + } + CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor); + + return internal.process(exchange, callback); + } catch (Throwable e) { + // ensure exceptions is caught and set on the exchange + exchange.setException(e); + callback.done(true); + return true; + } + + } + protected Exchange 
sendExchange(final Endpoint endpoint, ExchangePattern pattern, final Processor processor, final Processor resultProcessor, Exchange exchange) { return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>() { @@ -429,21 +532,7 @@ public class ProducerCache extends ServiceSupport { EventHelper.notifyExchangeSending(exchange.getContext(), exchange, endpoint); } - // if we have a result processor then wrap in pipeline to execute both of them in sequence - Processor target; - if (resultProcessor != null) { - List<Processor> processors = new ArrayList<Processor>(2); - processors.add(producer); - processors.add(resultProcessor); - target = Pipeline.newInstance(getCamelContext(), processors); - } else { - target = producer; - } - - // wrap in unit of work - CamelInternalProcessor internal = new CamelInternalProcessor(target); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); - + CamelInternalProcessor internal = prepareInternalProcessor(producer, resultProcessor); internal.process(exchange); } catch (Throwable e) { // ensure exceptions is caught and set on the exchange @@ -460,6 +549,24 @@ public class ProducerCache extends ServiceSupport { }); } + protected CamelInternalProcessor prepareInternalProcessor(Producer producer, Processor resultProcessor) { + // if we have a result processor then wrap in pipeline to execute both of them in sequence + Processor target; + if (resultProcessor != null) { + List<Processor> processors = new ArrayList<Processor>(2); + processors.add(producer); + processors.add(resultProcessor); + target = Pipeline.newInstance(getCamelContext(), processors); + } else { + target = producer; + } + + // wrap in unit of work + CamelInternalProcessor internal = new CamelInternalProcessor(target); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + return internal; + } + protected synchronized Producer doGetProducer(Endpoint endpoint, boolean pooled) { String key = 
endpoint.getEndpointUri(); Producer answer = producers.get(key); @@ -648,4 +755,5 @@ public class ProducerCache extends ServiceSupport { public String toString() { return "ProducerCache for source: " + source + ", capacity: " + getCapacity(); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index 2c92cc5..eca7f0f 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -735,7 +735,7 @@ public final class ExchangeHelper { * @return the result body, can be <tt>null</tt>. * @throws CamelExecutionException is thrown if the processing of the exchange failed */ - public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) { + public static <T> T extractFutureBody(CamelContext context, Future<?> future, Class<T> type) { try { return doExtractFutureBody(context, future.get(), type); } catch (InterruptedException e) { @@ -765,7 +765,7 @@ public final class ExchangeHelper { * @throws CamelExecutionException is thrown if the processing of the exchange failed * @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered */ - public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { + public static <T> T extractFutureBody(CamelContext context, Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException { try { if (timeout > 0) { return doExtractFutureBody(context, future.get(timeout, unit), type); 
http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java index 24f560e..44e921f 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java @@ -570,11 +570,11 @@ public class DefaultProducerTemplateAsyncTest extends ContextTestSupport { @Override public void configure() throws Exception { from("direct:start") - .delay(400) + .delay(400).asyncDelayed() .transform(body().append(" World")).to("mock:result"); from("direct:error") - .delay(400) + .delay(400).asyncDelayed() .process(new Processor() { public void process(Exchange exchange) throws Exception { throw new IllegalArgumentException("Damn forced by unit test"); http://git-wip-us.apache.org/repos/asf/camel/blob/51a80684/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java new file mode 100644 index 0000000..f218a6e --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateNonBlockingAsyncTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.Exchange; +import org.junit.Assert; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * @version + */ +public class DefaultProducerTemplateNonBlockingAsyncTest extends DefaultProducerTemplateAsyncTest{ + @Override + protected void setUp() throws Exception { + super.setUp(); + template.stop(); + template.setSynchronous(true); + template.start(); + } + + public void testRunningInSameThread() throws ExecutionException, InterruptedException { + Thread originalThread = Thread.currentThread(); + CompletableFuture<Exchange> future = template.asyncSend("direct:echo", e -> { + Assert.assertSame(originalThread, Thread.currentThread()); + e.getIn().setBody("Hi"); + }); + Assert.assertEquals("HiHi", template.extractFutureBody(future, String.class)); + } +}