This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 79b25c6 CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
79b25c6 is described below
commit 79b25c60bc6afb678e77d505bb3ff21c54e823e9
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Apr 7 11:00:46 2021 +0200
CAMEL-16462: camel-core - Optimize RecipientList EIP to reduce object allocations.
---
.../apache/camel/processor/MulticastProcessor.java | 28 ++++++++++-------
.../org/apache/camel/processor/RecipientList.java | 10 ++++++-
.../util/concurrent/AsyncCompletionService.java | 35 ++++++++++++++--------
3 files changed, 50 insertions(+), 23 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index c010738..96d3502 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -320,10 +320,10 @@ public class MulticastProcessor extends
AsyncProcessorSupport
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
- return process(exchange, callback, null);
+ return process(exchange, callback, null, 0);
}
- protected boolean process(Exchange exchange, AsyncCallback callback,
Iterator iter) {
+ protected boolean process(Exchange exchange, AsyncCallback callback,
Iterator iter, int size) {
Iterable<ProcessorExchangePair> pairs;
try {
pairs = createProcessorExchangePairs(exchange, iter);
@@ -343,7 +343,8 @@ public class MulticastProcessor extends
AsyncProcessorSupport
// which is how the routing engine normally operates
// if we have parallel processing enabled then we cannot run in
transacted mode (requires synchronous processing via same thread)
MulticastTask state = !isParallelProcessing() &&
exchange.isTransacted()
- ? new MulticastTransactedTask(exchange, pairs, callback) : new
MulticastReactiveTask(exchange, pairs, callback);
+ ? new MulticastTransactedTask(exchange, pairs, callback, size)
+ : new MulticastReactiveTask(exchange, pairs, callback, size);
if (isParallelProcessing()) {
executorService.submit(() -> reactiveExecutor.schedule(state));
} else {
@@ -407,8 +408,7 @@ public class MulticastProcessor extends
AsyncProcessorSupport
final AsyncCallback callback;
final Iterator<ProcessorExchangePair> iterator;
final ReentrantLock lock = new ReentrantLock();
- final AsyncCompletionService<Exchange> completion
- = new AsyncCompletionService<>(scheduler, !isStreaming(),
lock);
+ final AsyncCompletionService<Exchange> completion;
final AtomicReference<Exchange> result = new AtomicReference<>();
final AtomicInteger nbExchangeSent = new AtomicInteger();
final AtomicInteger nbAggregated = new AtomicInteger();
@@ -423,9 +423,10 @@ public class MulticastProcessor extends
AsyncProcessorSupport
this.callback = null;
this.iterator = null;
this.mdc = null;
+ this.completion = null;
}
- MulticastTask(Exchange original, Iterable<ProcessorExchangePair>
pairs, AsyncCallback callback) {
+ MulticastTask(Exchange original, Iterable<ProcessorExchangePair>
pairs, AsyncCallback callback, int capacity) {
this.original = original;
this.pairs = pairs;
this.callback = callback;
@@ -441,6 +442,11 @@ public class MulticastProcessor extends
AsyncProcessorSupport
} else {
this.mdc = null;
}
+ if (capacity > 0) {
+ this.completion = new AsyncCompletionService<>(scheduler,
!isStreaming(), lock, capacity);
+ } else {
+ this.completion = new AsyncCompletionService<>(scheduler,
!isStreaming(), lock);
+ }
}
@Override
@@ -519,8 +525,9 @@ public class MulticastProcessor extends
AsyncProcessorSupport
private MulticastReactiveTask() {
}
- public MulticastReactiveTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
- super(original, pairs, callback);
+ public MulticastReactiveTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
+ int size) {
+ super(original, pairs, callback, size);
}
@Override
@@ -621,8 +628,9 @@ public class MulticastProcessor extends
AsyncProcessorSupport
private MulticastTransactedTask() {
}
- public MulticastTransactedTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) {
- super(original, pairs, callback);
+ public MulticastTransactedTask(Exchange original,
Iterable<ProcessorExchangePair> pairs, AsyncCallback callback,
+ int size) {
+ super(original, pairs, callback, size);
}
@Override
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
index 78ef5bc..4a6c551 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
@@ -191,6 +192,13 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
* Sends the given exchange to the recipient list
*/
public boolean sendToRecipientList(Exchange exchange, Object
recipientList, AsyncCallback callback) {
+ // optimize to calculate number of recipients if possible
+ int size = 0;
+ if (recipientList instanceof Collection) {
+ size = ((Collection<?>) recipientList).size();
+ } else if (recipientList.getClass().isArray()) {
+ size = Array.getLength(recipientList);
+ }
Iterator<?> iter;
if (delimiter != null &&
delimiter.equalsIgnoreCase(IGNORE_DELIMITER_MARKER)) {
@@ -200,7 +208,7 @@ public class RecipientList extends AsyncProcessorSupport
implements IdAware, Rou
}
// now let the multicast process the exchange
- return recipientListProcessor.process(exchange, callback, iter);
+ return recipientListProcessor.process(exchange, callback, iter, size);
}
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
diff --git a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
index 9ce4fd3..4cdc311 100644
--- a/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
+++ b/core/camel-util/src/main/java/org/apache/camel/util/concurrent/AsyncCompletionService.java
@@ -19,7 +19,8 @@ package org.apache.camel.util.concurrent;
import java.util.PriorityQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -28,21 +29,30 @@ public class AsyncCompletionService<V> {
private final Executor executor;
private final boolean ordered;
- private final PriorityQueue<Task> queue = new PriorityQueue<>();
- private final AtomicLong nextId = new AtomicLong();
- private final AtomicLong index = new AtomicLong();
+ private final PriorityQueue<Task> queue;
+ private final AtomicInteger nextId = new AtomicInteger();
+ private final AtomicInteger index = new AtomicInteger();
private final ReentrantLock lock;
private final Condition available;
public AsyncCompletionService(Executor executor, boolean ordered) {
- this(executor, ordered, null);
+ this(executor, ordered, null, 0);
}
public AsyncCompletionService(Executor executor, boolean ordered,
ReentrantLock lock) {
+ this(executor, ordered, lock, 0);
+ }
+
+ public AsyncCompletionService(Executor executor, boolean ordered,
ReentrantLock lock, int capacity) {
this.executor = executor;
this.ordered = ordered;
this.lock = lock != null ? lock : new ReentrantLock();
this.available = this.lock.newCondition();
+ if (capacity > 0) {
+ queue = new PriorityQueue<>(capacity);
+ } else {
+ queue = new PriorityQueue<>();
+ }
}
public ReentrantLock getLock() {
@@ -135,34 +145,35 @@ public class AsyncCompletionService<V> {
}
}
- private class Task implements Runnable, Comparable<Task> {
- private final long id;
+ private class Task implements Runnable, Comparable<Task>, Consumer<V> {
+ private final int id;
private final Consumer<Consumer<V>> runner;
private V result;
- Task(long id, Consumer<Consumer<V>> runner) {
+ Task(int id, Consumer<Consumer<V>> runner) {
this.id = id;
this.runner = runner;
}
@Override
public void run() {
- runner.accept(this::setResult);
+ runner.accept(this);
}
- protected void setResult(V result) {
+ @Override
+ public void accept(V result) {
this.result = result;
complete(this);
}
@Override
public int compareTo(Task other) {
- return Long.compare(this.id, other.id);
+ return Integer.compare(this.id, other.id);
}
@Override
public String toString() {
- return "SubmitOrderedFutureTask[" + this.id + "]";
+ return "SubmitOrderedTask[" + this.id + "]";
}
}
}