This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 86d23b581d6ef84c29b3faeefeb945eab38efe5a
Author: Claus Ibsen <[email protected]>
AuthorDate: Thu Jun 13 10:44:35 2019 +0200

    CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different 
reactive engines
---
 .../org/apache/camel/spi/ReactiveExecutor.java     |  2 +-
 .../engine/DefaultAsyncProcessorAwaitManager.java  |  1 -
 .../camel/impl/engine/DefaultReactiveExecutor.java |  9 +++++++
 .../processor/SharedCamelInternalProcessor.java    |  8 +++----
 .../reactive/vertx/VertXReactiveExecutor.java      | 28 +++++++++++++++++++---
 5 files changed, 39 insertions(+), 9 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index f61b012..2a4eb9f 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -75,7 +75,7 @@ public interface ReactiveExecutor {
     void scheduleSync(Runnable runnable, String description);
 
     /**
-     * Executes the next task
+     * Executes the next task (if supported by the reactive executor 
implementation)
      *
      * @return true if a task was executed or false if no more pending tasks
      */
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
index 8087942..d0e571c 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java
@@ -71,7 +71,6 @@ public class DefaultAsyncProcessorAwaitManager extends 
ServiceSupport implements
      *
      * @param processor the processor
      * @param exchange  the exchange
-     * @throws Exception can be thrown if waiting is interrupted
      */
     public void process(final AsyncProcessor processor, final Exchange 
exchange) {
         CountDownLatch latch = new CountDownLatch(1);
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index 350e189..17f69e7 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -144,6 +144,9 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
         }
 
         void schedule(Runnable runnable, boolean first, boolean main, boolean 
sync) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, 
main, sync, runnable);
+            }
             if (main) {
                 if (!queue.isEmpty()) {
                     if (back == null) {
@@ -179,6 +182,9 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
                         try {
                             executor.pendingTasks.decrementAndGet();
 //                            thread.setName(name + " - " + polled.toString());
+                            if (LOG.isTraceEnabled()) {
+                                LOG.trace("Running: {}", runnable);
+                            }
                             polled.run();
                         } catch (Throwable t) {
                             LOG.warn("Error executing reactive work due to " + 
t.getMessage() + ". This exception is ignored.", t);
@@ -204,6 +210,9 @@ public class DefaultReactiveExecutor extends ServiceSupport 
implements ReactiveE
             try {
                 executor.pendingTasks.decrementAndGet();
                 thread.setName(name + " - " + polled.toString());
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Running: {}", polled);
+                }
                 polled.run();
             } catch (Throwable t) {
                 // should not happen
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index 1a5e92d..cd4405b 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -107,7 +107,7 @@ public class SharedCamelInternalProcessor {
     /**
      * Asynchronous API
      */
-    public boolean process(Exchange exchange, AsyncCallback ocallback, 
AsyncProcessor processor, Processor resultProcessor) {
+    public boolean process(Exchange exchange, AsyncCallback originalCallback, 
AsyncProcessor processor, Processor resultProcessor) {
         // ----------------------------------------------------------
         // CAMEL END USER - READ ME FOR DEBUGGING TIPS
         // ----------------------------------------------------------
@@ -124,7 +124,7 @@ public class SharedCamelInternalProcessor {
 
         if (processor == null || !continueProcessing(exchange, processor)) {
             // no processor or we should not continue then we are done
-            ocallback.done(true);
+            originalCallback.done(true);
             return true;
         }
 
@@ -138,13 +138,13 @@ public class SharedCamelInternalProcessor {
                 states[i] = state;
             } catch (Throwable e) {
                 exchange.setException(e);
-                ocallback.done(true);
+                originalCallback.done(true);
                 return true;
             }
         }
 
         // create internal callback which will execute the advices in reverse 
order when done
-        AsyncCallback callback = new InternalCallback(states, exchange, 
ocallback, resultProcessor);
+        AsyncCallback callback = new InternalCallback(states, exchange, 
originalCallback, resultProcessor);
 
         // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it 
from Camel 3.0
         Object synchronous = 
exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC);
diff --git 
a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
 
b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 47bd7b4..922b4b5 100644
--- 
a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++ 
b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -39,31 +39,53 @@ public class VertXReactiveExecutor extends ServiceSupport 
implements ReactiveExe
     @Override
     public void schedule(Runnable runnable, String description) {
         LOG.trace("schedule: {}", runnable);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
         vertx.nettyEventLoopGroup().execute(runnable);
     }
 
     @Override
     public void scheduleMain(Runnable runnable, String description) {
         LOG.trace("scheduleMain: {}", runnable);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
         vertx.nettyEventLoopGroup().execute(runnable);
     }
 
     @Override
     public void scheduleSync(Runnable runnable, String description) {
         LOG.trace("scheduleSync: {}", runnable);
+        if (description != null) {
+            runnable = describe(runnable, description);
+        }
+        final Runnable task = runnable;
         vertx.executeBlocking(future -> {
-            runnable.run();
+            task.run();
             future.complete();
         }, res -> {});
     }
 
     @Override
     public boolean executeFromQueue() {
-        LOG.trace("executeFromQueue");
-        // TODO: not implemented
+        // not supported so return false
         return false;
     }
 
+    private static Runnable describe(Runnable runnable, String description) {
+        return new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+            }
+            @Override
+            public String toString() {
+                return description;
+            }
+        };
+    }
+
     @Override
     protected void doStart() throws Exception {
         LOG.debug("Starting VertX");

Reply via email to