This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 86d23b581d6ef84c29b3faeefeb945eab38efe5a Author: Claus Ibsen <[email protected]> AuthorDate: Thu Jun 13 10:44:35 2019 +0200 CAMEL-13636: camel3 - SPI for ReactiveHelper so we can plugin different reactive engines --- .../org/apache/camel/spi/ReactiveExecutor.java | 2 +- .../engine/DefaultAsyncProcessorAwaitManager.java | 1 - .../camel/impl/engine/DefaultReactiveExecutor.java | 9 +++++++ .../processor/SharedCamelInternalProcessor.java | 8 +++---- .../reactive/vertx/VertXReactiveExecutor.java | 28 +++++++++++++++++++--- 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java index f61b012..2a4eb9f 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java @@ -75,7 +75,7 @@ public interface ReactiveExecutor { void scheduleSync(Runnable runnable, String description); /** - * Executes the next task + * Executes the next task (if supported by the reactive executor implementation) * * @return true if a task was executed or false if no more pending tasks */ diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java index 8087942..d0e571c 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultAsyncProcessorAwaitManager.java @@ -71,7 +71,6 @@ public class DefaultAsyncProcessorAwaitManager extends ServiceSupport implements * * @param processor the processor * @param exchange the exchange - * @throws Exception can be thrown if waiting is interrupted */ public void process(final AsyncProcessor processor, final Exchange exchange) { CountDownLatch latch = new CountDownLatch(1); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java index 350e189..17f69e7 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java @@ -144,6 +144,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE } void schedule(Runnable runnable, boolean first, boolean main, boolean sync) { + if (LOG.isTraceEnabled()) { + LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable); + } if (main) { if (!queue.isEmpty()) { if (back == null) { @@ -179,6 +182,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE try { executor.pendingTasks.decrementAndGet(); // thread.setName(name + " - " + polled.toString()); + if (LOG.isTraceEnabled()) { + LOG.trace("Running: {}", runnable); + } polled.run(); } catch (Throwable t) { LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.", t); @@ -204,6 +210,9 @@ public class DefaultReactiveExecutor extends ServiceSupport implements ReactiveE try { executor.pendingTasks.decrementAndGet(); thread.setName(name + " - " + polled.toString()); + if (LOG.isTraceEnabled()) { + LOG.trace("Running: {}", polled); + } polled.run(); } catch (Throwable t) { // should not happen diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java index 1a5e92d..cd4405b 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java @@ -107,7 +107,7 @@ public class SharedCamelInternalProcessor { /** * Asynchronous API */ - public boolean process(Exchange exchange, AsyncCallback ocallback, AsyncProcessor processor, Processor resultProcessor) { + public boolean process(Exchange exchange, AsyncCallback originalCallback, AsyncProcessor processor, Processor resultProcessor) { // ---------------------------------------------------------- // CAMEL END USER - READ ME FOR DEBUGGING TIPS // ---------------------------------------------------------- @@ -124,7 +124,7 @@ public class SharedCamelInternalProcessor { if (processor == null || !continueProcessing(exchange, processor)) { // no processor or we should not continue then we are done - ocallback.done(true); + originalCallback.done(true); return true; } @@ -138,13 +138,13 @@ public class SharedCamelInternalProcessor { states[i] = state; } catch (Throwable e) { exchange.setException(e); - ocallback.done(true); + originalCallback.done(true); return true; } } // create internal callback which will execute the advices in reverse order when done - AsyncCallback callback = new InternalCallback(states, exchange, ocallback, resultProcessor); + AsyncCallback callback = new InternalCallback(states, exchange, originalCallback, resultProcessor); // UNIT_OF_WORK_PROCESS_SYNC is @deprecated and we should remove it from Camel 3.0 Object synchronous = exchange.removeProperty(Exchange.UNIT_OF_WORK_PROCESS_SYNC); diff --git a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java index 47bd7b4..922b4b5 100644 --- a/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java +++ b/core/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java @@ -39,31 +39,53 @@ public class VertXReactiveExecutor extends ServiceSupport implements ReactiveExe @Override public void schedule(Runnable runnable, String description) { LOG.trace("schedule: {}", runnable); + if (description != null) { + runnable = describe(runnable, description); + } vertx.nettyEventLoopGroup().execute(runnable); } @Override public void scheduleMain(Runnable runnable, String description) { LOG.trace("scheduleMain: {}", runnable); + if (description != null) { + runnable = describe(runnable, description); + } vertx.nettyEventLoopGroup().execute(runnable); } @Override public void scheduleSync(Runnable runnable, String description) { LOG.trace("scheduleSync: {}", runnable); + if (description != null) { + runnable = describe(runnable, description); + } + final Runnable task = runnable; vertx.executeBlocking(future -> { - runnable.run(); + task.run(); future.complete(); }, res -> {}); } @Override public boolean executeFromQueue() { - LOG.trace("executeFromQueue"); - // TODO: not implemented + // not supported so return false return false; } + private static Runnable describe(Runnable runnable, String description) { + return new Runnable() { + @Override + public void run() { + runnable.run(); + } + @Override + public String toString() { + return description; + } + }; + } + @Override protected void doStart() throws Exception { LOG.debug("Starting VertX");
