This is an automated email from the ASF dual-hosted git repository.
fmariani pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9e8b740e3ea2 CAMEL-23281: fix split/aggregate deadlock with
SynchronousExecutorService in transacted routes
9e8b740e3ea2 is described below
commit 9e8b740e3ea2bd4d5b6b79ccb4853c7df87bf4d2
Author: Croway <[email protected]>
AuthorDate: Wed Apr 8 15:05:56 2026 +0200
CAMEL-23281: fix split/aggregate deadlock with SynchronousExecutorService
in transacted routes
Process aggregate completion inline for SynchronousExecutorService instead
of going through the reactive queue. The CAMEL-23030 change from
scheduleSync to schedule causes a deadlock when the surrounding split is
transacted: the nested await/executeFromQueue cycles interact with
Pipeline.scheduleMain queue swapping, making pending tasks unreachable.
---
.../processor/aggregate/AggregateProcessor.java | 21 +--
...itAggregateInChoiceSynchronousExecutorTest.java | 142 +++++++++++++++++++++
2 files changed, 155 insertions(+), 8 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 3c512bbbb293..7d7ad406059f 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -858,7 +858,7 @@ public class AggregateProcessor extends BaseProcessorSupport
executorService.execute(() -> {
ExchangeHelper.prepareOutToIn(exchange);
- Runnable task = () -> processor.process(exchange, done -> {
+ AsyncCallback completionCallback = done -> {
// log exception if there was a problem
if (exchange.getException() != null) {
// if there was an exception then let the exception handler handle it
@@ -867,14 +867,19 @@ public class AggregateProcessor extends BaseProcessorSupport
} else {
LOG.trace("Processing aggregated exchange: {} complete.",
exchange);
}
- });
- // execute the task using this thread sync (similar to multicast eip in parallel mode)
- if (exchange.isTransacted()) {
- reactiveExecutor.scheduleQueue(task);
- } else if (executorService instanceof SynchronousExecutorService) {
- reactiveExecutor.schedule(task);
+ };
+
+ if (executorService instanceof SynchronousExecutorService) {
+ // CAMEL-23281: process inline to avoid deadlock with transacted exchanges
+ processor.process(exchange, completionCallback);
} else {
- reactiveExecutor.scheduleSync(task);
+ Runnable task = () -> processor.process(exchange,
completionCallback);
+ // execute the task using this thread sync (similar to multicast eip in parallel mode)
+ if (exchange.isTransacted()) {
+ reactiveExecutor.scheduleQueue(task);
+ } else {
+ reactiveExecutor.scheduleSync(task);
+ }
}
});
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java
new file mode 100644
index 000000000000..6c3f512d5a09
--- /dev/null
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateInChoiceSynchronousExecutorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.AggregationStrategies;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.util.concurrent.SynchronousExecutorService;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.camel.Exchange.SPLIT_COMPLETE;
+import static org.apache.camel.Exchange.SPLIT_INDEX;
+
+/**
+ * Reproducer for CAMEL-23281: split/aggregator deadlock when the aggregate uses SynchronousExecutorService with
+ * completionSize + completionPredicate(SPLIT_COMPLETE) and the exchange is transacted.
+ *
+ * Root cause: when a transacted exchange triggers aggregate completion, AggregateProcessor.onSubmitCompletion() queues
+ * the completion task via reactiveExecutor.scheduleQueue(). This task is later drained by
+ * DefaultAsyncProcessorAwaitManager.await() via executeFromQueue(). The drained task re-enters
+ * CamelInternalProcessor.processTransacted() which calls processor.process(exchange) (sync version), triggering another
+ * DefaultAsyncProcessorAwaitManager.process() → await() → executeFromQueue() cycle. When the reactive queue is
+ * exhausted, the innermost await() blocks on CountDownLatch.await() forever — deadlock.
+ */
+public class SplitAggregateInChoiceSynchronousExecutorTest extends
ContextTestSupport {
+
+ @Test
+ public void testSplitAggregateInChoiceNonTransacted() throws Exception {
+ StringBuilder sb = new StringBuilder();
+ sb.append("HEADER\n");
+ for (int i = 1; i <= 11; i++) {
+ sb.append("Line ").append(i).append("\n");
+ }
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(3);
+
+ template.sendBody("direct:start", sb.toString());
+
+ MockEndpoint.assertIsSatisfied(10, SECONDS, result);
+ }
+
+ @Test
+ @Timeout(30)
+ public void testSplitAggregateTransactedDeadlock() throws Exception {
+ // Transacted split + aggregate: deadlocks due to recursive processTransacted → await → executeFromQueue cycle
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 11; i++) {
+ sb.append("Line ").append(i).append("\n");
+ }
+
+ MockEndpoint result = getMockEndpoint("mock:aggregated");
+ result.expectedMessageCount(3);
+
+ template.request("direct:transacted-simple", e -> {
+ e.getIn().setBody(sb.toString());
+ });
+
+ MockEndpoint.assertIsSatisfied(10, SECONDS, result);
+ }
+
+ @Test
+ @Timeout(30)
+ public void testSplitAggregateTransactedInChoiceDeadlock() throws
Exception {
+ // Same as the JIRA reproducer: transacted split with aggregate inside choice/when
+ StringBuilder sb = new StringBuilder();
+ sb.append("HEADER\n");
+ for (int i = 1; i <= 11; i++) {
+ sb.append("Line ").append(i).append("\n");
+ }
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(3);
+
+ template.request("direct:transacted", e -> {
+ e.getIn().setBody(sb.toString());
+ });
+
+ MockEndpoint.assertIsSatisfied(10, SECONDS, result);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+ // Non-transacted: split CSV, skip header, aggregate inside choice/when — works fine
+ from("direct:start")
+ .split(body().tokenize("\n")).streaming().stopOnException()
+ .choice()
+ .when(exchangeProperty(SPLIT_INDEX).isGreaterThan(0))
+ .aggregate(constant(true),
AggregationStrategies.groupedBody())
+ .eagerCheckCompletion()
+ .executorService(new SynchronousExecutorService())
+ .completionSize(5)
+ .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+ .to("mock:result");
+
+ // Transacted split + aggregate (no choice/when) — deadlocks
+ from("direct:transacted-simple")
+ .process(e ->
e.getExchangeExtension().setTransacted(true))
+ .split(body().tokenize("\n")).streaming().stopOnException()
+ .aggregate(constant(true),
AggregationStrategies.groupedBody())
+ .eagerCheckCompletion()
+ .executorService(new SynchronousExecutorService())
+ .completionSize(5)
+ .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+ .to("mock:aggregated");
+
+ // Transacted split + aggregate inside choice/when (CAMEL-23281 pattern) — deadlocks
+ from("direct:transacted")
+ .process(e ->
e.getExchangeExtension().setTransacted(true))
+ .split(body().tokenize("\n")).streaming().stopOnException()
+ .choice()
+ .when(exchangeProperty(SPLIT_INDEX).isGreaterThan(0))
+ .aggregate(constant(true),
AggregationStrategies.groupedBody())
+ .eagerCheckCompletion()
+ .executorService(new SynchronousExecutorService())
+ .completionSize(5)
+ .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+ .to("mock:result");
+ }
+ };
+ }
+}