This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelet2 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6bd394dd4562b91ee425ee95726da3d431cabeed Author: Claus Ibsen <[email protected]> AuthorDate: Sun Jan 26 14:08:19 2025 +0100 CAMEL-21599: camel-kamelet - Rework error handler for kamelets to be more standard Camel. WIP --- .../apache/camel/component/kamelet/Kamelet.java | 23 ++++++++++++++--- .../camel/component/kamelet/KameletComponent.java | 22 ++++++++++------ ...lerDirectTest.java => KameletTryCatchTest.java} | 4 +-- .../org/apache/camel/ExtendedCamelContext.java | 30 +++++++++++++++++----- .../impl/engine/DefaultCamelContextExtension.java | 25 ++++++++++++++---- .../org/apache/camel/impl/DefaultCamelContext.java | 4 +-- .../java/org/apache/camel/impl/DefaultModel.java | 4 +++ .../org/apache/camel/reifier/ProcessorReifier.java | 23 ++++++++++------- 8 files changed, 100 insertions(+), 35 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java index c20ee878206..46bd341004e 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java @@ -25,9 +25,12 @@ import java.util.function.Predicate; import org.apache.camel.CamelContext; import org.apache.camel.builder.NoErrorHandlerBuilder; import org.apache.camel.model.ModelCamelContext; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.RouteTemplateDefinition; import org.apache.camel.model.ToDefinition; +import org.apache.camel.model.TryDefinition; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.support.CamelContextHelper; @@ -50,6 +53,7 @@ public final class Kamelet { public static final String PARAM_UUID = "uuid"; public static final String DEFAULT_LOCATION = "classpath:kamelets"; public static final String PARENT_ROUTE_ID = "parentRouteId"; + public static final String PARENT_PROCESSOR_ID = "parentProcessorId"; public static final String NO_ERROR_HANDLER = "noErrorHandler"; // use a running counter as uuid @@ -184,7 +188,8 @@ public final class Kamelet { final String rid = (String) parameters.get(PARAM_ROUTE_ID); final boolean noErrorHandler = (boolean) parameters.get(NO_ERROR_HANDLER); final String uuid = (String) parameters.get(PARAM_UUID); - final String pid = (String) parameters.get(PARENT_ROUTE_ID); + final String prid = (String) parameters.get(PARENT_ROUTE_ID); + final String ppid = (String) parameters.get(PARENT_PROCESSOR_ID); ObjectHelper.notNull(rid, PARAM_ROUTE_ID); ObjectHelper.notNull(uuid, PARAM_UUID); @@ -198,9 +203,21 @@ public final class Kamelet { def.setNodePrefixId(uuid); if (noErrorHandler) { def.setErrorHandlerFactory(new NoErrorHandlerBuilder()); - } else if (pid != null) { + } else if (prid != null && ppid != null) { + // the kamelet are used from a processor, and we need to check if this processor + // has any error handler or not (if not then we should also not use error handler in the kamelet) ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext(); - RouteDefinition parent = mcc.getRouteDefinition(pid); + ProcessorDefinition<?> proc = mcc.getProcessorDefinition(ppid); + // TODO: should this be wrapped or not + boolean tryBlock = proc != null && ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, proc, true); + if (tryBlock) { + def.setErrorHandlerFactory(new NoErrorHandlerBuilder()); + } + } + if (!def.isErrorHandlerFactorySet() && prid != null) { + // inherit the error handler from the parent route + ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext(); + RouteDefinition parent = mcc.getRouteDefinition(prid); if (parent != null) { def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder()); } diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java index 4c54b5b88c6..7335b58e999 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java @@ -54,6 +54,7 @@ import static org.apache.camel.component.kamelet.Kamelet.PARAM_LOCATION; import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID; import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID; import static org.apache.camel.component.kamelet.Kamelet.PARAM_UUID; +import static org.apache.camel.component.kamelet.Kamelet.PARENT_PROCESSOR_ID; import static org.apache.camel.component.kamelet.Kamelet.PARENT_ROUTE_ID; /** @@ -172,8 +173,9 @@ public class KameletComponent extends DefaultComponent { // since this is the real kamelet, then we need to hand it // over to the tracker. // - String routeId = getCamelContext().getCamelContextExtension().getCreateRoutes(); - lifecycleHandler.track(this, routeId); + String routeId = getCamelContext().getCamelContextExtension().getCreateRoute(); + String processorId = getCamelContext().getCamelContextExtension().getCreateProcessor(); + lifecycleHandler.track(this, routeId, processorId); } }; @@ -439,7 +441,7 @@ public class KameletComponent extends DefaultComponent { */ private class LifecycleHandler extends LifecycleStrategySupport { - record Tuple(KameletEndpoint endpoint, String parentRouteId) { + record Tuple(KameletEndpoint endpoint, String parentRouteId, String parentProcessorId) { } private final List<Tuple> endpoints; @@ -450,7 +452,8 @@ public class KameletComponent extends DefaultComponent { this.initialized = new AtomicBoolean(); } - public void createRouteForEndpoint(KameletEndpoint endpoint, String parentRouteId) throws Exception { + public void createRouteForEndpoint(KameletEndpoint endpoint, String parentRouteId, String parentProcessorId) + throws Exception { final ModelCamelContext context = (ModelCamelContext) getCamelContext(); final String templateId = endpoint.getTemplateId(); final String routeId = endpoint.getRouteId(); @@ -469,6 +472,9 @@ public class KameletComponent extends DefaultComponent { if (parentRouteId != null) { endpoint.getKameletProperties().put(PARENT_ROUTE_ID, parentRouteId); } + if (parentProcessorId != null) { + endpoint.getKameletProperties().put(PARENT_PROCESSOR_ID, parentProcessorId); + } String id = context.addRouteFromTemplate(routeId, templateId, uuid, endpoint.getKameletProperties()); RouteDefinition def = context.getRouteDefinition(id); @@ -490,7 +496,7 @@ public class KameletComponent extends DefaultComponent { if (this.initialized.compareAndSet(false, true)) { for (Tuple tuple : endpoints) { try { - createRouteForEndpoint(tuple.endpoint, tuple.parentRouteId); + createRouteForEndpoint(tuple.endpoint, tuple.parentRouteId, tuple.parentProcessorId); } catch (Exception e) { throw new VetoCamelContextStartException( "Failure creating route from template: " + tuple.endpoint.getTemplateId(), e, context); @@ -505,16 +511,16 @@ public class KameletComponent extends DefaultComponent { this.initialized.set(initialized); } - public void track(KameletEndpoint endpoint, String parentRouteId) { + public void track(KameletEndpoint endpoint, String parentRouteId, String parentProcessorId) { if (this.initialized.get()) { try { - createRouteForEndpoint(endpoint, parentRouteId); + createRouteForEndpoint(endpoint, parentRouteId, parentProcessorId); } catch (Exception e) { throw RuntimeCamelException.wrapRuntimeException(e); } } else { LOG.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId()); - this.endpoints.add(new Tuple(endpoint, parentRouteId)); + this.endpoints.add(new Tuple(endpoint, parentRouteId, parentProcessorId)); } } } diff --git a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletNoErrorHandlerDirectTest.java b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTryCatchTest.java similarity index 94% rename from components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletNoErrorHandlerDirectTest.java rename to components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTryCatchTest.java index 371f1140308..f7aad8bf08d 100644 --- a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletNoErrorHandlerDirectTest.java +++ b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTryCatchTest.java @@ -22,10 +22,10 @@ import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; -public class KameletNoErrorHandlerDirectTest extends CamelTestSupport { +public class KameletTryCatchTest extends CamelTestSupport { @Test - public void testNoErrorHandler() throws Exception { + public void testTryCatch() throws Exception { getMockEndpoint("mock:catch").expectedMessageCount(1); getMockEndpoint("mock:dead").expectedMessageCount(0); diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java index 0f72c929411..c673400b442 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java @@ -122,17 +122,17 @@ public interface ExtendedCamelContext { * {@link CamelContext} itself is in started state. * * @return <tt>true</tt> if current thread is setting up route(s), or <tt>false</tt> if not. - * @see #getCreateRoutes() + * @see #setupRoutes(boolean) */ boolean isSetupRoutes(); /** * Method to signal to {@link CamelContext} that the process to create routes is in progress. * - * @param routeId the current id of the route being created - * @see #getCreateRoutes() + * @param routeId the current id of the route being created + * @see #getCreateRoute() */ - void createRoutes(String routeId); + void createRoute(String routeId); /** * Indicates whether current thread is creating a route as part of starting Camel. @@ -140,9 +140,27 @@ public interface ExtendedCamelContext { * This can be useful to know by {@link LifecycleStrategy} or the likes, in case they need to react differently. * * @return the route id currently being created/started, or <tt>null</tt> if not. - * @see #isSetupRoutes() + * @see #createRoute(String) */ - String getCreateRoutes(); + String getCreateRoute(); + + /** + * Method to signal to {@link CamelContext} that creation of a given processor is in progress. + * + * @param processorId the current id of the processor being created + * @see #getCreateProcessor() + */ + void createProcessor(String processorId); + + /** + * Indicates whether current thread is creating a processor as part of starting Camel. + * <p/> + * This can be useful to know by {@link LifecycleStrategy} or the likes, in case they need to react differently. + * + * @return the current id of the processor being created + * @see #createProcessor(String) + */ + String getCreateProcessor(); /** * Registers a {@link org.apache.camel.spi.EndpointStrategy callback} to allow you to do custom logic when an diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java index 488d6b04bd9..b4752104bce 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java @@ -91,7 +91,8 @@ import org.slf4j.LoggerFactory; class DefaultCamelContextExtension implements ExtendedCamelContext { private final AbstractCamelContext camelContext; - private final ThreadLocal<String> isCreateRoutes = new ThreadLocal<>(); + private final ThreadLocal<String> isCreateRoute = new ThreadLocal<>(); + private final ThreadLocal<String> isCreateProcessor = new ThreadLocal<>(); private final ThreadLocal<Boolean> isSetupRoutes = new ThreadLocal<>(); private final List<InterceptStrategy> interceptStrategies = new ArrayList<>(); private final Map<String, FactoryFinder> factories = new ConcurrentHashMap<>(); @@ -309,8 +310,13 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { } @Override - public String getCreateRoutes() { - return isCreateRoutes.get(); + public String getCreateRoute() { + return isCreateRoute.get(); + } + + @Override + public String getCreateProcessor() { + return isCreateProcessor.get(); } @Override @@ -400,14 +406,23 @@ class DefaultCamelContextExtension implements ExtendedCamelContext { } @Override - public void createRoutes(String routeId) { + public void createRoute(String routeId) { if (routeId != null) { - isCreateRoutes.set(routeId); + isCreateRoute.set(routeId); } else { isSetupRoutes.remove(); } } + @Override + public void createProcessor(String processorId) { + if (processorId != null) { + isCreateProcessor.set(processorId); + } else { + isCreateProcessor.remove(); + } + } + @Override public void setupRoutes(boolean done) { if (done) { diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 4ced97c424c..9c4a6c73c7e 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -724,7 +724,7 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame = getCamelContextReference().getCamelContextExtension().getStartupStepRecorder(); StartupStep step = recorder.beginStep(Route.class, routeDefinition.getRouteId(), "Create Route"); - getCamelContextExtension().createRoutes(routeDefinition.getRouteId()); + getCamelContextExtension().createRoute(routeDefinition.getRouteId()); Route route = model.getModelReifierFactory().createRoute(this, routeDefinition); recorder.endStep(step); @@ -753,7 +753,7 @@ public class DefaultCamelContext extends SimpleCamelContext implements ModelCame if (!alreadyStartingRoutes) { setStartingRoutes(false); } - getCamelContextExtension().createRoutes(null); + getCamelContextExtension().createRoute(null); pc.setLocalProperties(null); if (localBeans != null) { localBeans.setLocalBeanRepository(null); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java index b233de3b765..ab43c9370e8 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java @@ -430,6 +430,7 @@ public class DefaultModel implements Model { throws Exception { String parentRouteId = (String) routeTemplateContext.getParameters().remove("_parentRouteId"); + String parentProcessorId = (String) routeTemplateContext.getParameters().remove("_parentProcessorId"); RouteTemplateDefinition target = null; for (RouteTemplateDefinition def : routeTemplateDefinitions) { @@ -520,6 +521,9 @@ public class DefaultModel implements Model { if (parentRouteId != null) { prop.put("parentRouteId", parentRouteId); } + if (parentProcessorId != null) { + prop.put("parentProcessorId", parentProcessorId); + } RouteDefinition def = converter.apply(target, prop); if (routeId != null) { def.setId(routeId); diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index 8df133008ff..04a9acd9370 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -851,17 +851,22 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends StartupStep step = camelContext.getCamelContextExtension().getStartupStepRecorder().beginStep(ProcessorReifier.class, outputId, "Create processor"); + camelContext.getCamelContextExtension().createProcessor(outputId); Processor processor = null; - // at first use custom factory - final ProcessorFactory processorFactory = PluginHelper.getProcessorFactory(camelContext); - if (processorFactory != null) { - processor = processorFactory.createProcessor(route, output); - } - // fallback to default implementation if factory did not create the processor - if (processor == null) { - processor = reifier(route, output).createProcessor(); + try { + // at first use custom factory + final ProcessorFactory processorFactory = PluginHelper.getProcessorFactory(camelContext); + if (processorFactory != null) { + processor = processorFactory.createProcessor(route, output); + } + // fallback to default implementation if factory did not create the processor + if (processor == null) { + processor = reifier(route, output).createProcessor(); + } + camelContext.getCamelContextExtension().getStartupStepRecorder().endStep(step); + } finally { + camelContext.getCamelContextExtension().createProcessor(null); } - camelContext.getCamelContextExtension().getStartupStepRecorder().endStep(step); return processor; }
