This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 70ec443 CAMEL-14354: ReactiveExecutor should avoid unnecessary object
allocations for human descriptions for processors that create many objects for
the toString representation of their processor chain.
70ec443 is described below
commit 70ec443c612b04fa67dd66dfce2770ebe5da8dd8
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jan 3 16:10:48 2020 +0100
CAMEL-14354: ReactiveExecutor should avoid unnecessary object allocations
for human descriptions for processors that create many objects for the toString
representation of their processor chain.
---
.../reactive/vertx/VertXReactiveExecutor.java | 29 ++++++++++++++++------
.../org/apache/camel/spi/ReactiveExecutor.java | 12 +++------
.../camel/impl/engine/DefaultReactiveExecutor.java | 21 +++++++++++++---
.../camel/processor/CamelInternalProcessor.java | 2 +-
.../apache/camel/processor/MulticastProcessor.java | 2 +-
.../java/org/apache/camel/processor/Pipeline.java | 9 +++----
.../processor/SharedCamelInternalProcessor.java | 2 +-
7 files changed, 49 insertions(+), 28 deletions(-)
diff --git
a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
index 64f78d2..9465fea 100644
---
a/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
+++
b/components/camel-reactive-executor-vertx/src/main/java/org/apache/camel/reactive/vertx/VertXReactiveExecutor.java
@@ -49,29 +49,36 @@ public class VertXReactiveExecutor extends ServiceSupport
implements ReactiveExe
}
@Override
- public void schedule(Runnable runnable, String description) {
+ public void schedule(Runnable runnable) {
LOG.trace("schedule: {}", runnable);
+ vertx.nettyEventLoopGroup().execute(runnable);
+ }
+
+ @Override
+ public void schedule(Runnable runnable, String description) {
if (description != null) {
runnable = describe(runnable, description);
}
+ schedule(runnable);
+ }
+
+ @Override
+ public void scheduleMain(Runnable runnable) {
+ LOG.trace("scheduleMain: {}", runnable);
vertx.nettyEventLoopGroup().execute(runnable);
}
@Override
public void scheduleMain(Runnable runnable, String description) {
- LOG.trace("scheduleMain: {}", runnable);
if (description != null) {
runnable = describe(runnable, description);
}
- vertx.nettyEventLoopGroup().execute(runnable);
+ scheduleMain(runnable);
}
@Override
- public void scheduleSync(Runnable runnable, String description) {
+ public void scheduleSync(Runnable runnable) {
LOG.trace("scheduleSync: {}", runnable);
- if (description != null) {
- runnable = describe(runnable, description);
- }
final Runnable task = runnable;
vertx.executeBlocking(future -> {
task.run();
@@ -80,6 +87,14 @@ public class VertXReactiveExecutor extends ServiceSupport
implements ReactiveExe
}
@Override
+ public void scheduleSync(Runnable runnable, String description) {
+ if (description != null) {
+ runnable = describe(runnable, description);
+ }
+ scheduleSync(runnable);
+ }
+
+ @Override
public boolean executeFromQueue() {
// not supported so return false
return false;
diff --git
a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
index 2a4eb9f..fc3c4de 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ReactiveExecutor.java
@@ -28,9 +28,7 @@ public interface ReactiveExecutor {
*
* @param runnable the task
*/
- default void schedule(Runnable runnable) {
- schedule(runnable, null);
- }
+ void schedule(Runnable runnable);
/**
* Schedules the task to be run
@@ -45,9 +43,7 @@ public interface ReactiveExecutor {
*
* @param runnable the task
*/
- default void scheduleMain(Runnable runnable) {
- scheduleMain(runnable, null);
- }
+ void scheduleMain(Runnable runnable);
/**
* Schedules the task to be prioritized and run asap
@@ -62,9 +58,7 @@ public interface ReactiveExecutor {
*
* @param runnable the task
*/
- default void scheduleSync(Runnable runnable) {
- scheduleSync(runnable, null);
- }
+ void scheduleSync(Runnable runnable);
/**
* Schedules the task to run synchronously
diff --git
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
index c441e34..7be0a46 100644
---
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultReactiveExecutor.java
@@ -52,11 +52,26 @@ public class DefaultReactiveExecutor extends ServiceSupport
implements ReactiveE
private final AtomicLong pendingTasks = new AtomicLong();
@Override
+ public void schedule(Runnable runnable) {
+ workers.get().schedule(runnable, true, false, false);
+ }
+
+ @Override
+ public void scheduleMain(Runnable runnable) {
+ workers.get().schedule(runnable, true, true, false);
+ }
+
+ @Override
+ public void scheduleSync(Runnable runnable) {
+ workers.get().schedule(runnable, false, true, true);
+ }
+
+ @Override
public void scheduleMain(Runnable runnable, String description) {
if (description != null) {
runnable = describe(runnable, description);
}
- workers.get().schedule(runnable, true, true, false);
+ scheduleMain(runnable);
}
@Override
@@ -64,7 +79,7 @@ public class DefaultReactiveExecutor extends ServiceSupport
implements ReactiveE
if (description != null) {
runnable = describe(runnable, description);
}
- workers.get().schedule(runnable, true, false, false);
+ schedule(runnable);
}
@Override
@@ -72,7 +87,7 @@ public class DefaultReactiveExecutor extends ServiceSupport
implements ReactiveE
if (description != null) {
runnable = describe(runnable, description);
}
- workers.get().schedule(runnable, false, true, true);
+ scheduleSync(runnable);
}
@Override
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 5d5af65..7f273cc 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -240,7 +240,7 @@ public class CamelInternalProcessor extends
DelegateAsyncProcessor {
log.trace("Exchange processed and is continued routed
asynchronously for exchangeId: {} -> {}",
exchange.getExchangeId(), exchange);
}
- }, "CamelInternalProcessor - UnitOfWork - afterProcess - " +
processor + " - " + exchange.getExchangeId());
+ });
return false;
}
}
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index b0d49f0..7546dbc 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -260,7 +260,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport implements Navigat
if (isParallelProcessing()) {
executorService.submit(() ->
camelContext.getReactiveExecutor().schedule(runnable));
} else {
- camelContext.getReactiveExecutor().schedule(runnable, "Multicast
next step");
+ camelContext.getReactiveExecutor().schedule(runnable);
}
}
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
index fcc9baa..6ad6f8f 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Pipeline.java
@@ -83,11 +83,9 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (exchange.isTransacted()) {
- camelContext.getReactiveExecutor().scheduleSync(() ->
Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
- "Step[" + exchange.getExchangeId() + "," + Pipeline.this +
"]");
+ camelContext.getReactiveExecutor().scheduleSync(() ->
Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
} else {
- camelContext.getReactiveExecutor().scheduleMain(() ->
Pipeline.this.doProcess(exchange, callback, processors.iterator(), true),
- "Step[" + exchange.getExchangeId() + "," + Pipeline.this +
"]");
+ camelContext.getReactiveExecutor().scheduleMain(() ->
Pipeline.this.doProcess(exchange, callback, processors.iterator(), true));
}
return false;
}
@@ -103,8 +101,7 @@ public class Pipeline extends AsyncProcessorSupport
implements Navigate<Processo
AsyncProcessor processor = processors.next();
processor.process(exchange, doneSync ->
- camelContext.getReactiveExecutor().schedule(() ->
doProcess(exchange, callback, processors, false),
- "Step[" + exchange.getExchangeId() + "," +
Pipeline.this + "]"));
+ camelContext.getReactiveExecutor().schedule(() ->
doProcess(exchange, callback, processors, false)));
} else {
ExchangeHelper.copyResults(exchange, exchange);
diff --git
a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
index cd4405b..4eaa6a7 100644
---
a/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
+++
b/core/camel-base/src/main/java/org/apache/camel/processor/SharedCamelInternalProcessor.java
@@ -201,7 +201,7 @@ public class SharedCamelInternalProcessor {
LOG.trace("Exchange processed and is continued routed
asynchronously for exchangeId: {} -> {}",
exchange.getExchangeId(), exchange);
}
- }, "SharedCamelInternalProcessor - UnitOfWork - afterProcess - " +
processor + " - " + exchange.getExchangeId());
+ });
return sync;
}
}