[
https://issues.apache.org/jira/browse/CAMEL-18384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17862386#comment-17862386
]
Claus Ibsen commented on CAMEL-18384:
-------------------------------------
Okay, so the parallel aggregate cannot be made more parallel by using its own
thread pool, because the EIP depends on exchanges being aggregated in order:
the output of each partial aggregation is needed to build up the response.
As this option makes no sense and brings no benefit, we are deprecating it.
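
To illustrate the point, here is a minimal sketch in plain Java. It is not
Camel code: the class and method names (SequentialFoldSketch, slowWork,
aggregate, splitAndAggregate) are hypothetical stand-ins. It assumes the slow
work can be moved out of the aggregation step: the per-item work runs in
parallel, while the aggregation stays a cheap, ordered fold, because each step
needs the partial result of the previous one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SequentialFoldSketch {

    // Hypothetical stand-in for the slow per-item work done in the split sub-route.
    static String slowWork(String item) throws InterruptedException {
        Thread.sleep(200);
        return item.toLowerCase();
    }

    // Hypothetical stand-in for an aggregation step: it needs the previous
    // partial result, so the calls form a chain and cannot overlap.
    static String aggregate(String partial, String next) {
        return partial == null ? next : partial + "," + next;
    }

    static String splitAndAggregate(List<String> items) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, items.size()));
        try {
            // The expensive part runs concurrently, one task per item.
            List<Future<String>> futures = new ArrayList<>();
            for (String item : items) {
                futures.add(pool.submit(() -> slowWork(item)));
            }
            // The fold itself is sequential: each step consumes the prior partial.
            String partial = null;
            for (Future<String> future : futures) {
                partial = aggregate(partial, future.get());
            }
            return partial;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(splitAndAggregate(Arrays.asList("A", "B", "C", "D")));
    }
}
```

With the slow work kept out of aggregate, the four items are processed
concurrently and only the cheap string fold runs one step at a time.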
> Split/Aggregation parallelProcessing+parallelAggregate is sequential
> --------------------------------------------------------------------
>
> Key: CAMEL-18384
> URL: https://issues.apache.org/jira/browse/CAMEL-18384
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 3.14.0
> Reporter: Jerome Guilbaud
> Assignee: Claus Ibsen
> Priority: Minor
> Fix For: 4.7.0
>
>
>
> I need the {{aggregate}} method to run in parallel in my split, so I
> enabled the {{parallelAggregate}} option, but it does not work as I expected.
> {code:java}
> @RunWith(SpringJUnit4ClassRunner.class)
> public class SplitAggregationTest extends CamelTestSupport {
>     private final Logger LOGGER = LoggerFactory.getLogger(SplitAggregationTest.class);
>
>     @Override
>     protected RouteBuilder createRouteBuilder() throws Exception {
>         return new RouteBuilder() {
>             @Override
>             public void configure() throws Exception {
>                 from("direct:start")
>                     .split(body(), new AggregationStrategy() {
>                         @Override
>                         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>                             // Simulate a slow aggregation step
>                             try {
>                                 Thread.sleep(1000);
>                             } catch (InterruptedException e) {
>                                 LOGGER.error("InterruptedException: ", e);
>                             }
>                             return oldExchange == null ? newExchange : oldExchange;
>                         }
>                     }).streaming().parallelProcessing().parallelAggregate()
>                         .log(LoggingLevel.INFO, LOGGER, "Aggreg ${body}")
>                     .end();
>             }
>         };
>     }
>
>     @Test
>     public void test1() throws InterruptedException {
>         Thread.sleep(5000);
>         template.sendBody("direct:start", Arrays.asList("A", "B", "C", "D"));
>         Thread.sleep(5000);
>     }
> }
> {code}
> The aggregate methods (in purple) are not run in parallel; each one waits
> for the previous one to finish:
> !https://i.stack.imgur.com/6egxf.png!
> Associated thread dump:
> {code:java}
> "Camel (camel-2) thread #17 - Split" #36 daemon prio=5 os_prio=0 tid=0x000002317da51800 nid=0x149bc waiting on condition [0x00000062a55fe000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x000000076e105c70> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
> at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
> at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
> at org.apache.camel.util.concurrent.AsyncCompletionService.complete(AsyncCompletionService.java:141)
> at org.apache.camel.util.concurrent.AsyncCompletionService.access$200(AsyncCompletionService.java:30)
> at org.apache.camel.util.concurrent.AsyncCompletionService$Task.accept(AsyncCompletionService.java:168)
> at org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask.lambda$null$0(MulticastProcessor.java:579)
> at org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask$$Lambda$519/1049021104.done(Unknown Source)
> at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44)
> at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
> at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:59)
> at org.apache.camel.processor.MulticastProcessor.lambda$schedule$1(MulticastProcessor.java:348)
> at org.apache.camel.processor.MulticastProcessor$$Lambda$517/1626228981.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
> - <0x000000076f0ca838> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> "Camel (camel-2) thread #16 - Split" #35 daemon prio=5 os_prio=0 tid=0x000002317da54000 nid=0x6a8 waiting on condition [0x00000062a54fe000]
> java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(Native Method)
> at com.mybatch.camel.processor.SplitAggregationTest$1$1.aggregate(SplitAggregationTest.java:34)
> at org.apache.camel.AggregationStrategy.aggregate(AggregationStrategy.java:86)
> at org.apache.camel.processor.MulticastProcessor.doAggregateInternal(MulticastProcessor.java:894)
> at org.apache.camel.processor.MulticastProcessor.doAggregate(MulticastProcessor.java:858)
> at org.apache.camel.processor.MulticastProcessor$MulticastTask.aggregate(MulticastProcessor.java:451)
> at org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask.lambda$null$0(MulticastProcessor.java:582)
> at org.apache.camel.processor.MulticastProcessor$MulticastReactiveTask$$Lambda$519/1049021104.done(Unknown Source)
> at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44)
> at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:187)
> at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:59)
> at org.apache.camel.processor.MulticastProcessor.lambda$schedule$1(MulticastProcessor.java:348)
> at org.apache.camel.processor.MulticastProcessor$$Lambda$517/1626228981.run(Unknown Source)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Locked ownable synchronizers:
> - <0x000000076e105c70> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> - <0x000000076ef115b0> (a java.util.concurrent.ThreadPoolExecutor$Worker)
> {code}
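
In the dump above, thread #17 is parked waiting for the ReentrantLock
<0x000000076e105c70>, which thread #16 holds while it sleeps inside
aggregate. The following is a rough sketch of that effect in plain Java; it is
not Camel's implementation, and the class and method names are hypothetical.
It assumes only that every worker must take one shared lock before its
aggregation step, and it records how many workers are ever inside that step at
the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class LockSerializationSketch {

    // Runs `tasks` workers on their own threads; each takes the shared lock
    // before its (simulated) aggregation step. Returns the highest number of
    // workers observed inside the locked section at once.
    static int maxConcurrentAggregations(int tasks) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();
        AtomicInteger inside = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                lock.lock(); // other workers park here, like thread #17
                try {
                    int now = inside.incrementAndGet();
                    max.accumulateAndGet(now, Math::max);
                    Thread.sleep(100); // simulated slow aggregate, like thread #16
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    lock.unlock();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max concurrent aggregations: " + maxConcurrentAggregations(4));
    }
}
```

Even with one thread per worker, the shared lock lets only one of them into
the aggregation step at a time, so a slow aggregate makes the whole split
behave sequentially.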