This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch stopkam in repository https://gitbox.apache.org/repos/asf/camel.git
commit dab3d96720ec60a753326a5e3cfa65834e9ade0f Author: Claus Ibsen <[email protected]> AuthorDate: Sun Nov 2 10:35:10 2025 +0100 CAMEL-22627: camel-kamelet - When Camel is stopping, an inflight message that triggers a kamelet from a dynamic EIP should still allow the kamelet to create its route, so that the inflight message can be processed. --- .../component/kamelet/KameletShutdownToDTest.java | 62 ++++++++++++++++++++++ .../camel/impl/engine/AbstractCamelContext.java | 6 ++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletShutdownToDTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletShutdownToDTest.java new file mode 100644 index 000000000000..31892e1d0ea1 --- /dev/null +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletShutdownToDTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.kamelet; + +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Test; + +public class KameletShutdownToDTest extends CamelTestSupport { + + @Test + public void testShutdown() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("a-Hello"); + + MockEndpoint.assertIsSatisfied(context); + } + + // ********************************************** + // + // test set-up + // + // ********************************************** + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + routeTemplate("echo") + .templateParameter("prefix") + .from("kamelet:source") + .setBody().simple("{{prefix}}-${body}"); + + from("timer:tick?repeatCount=1").routeId("test") + .setBody().constant("Hello") + .process(e -> { + new Thread(() -> context.stop()).start(); + Thread.sleep(500); + }) + .toD("kamelet:echo?prefix=a") + .log("${body}") + .to("mock:result"); + } + }; + } +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index 6d5b81affab0..8bde8f3e1e9d 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -3556,7 +3556,11 @@ public abstract class AbstractCamelContext extends BaseService } else { // start the route service routeServices.put(routeService.getId(), routeService); - if (shouldStartRoutes()) { + // special situation if Camel is stopping and we do graceful shutdown, and process remainder + // inflight messages, and they trigger a dynamic endpoint (toD) that calls a kamelet, 
then + // we need to allow creating the kamelet route to be able to process the inflight message + boolean stoppingDynamicKamelet = isStopping() && routeService.getRoute().isCreatedByKamelet(); + if (shouldStartRoutes() || stoppingDynamicKamelet) { final StartupStepRecorder startupStepRecorder = camelContextExtension.getStartupStepRecorder(); StartupStep step = startupStepRecorder.beginStep(Route.class, routeService.getId(), "Start Route Services");
