This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 507fb2b4f8e6 CAMEL-23513: Fix completeAllOnStop() not completing
aggregations with completionInterval()
507fb2b4f8e6 is described below
commit 507fb2b4f8e6c7328a3addd1c578a2b40333629a
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu May 14 23:21:47 2026 +0200
CAMEL-23513: Fix completeAllOnStop() not completing aggregations with
completionInterval()
When completeAllOnStop is enabled together with completionInterval, the
aggregator was not completing pending aggregations during shutdown: as the
diff below shows, the graceful (non-forced) shutdown path only called
doForceCompletionOnStop() when forceCompletionOnStop was set, so with
completeAllOnStop alone an interval longer than the shutdown timeout left
the pending groups uncompleted.
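- Call doForceCompletionOnStop() on graceful (forced=false) shutdown in
AggregateProcessor when completeAllOnStop is set, not only when
forceCompletionOnStop is set
- Update AggregateCompleteAllOnStopWithIntervalTest to use a 60s
completionInterval, longer than the 5s shutdown timeout, so only force
completion can complete the pending aggregation on stop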
---
.../camel/processor/aggregate/AggregateProcessor.java | 2 +-
.../AggregateCompleteAllOnStopWithIntervalTest.java | 14 ++++++++------
2 files changed, 9 insertions(+), 7 deletions(-)
diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 0c35ad6bcb72..f10e1663b8a6 100644
--- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -1725,7 +1725,7 @@ public class AggregateProcessor extends BaseProcessorSupport
// but only do this when forced=false, as that is when we have chance
to
// send out new messages to be routed by Camel. When forced=true, then
// we have to shutdown in a hurry
- if (!forced && forceCompletionOnStop) {
+ if (!forced && (forceCompletionOnStop || completeAllOnStop)) {
doForceCompletionOnStop();
}
}
diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
index 5a2dada1e110..40d388c2a6a4 100644
--- a/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompleteAllOnStopWithIntervalTest.java
@@ -24,13 +24,15 @@ import org.junit.jupiter.api.Test;
/**
* This test verifies that completeAllOnStop() properly completes aggregations
on shutdown when using
- * completionInterval.
+ * completionInterval. Uses a very long interval (60s) so the interval timer
will never fire during the short shutdown
+ * timeout — force completion in prepareShutdown must handle it.
*/
public class AggregateCompleteAllOnStopWithIntervalTest extends
ContextTestSupport {
@Test
- public void testCompleteAllOnStopWithCompletionIntervalOnly() throws
Exception {
- // Set shutdown timeout to 5x the completion interval (1 second)
+ public void testCompleteAllOnStopWithCompletionInterval() throws Exception
{
+ // Set shutdown timeout shorter than the completion interval (60s)
+ // so the interval timer will never fire — force completion must
handle it
context.getShutdownStrategy().setTimeout(5);
MockEndpoint mock = getMockEndpoint("mock:aggregated");
@@ -49,8 +51,8 @@ public class AggregateCompleteAllOnStopWithIntervalTest extends ContextTestSuppo
input.assertIsSatisfied();
- // Stop the route immediately without waiting for completionInterval
- // With completeAllOnStop(), we expect the aggregation to be completed
+ // Stop the context without waiting for completionInterval
+ // With completeAllOnStop(), the aggregation must be force-completed
during shutdown
context.stop();
mock.assertIsSatisfied();
@@ -66,7 +68,7 @@ public class AggregateCompleteAllOnStopWithIntervalTest extends ContextTestSuppo
.aggregate(new GroupedBodyAggregationStrategy())
.simple("${in.header.aggregateKey}")
.completionSize(10)
- .completionInterval(1000)
+ .completionInterval(60000)
.completeAllOnStop()
.to("mock:aggregated");
}