This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 0c77d3b CAMEL-16446: camel-core - Optimize EIPs to eager load needed classes
0c77d3b is described below
commit 0c77d3be900427f77baba0bdc9b079b2eb0541da
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Apr 3 12:13:23 2021 +0200
CAMEL-16446: camel-core - Optimize EIPs to eager load needed classes
---
.../apache/camel/processor/MulticastProcessor.java | 53 +++++++++++++++++++---
.../camel/processor/RecipientListProcessor.java | 4 ++
.../java/org/apache/camel/processor/Splitter.java | 24 ++++++++++
.../apache/camel/support/cache/ServicePool.java | 41 +++++++++++++----
4 files changed, 106 insertions(+), 16 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 71d00ab..572fea9 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -52,6 +52,8 @@ import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.StreamCache;
import org.apache.camel.Traceable;
+import org.apache.camel.processor.aggregate.ShareUnitOfWorkAggregationStrategy;
+import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy;
import org.apache.camel.processor.errorhandler.ErrorHandlerSupport;
import org.apache.camel.spi.ErrorHandlerAware;
import org.apache.camel.spi.IdAware;
@@ -150,6 +152,14 @@ public class MulticastProcessor extends
AsyncProcessorSupport
}
+ private final class Scheduler implements Executor {
+
+ @Override
+ public void execute(Runnable command) {
+ schedule(command);
+ }
+ }
+
protected final Processor onPrepare;
private final CamelContext camelContext;
private final InternalProcessorFactory internalProcessorFactory;
@@ -167,6 +177,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
private final boolean stopOnException;
private final ExecutorService executorService;
private final boolean shutdownExecutorService;
+ private final Scheduler scheduler = new Scheduler();
private ExecutorService aggregateExecutorService;
private boolean shutdownAggregateExecutorService;
private final long timeout;
@@ -263,6 +274,23 @@ public class MulticastProcessor extends
AsyncProcessorSupport
}
@Override
+ protected void doBuild() throws Exception {
+ // eager load classes
+ Object dummy = new MulticastReactiveTask();
+ LOG.trace("Loaded {}", dummy.getClass().getName());
+ Object dummy2 = new MulticastTransactedTask();
+ LOG.trace("Loaded {}", dummy2.getClass().getName());
+ Object dummy3 = new UseOriginalAggregationStrategy();
+ LOG.trace("Loaded {}", dummy3.getClass().getName());
+ if (isShareUnitOfWork()) {
+ Object dummy4 = new ShareUnitOfWorkAggregationStrategy(null);
+ LOG.trace("Loaded {}", dummy4.getClass().getName());
+ }
+ Object dummy5 = new DefaultProcessorExchangePair(0, null, null, null);
+ LOG.trace("Loaded {}", dummy5.getClass().getName());
+ }
+
+ @Override
protected void doInit() throws Exception {
if (route != null) {
Exchange exchange = new DefaultExchange(getCamelContext());
@@ -356,23 +384,30 @@ public class MulticastProcessor extends
AsyncProcessorSupport
final Iterable<ProcessorExchangePair> pairs;
final AsyncCallback callback;
final Iterator<ProcessorExchangePair> iterator;
- final ReentrantLock lock;
- final AsyncCompletionService<Exchange> completion;
- final AtomicReference<Exchange> result;
+ final ReentrantLock lock = new ReentrantLock();
+ final AsyncCompletionService<Exchange> completion
+ = new AsyncCompletionService<>(scheduler, !isStreaming(),
lock);
+ final AtomicReference<Exchange> result = new AtomicReference<>();
final AtomicInteger nbExchangeSent = new AtomicInteger();
final AtomicInteger nbAggregated = new AtomicInteger();
final AtomicBoolean allSent = new AtomicBoolean();
final AtomicBoolean done = new AtomicBoolean();
final Map<String, String> mdc;
+ private MulticastTask() {
+ // used for eager classloading
+ this.original = null;
+ this.pairs = null;
+ this.callback = null;
+ this.iterator = null;
+ this.mdc = null;
+ }
+
MulticastTask(Exchange original, Iterable<ProcessorExchangePair>
pairs, AsyncCallback callback) {
this.original = original;
this.pairs = pairs;
this.callback = callback;
this.iterator = pairs.iterator();
- this.lock = new ReentrantLock();
- this.completion = new
AsyncCompletionService<>(MulticastProcessor.this::schedule, !isStreaming(),
lock);
- this.result = new AtomicReference<>();
if (timeout > 0) {
schedule(aggregateExecutorService, this::timeout, timeout,
TimeUnit.MILLISECONDS);
}
@@ -459,6 +494,9 @@ public class MulticastProcessor extends
AsyncProcessorSupport
*/
protected class MulticastReactiveTask extends MulticastTask {
+ private MulticastReactiveTask() {
+ }
+
public MulticastReactiveTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
super(original, pairs, callback);
}
@@ -555,6 +593,9 @@ public class MulticastProcessor extends
AsyncProcessorSupport
*/
protected class MulticastTransactedTask extends MulticastTask {
+ private MulticastTransactedTask() {
+ }
+
public MulticastTransactedTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
super(original, pairs, callback);
}
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index f2df1f6..e1dacf3 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -334,6 +334,10 @@ public class RecipientListProcessor extends
MulticastProcessor {
protected void doBuild() throws Exception {
super.doBuild();
ServiceHelper.buildService(producerCache);
+
+ // eager load classes
+ Object dummy = new RecipientProcessorExchangePair(0, null, null, null,
null, null, null, false);
+ LOG.trace("Loaded {}", dummy.getClass().getName());
}
@Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
index 19c1ce7..58851cc 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Splitter.java
@@ -44,6 +44,8 @@ import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
@@ -53,6 +55,8 @@ import static org.apache.camel.util.ObjectHelper.notNull;
*/
public class Splitter extends MulticastProcessor implements AsyncProcessor,
Traceable {
+ private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
+
private static final String IGNORE_DELIMITER_MARKER = "false";
private final Expression expression;
private final String delimiter;
@@ -112,6 +116,14 @@ public class Splitter extends MulticastProcessor
implements AsyncProcessor, Trac
}
@Override
+ protected void doBuild() throws Exception {
+ super.doBuild();
+ // eager load classes
+ Object dummy = new SplitterIterable();
+ LOG.trace("Loaded {}", dummy.getClass().getName());
+ }
+
+ @Override
protected void doInit() throws Exception {
super.doInit();
expression.init(getCamelContext());
@@ -180,6 +192,18 @@ public class Splitter extends MulticastProcessor
implements AsyncProcessor, Trac
private final Route route;
private final Exchange original;
+ private SplitterIterable() {
+ // used for eager classloading
+ value = null;
+ iterator = null;
+ copy = null;
+ route = null;
+ original = null;
+ // for loading classes from iterator
+ Object dummy = iterator();
+ LOG.trace("Loaded {}", dummy.getClass().getName());
+ }
+
private SplitterIterable(Exchange exchange, Object value) {
this.original = exchange;
this.value = value;
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
index 1a4d3ba..28e9ff2 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/cache/ServicePool.java
@@ -127,15 +127,11 @@ abstract class ServicePool<S extends Service> extends
ServiceSupport implements
}
private Pool<S> getOrCreatePool(Endpoint endpoint) {
- return pool.computeIfAbsent(endpoint, this::createPool);
- }
-
- private Pool<S> createPool(Endpoint endpoint) {
boolean singleton = endpoint.isSingletonProducer();
if (singleton) {
- return new SinglePool(endpoint);
+ return pool.computeIfAbsent(endpoint, SinglePool::new);
} else {
- return new MultiplePool(endpoint);
+ return pool.computeIfAbsent(endpoint, MultiplePool::new);
}
}
@@ -157,6 +153,15 @@ abstract class ServicePool<S extends Service> extends
ServiceSupport implements
}
@Override
+ protected void doBuild() throws Exception {
+ // eager load classes
+ SinglePool dummy = new SinglePool();
+ LOG.trace("Loaded {}", dummy.getClass().getName());
+ MultiplePool dummy2 = new MultiplePool();
+ LOG.trace("Loaded {}", dummy2.getClass().getName());
+ }
+
+ @Override
protected void doStart() throws Exception {
// noop
}
@@ -194,6 +199,11 @@ abstract class ServicePool<S extends Service> extends
ServiceSupport implements
private final Endpoint endpoint;
private volatile S s;
+ private SinglePool() {
+ // only used for eager classloading
+ this.endpoint = null;
+ }
+
SinglePool(Endpoint endpoint) {
this.endpoint = endpoint;
}
@@ -251,11 +261,13 @@ abstract class ServicePool<S extends Service> extends
ServiceSupport implements
}
private void cleanupEvicts() {
- singlePoolEvicted.forEach((e, p) -> {
+ for (Map.Entry<Endpoint, Pool<S>> entry :
singlePoolEvicted.entrySet()) {
+ Endpoint e = entry.getKey();
+ Pool<S> p = entry.getValue();
doStop(e);
p.stop();
singlePoolEvicted.remove(e);
- });
+ }
}
void doStop(Service s) {
@@ -279,6 +291,13 @@ abstract class ServicePool<S extends Service> extends
ServiceSupport implements
private final BlockingQueue<S> queue;
private final List<S> evicts;
+ private MultiplePool() {
+ // only used for eager classloading
+ this.endpoint = null;
+ this.queue = null;
+ this.evicts = null;
+ }
+
MultiplePool(Endpoint endpoint) {
this.endpoint = endpoint;
this.queue = new ArrayBlockingQueue<>(capacity);
@@ -289,8 +308,10 @@ abstract class ServicePool<S extends Service> extends
ServiceSupport implements
if (!evicts.isEmpty()) {
synchronized (this) {
if (!evicts.isEmpty()) {
- evicts.forEach(this::doStop);
- evicts.forEach(queue::remove);
+ for (S evict : evicts) {
+ doStop(evict);
+ queue.remove(evict);
+ }
evicts.clear();
if (queue.isEmpty()) {
pool.remove(endpoint);