This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch sandbox/camel-3.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c698faeca8fb118f1a281b84dec269d367d834dd
Author: Guillaume Nodet <[email protected]>
AuthorDate: Mon Oct 1 17:21:23 2018 +0200

    Make InterceptSendToEndpointProcessor asynchronous
---
 .../apache/camel/impl/InterceptSendToEndpoint.java |  2 +-
 .../impl/InterceptSendToEndpointProcessor.java     | 41 ++++++++++------------
 2 files changed, 19 insertions(+), 24 deletions(-)

diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java 
b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
index 3b8a0af..044d538 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpoint.java
@@ -94,7 +94,7 @@ public class InterceptSendToEndpoint implements Endpoint, 
ShutdownableService {
 
     @Override
     public AsyncProducer createAsyncProducer() throws Exception {
-        Producer producer = delegate.createProducer();
+        AsyncProducer producer = delegate.createAsyncProducer();
         return new InterceptSendToEndpointProcessor(this, delegate, producer, 
skip);
     }
 
diff --git 
a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
index 5ed03e5..07f245f 100644
--- 
a/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/impl/InterceptSendToEndpointProcessor.java
@@ -18,9 +18,11 @@ package org.apache.camel.impl;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.AsyncProducer;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Producer;
+import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,10 +37,10 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
 
     private final InterceptSendToEndpoint endpoint;
     private final Endpoint delegate;
-    private final Producer producer;
+    private final AsyncProducer producer;
     private final boolean skip;
 
-    public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, 
Endpoint delegate, Producer producer, boolean skip) throws Exception {
+    public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint, 
Endpoint delegate, AsyncProducer producer, boolean skip) throws Exception {
         super(delegate);
         this.endpoint = endpoint;
         this.delegate = delegate;
@@ -61,18 +63,19 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
 
         if (endpoint.getDetour() != null) {
             // detour the exchange using synchronous processing
-            try {
-                endpoint.getDetour().process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
+            AsyncProcessor detour = 
AsyncProcessorConverterHelper.convert(endpoint.getDetour());
+            return detour.process(exchange, s -> callback(exchange, callback, 
s));
         }
 
+        return callback(exchange, callback, true);
+    }
+
+    private boolean callback(Exchange exchange, AsyncCallback callback, 
boolean doneSync) {
         // Decide whether to continue or not; similar logic to the Pipeline
         // check for error if so we should break out
         if (!continueProcessing(exchange, "skip sending to original intended 
destination: " + getEndpoint(), log)) {
-            callback.done(true);
-            return true;
+            callback.done(doneSync);
+            return doneSync;
         }
 
         // determine if we should skip or not
@@ -92,24 +95,16 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
             }
 
             // route to original destination leveraging the asynchronous 
routing engine if possible
-            if (producer instanceof AsyncProcessor) {
-                AsyncProcessor async = (AsyncProcessor) producer;
-                return async.process(exchange, callback);
-            } else {
-                try {
-                    producer.process(exchange);
-                } catch (Exception e) {
-                    exchange.setException(e);
-                }
-                callback.done(true);
-                return true;
-            }
+            boolean s = producer.process(exchange, ds -> {
+                callback.done(doneSync && ds);
+            });
+            return doneSync && s;
         } else {
             if (log.isDebugEnabled()) {
                 log.debug("Stop() means skip sending exchange to original 
intended destination: {} for exchange: {}", getEndpoint(), exchange);
             }
-            callback.done(true);
-            return true;
+            callback.done(doneSync);
+            return doneSync;
         }
     }
 

Reply via email to