This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new 12a5093e474f CAMEL-23030: avoid stack overflow with synchronous
aggregate executor (#21538)
12a5093e474f is described below
commit 12a5093e474f79b9011f5151db304b8195604b7a
Author: Mrutunjay Kinagi <[email protected]>
AuthorDate: Thu Feb 19 15:00:46 2026 +0530
CAMEL-23030: avoid stack overflow with synchronous aggregate executor
(#21538)
* CAMEL-23030: Avoid stack overflow in aggregate completion with
synchronous executor
* CAMEL-23030: Restrict aggregate scheduling change to synchronous executor
---
.../processor/aggregate/AggregateProcessor.java | 3 +
...eSynchronousExecutorStackOverflowIssueTest.java | 87 ++++++++++++++++++++++
2 files changed, 90 insertions(+)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 0f82dd55243a..3c512bbbb293 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -69,6 +69,7 @@ import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.concurrent.SynchronousExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -870,6 +871,8 @@ public class AggregateProcessor extends BaseProcessorSupport
// execute the task using this thread sync (similar to multicast
eip in parallel mode)
if (exchange.isTransacted()) {
reactiveExecutor.scheduleQueue(task);
+ } else if (executorService instanceof SynchronousExecutorService) {
+ reactiveExecutor.schedule(task);
} else {
reactiveExecutor.scheduleSync(task);
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateSynchronousExecutorStackOverflowIssueTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateSynchronousExecutorStackOverflowIssueTest.java
new file mode 100644
index 000000000000..f9fac6b0129f
--- /dev/null
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/SplitAggregateSynchronousExecutorStackOverflowIssueTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy;
+import org.apache.camel.util.concurrent.SynchronousExecutorService;
+import org.junit.jupiter.api.Test;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.camel.Exchange.SPLIT_COMPLETE;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SplitAggregateSynchronousExecutorStackOverflowIssueTest extends
ContextTestSupport {
+
+ private final AtomicInteger count = new AtomicInteger();
+
+ @Test
+ public void testStackoverflow() throws Exception {
+ int size = 50000;
+
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedMessageCount(size / 10);
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < size; i++) {
+ sb.append("Line #").append(i);
+ sb.append("\n");
+ }
+
+ template.sendBody("direct:start", sb);
+
+ MockEndpoint.assertIsSatisfied(60, SECONDS, result);
+
+ // with SynchronousExecutorService and recursive completion this
quickly reaches > 1000 frames.
+ assertTrue(count.get() < 70, "Stackframe must not be too high, was " +
count.get());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ @Override
+ public void configure() {
+
+ from("direct:start")
+ .split().tokenize("\n").streaming()
+ .process(e -> {
+ if (e.getProperty(Exchange.SPLIT_INDEX, 0,
int.class) % 1000 == 0) {
+ int frames = (int)
Stream.of(Thread.currentThread().getStackTrace())
+ .filter(st ->
!st.getClassName().startsWith("org.junit."))
+ .count();
+ count.set(frames);
+ log.info("Stackframe: {}", frames);
+ }
+ })
+ .aggregate(constant("foo"), new
GroupedBodyAggregationStrategy())
+ .executorService(new SynchronousExecutorService())
+ .completeAllOnStop()
+ .eagerCheckCompletion()
+ .completionSize(10)
+ .completionTimeout(SECONDS.toMillis(5))
+ .completionPredicate(exchangeProperty(SPLIT_COMPLETE))
+ .to("mock:result");
+ }
+ };
+ }
+}