This is an automated email from the ASF dual-hosted git repository.

fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 9e8b740e3ea2 CAMEL-23281: fix split/aggregate deadlock with SynchronousExecutorService in transacted routes
9e8b740e3ea2 is described below

commit 9e8b740e3ea2bd4d5b6b79ccb4853c7df87bf4d2
Author: Croway <[email protected]>
AuthorDate: Wed Apr 8 15:05:56 2026 +0200

    CAMEL-23281: fix split/aggregate deadlock with SynchronousExecutorService in transacted routes
    
    Process aggregate completion inline for SynchronousExecutorService instead
    of going through the reactive queue. The CAMEL-23030 change from
    scheduleSync to schedule causes a deadlock when the surrounding split is
    transacted: the nested await/executeFromQueue cycles interact with
    Pipeline.scheduleMain queue swapping, making pending tasks unreachable.
---
 .../processor/aggregate/AggregateProcessor.java    |  21 +--
 ...itAggregateInChoiceSynchronousExecutorTest.java | 142 +++++++++++++++++++++
 2 files changed, 155 insertions(+), 8 deletions(-)

diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 3c512bbbb293..7d7ad406059f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -858,7 +858,7 @@ public class AggregateProcessor extends BaseProcessorSupport
         executorService.execute(() -> {
             ExchangeHelper.prepareOutToIn(exchange);
 
-            Runnable task = () -> processor.process(exchange, done -> {
+            AsyncCallback completionCallback = done -> {
                 // log exception if there was a problem
                 if (exchange.getException() != null) {
                     // if there was an exception then let the exception handler handle it
@@ -867,14 +867,19 @@ public class AggregateProcessor extends BaseProcessorSupport
                 } else {
                     LOG.trace("Processing aggregated exchange: {} complete.", exchange);
                 }
-            });
-            // execute the task using this thread sync (similar to multicast eip in parallel mode)
-            if (exchange.isTransacted()) {
-                reactiveExecutor.scheduleQueue(task);
-            } else if (executorService instanceof SynchronousExecutorService) {
-                reactiveExecutor.schedule(task);
+            };
+
+            if (executorService instanceof SynchronousExecutorService) {
+                // CAMEL-23281: process inline to avoid deadlock with transacted exchanges
+                processor.process(exchange, completionCallback);
             } else {
-                reactiveExecutor.scheduleSync(task);
+                Runnable task = () -> processor.process(exchange, completionCallback);
+                // execute the task using this thread sync (similar to multicast eip in parallel mode)
+                if (exchange.isTransacted()) {
+                    reactiveExecutor.scheduleQueue(task);
+                } else {
+                    reactiveExecutor.scheduleSync(task);
+                }
             }
         });
     }
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java
new file mode 100644
index 000000000000..6c3f512d5a09
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.concurrent.SynchronousExecutorService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.camel.Exchange.SPLIT_COMPLETE;
+import static org.apache.camel.Exchange.SPLIT_INDEX;
+
+/**
+ * Reproducer for CAMEL-23281: split/aggregator deadlock when the aggregate uses SynchronousExecutorService with
+ * completionSize + completionPredicate(SPLIT_COMPLETE) and the exchange is transacted.
+ *
+ * Root cause: when a transacted exchange triggers aggregate completion, AggregateProcessor.onSubmitCompletion() queues
+ * the completion task via reactiveExecutor.scheduleQueue(). This task is later drained by
+ * DefaultAsyncProcessorAwaitManager.await() via executeFromQueue(). The drained task re-enters
+ * CamelInternalProcessor.processTransacted() which calls processor.process(exchange) (sync version), triggering another
+ * DefaultAsyncProcessorAwaitManager.process() → await() → executeFromQueue() cycle. When the reactive queue is
+ * exhausted, the innermost await() blocks on CountDownLatch.await() forever — deadlock.
+ */
+public class SplitAggregateInChoiceSynchronousExecutorTest extends ContextTestSupport {
+
+    @Test
+    public void testSplitAggregateInChoiceNonTransacted() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        sb.append("HEADER\n");
+        for (int i = 1; i <= 11; i++) {
+            sb.append("Line ").append(i).append("\n");
+        }
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(3);
+
+        template.sendBody("direct:start", sb.toString());
+
+        MockEndpoint.assertIsSatisfied(10, SECONDS, result);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testSplitAggregateTransactedDeadlock() throws Exception {
+        // Transacted split + aggregate: deadlocks due to recursive processTransacted → await → executeFromQueue cycle
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 11; i++) {
+            sb.append("Line ").append(i).append("\n");
+        }
+
+        MockEndpoint result = getMockEndpoint("mock:aggregated");
+        result.expectedMessageCount(3);
+
+        template.request("direct:transacted-simple", e -> {
+            e.getIn().setBody(sb.toString());
+        });
+
+        MockEndpoint.assertIsSatisfied(10, SECONDS, result);
+    }
+
+    @Test
+    @Timeout(30)
+    public void testSplitAggregateTransactedInChoiceDeadlock() throws Exception {
+        // Same as the JIRA reproducer: transacted split with aggregate inside choice/when
+        StringBuilder sb = new StringBuilder();
+        sb.append("HEADER\n");
+        for (int i = 1; i <= 11; i++) {
+            sb.append("Line ").append(i).append("\n");
+        }
+
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedMessageCount(3);
+
+        template.request("direct:transacted", e -> {
+            e.getIn().setBody(sb.toString());
+        });
+
+        MockEndpoint.assertIsSatisfied(10, SECONDS, result);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Non-transacted: split CSV, skip header, aggregate inside choice/when — works fine
+                from("direct:start")
+                        .split(body().tokenize("\n")).streaming().stopOnException()
+                        .choice()
+                        .when(exchangeProperty(SPLIT_INDEX).isGreaterThan(0))
+                        .aggregate(constant(true), AggregationStrategies.groupedBody())
+                        .eagerCheckCompletion()
+                        .executorService(new SynchronousExecutorService())
+                        .completionSize(5)
+                        .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+                        .to("mock:result");
+
+                // Transacted split + aggregate (no choice/when) — deadlocks
+                from("direct:transacted-simple")
+                        .process(e -> e.getExchangeExtension().setTransacted(true))
+                        .split(body().tokenize("\n")).streaming().stopOnException()
+                        .aggregate(constant(true), AggregationStrategies.groupedBody())
+                        .eagerCheckCompletion()
+                        .executorService(new SynchronousExecutorService())
+                        .completionSize(5)
+                        .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+                        .to("mock:aggregated");
+
+                // Transacted split + aggregate inside choice/when (CAMEL-23281 pattern) — deadlocks
+                from("direct:transacted")
+                        .process(e -> e.getExchangeExtension().setTransacted(true))
+                        .split(body().tokenize("\n")).streaming().stopOnException()
+                        .choice()
+                        .when(exchangeProperty(SPLIT_INDEX).isGreaterThan(0))
+                        .aggregate(constant(true), AggregationStrategies.groupedBody())
+                        .eagerCheckCompletion()
+                        .executorService(new SynchronousExecutorService())
+                        .completionSize(5)
+                        .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+                        .to("mock:result");
+            }
+        };
+    }
+}

Reply via email to