This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kamelet2
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 6bd394dd4562b91ee425ee95726da3d431cabeed
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jan 26 14:08:19 2025 +0100

    CAMEL-21599: camel-kamelet - Rework error handler for kamelets to be more 
standard Camel. WIP
---
 .../apache/camel/component/kamelet/Kamelet.java    | 23 ++++++++++++++---
 .../camel/component/kamelet/KameletComponent.java  | 22 ++++++++++------
 ...lerDirectTest.java => KameletTryCatchTest.java} |  4 +--
 .../org/apache/camel/ExtendedCamelContext.java     | 30 +++++++++++++++++-----
 .../impl/engine/DefaultCamelContextExtension.java  | 25 ++++++++++++++----
 .../org/apache/camel/impl/DefaultCamelContext.java |  4 +--
 .../java/org/apache/camel/impl/DefaultModel.java   |  4 +++
 .../org/apache/camel/reifier/ProcessorReifier.java | 23 ++++++++++-------
 8 files changed, 100 insertions(+), 35 deletions(-)

diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index c20ee878206..46bd341004e 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -25,9 +25,12 @@ import java.util.function.Predicate;
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.NoErrorHandlerBuilder;
 import org.apache.camel.model.ModelCamelContext;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.model.ToDefinition;
+import org.apache.camel.model.TryDefinition;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.support.CamelContextHelper;
@@ -50,6 +53,7 @@ public final class Kamelet {
     public static final String PARAM_UUID = "uuid";
     public static final String DEFAULT_LOCATION = "classpath:kamelets";
     public static final String PARENT_ROUTE_ID = "parentRouteId";
+    public static final String PARENT_PROCESSOR_ID = "parentProcessorId";
     public static final String NO_ERROR_HANDLER = "noErrorHandler";
 
     // use a running counter as uuid
@@ -184,7 +188,8 @@ public final class Kamelet {
         final String rid = (String) parameters.get(PARAM_ROUTE_ID);
         final boolean noErrorHandler = (boolean) 
parameters.get(NO_ERROR_HANDLER);
         final String uuid = (String) parameters.get(PARAM_UUID);
-        final String pid = (String) parameters.get(PARENT_ROUTE_ID);
+        final String prid = (String) parameters.get(PARENT_ROUTE_ID);
+        final String ppid = (String) parameters.get(PARENT_PROCESSOR_ID);
 
         ObjectHelper.notNull(rid, PARAM_ROUTE_ID);
         ObjectHelper.notNull(uuid, PARAM_UUID);
@@ -198,9 +203,21 @@ public final class Kamelet {
         def.setNodePrefixId(uuid);
         if (noErrorHandler) {
             def.setErrorHandlerFactory(new NoErrorHandlerBuilder());
-        } else if (pid != null) {
+        } else if (prid != null && ppid != null) {
+            // the kamelet are used from a processor, and we need to check if 
this processor
+            // has any error handler or not (if not then we should also not 
use error handler in the kamelet)
             ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext();
-            RouteDefinition parent = mcc.getRouteDefinition(pid);
+            ProcessorDefinition<?> proc = mcc.getProcessorDefinition(ppid);
+            // TODO: should this be wrapped or not
+            boolean tryBlock = proc != null && 
ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, proc, true);
+            if (tryBlock) {
+                def.setErrorHandlerFactory(new NoErrorHandlerBuilder());
+            }
+        }
+        if (!def.isErrorHandlerFactorySet() && prid != null) {
+            // inherit the error handler from the parent route
+            ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext();
+            RouteDefinition parent = mcc.getRouteDefinition(prid);
             if (parent != null) {
                 
def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder());
             }
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
index 4c54b5b88c6..7335b58e999 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletComponent.java
@@ -54,6 +54,7 @@ import static 
org.apache.camel.component.kamelet.Kamelet.PARAM_LOCATION;
 import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID;
 import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
 import static org.apache.camel.component.kamelet.Kamelet.PARAM_UUID;
+import static org.apache.camel.component.kamelet.Kamelet.PARENT_PROCESSOR_ID;
 import static org.apache.camel.component.kamelet.Kamelet.PARENT_ROUTE_ID;
 
 /**
@@ -172,8 +173,9 @@ public class KameletComponent extends DefaultComponent {
                     // since this is the real kamelet, then we need to hand it
                     // over to the tracker.
                     //
-                    String routeId = 
getCamelContext().getCamelContextExtension().getCreateRoutes();
-                    lifecycleHandler.track(this, routeId);
+                    String routeId = 
getCamelContext().getCamelContextExtension().getCreateRoute();
+                    String processorId = 
getCamelContext().getCamelContextExtension().getCreateProcessor();
+                    lifecycleHandler.track(this, routeId, processorId);
                 }
             };
 
@@ -439,7 +441,7 @@ public class KameletComponent extends DefaultComponent {
      */
     private class LifecycleHandler extends LifecycleStrategySupport {
 
-        record Tuple(KameletEndpoint endpoint, String parentRouteId) {
+        record Tuple(KameletEndpoint endpoint, String parentRouteId, String 
parentProcessorId) {
         }
 
         private final List<Tuple> endpoints;
@@ -450,7 +452,8 @@ public class KameletComponent extends DefaultComponent {
             this.initialized = new AtomicBoolean();
         }
 
-        public void createRouteForEndpoint(KameletEndpoint endpoint, String 
parentRouteId) throws Exception {
+        public void createRouteForEndpoint(KameletEndpoint endpoint, String 
parentRouteId, String parentProcessorId)
+                throws Exception {
             final ModelCamelContext context = (ModelCamelContext) 
getCamelContext();
             final String templateId = endpoint.getTemplateId();
             final String routeId = endpoint.getRouteId();
@@ -469,6 +472,9 @@ public class KameletComponent extends DefaultComponent {
                 if (parentRouteId != null) {
                     endpoint.getKameletProperties().put(PARENT_ROUTE_ID, 
parentRouteId);
                 }
+                if (parentProcessorId != null) {
+                    endpoint.getKameletProperties().put(PARENT_PROCESSOR_ID, 
parentProcessorId);
+                }
                 String id = context.addRouteFromTemplate(routeId, templateId, 
uuid, endpoint.getKameletProperties());
                 RouteDefinition def = context.getRouteDefinition(id);
 
@@ -490,7 +496,7 @@ public class KameletComponent extends DefaultComponent {
             if (this.initialized.compareAndSet(false, true)) {
                 for (Tuple tuple : endpoints) {
                     try {
-                        createRouteForEndpoint(tuple.endpoint, 
tuple.parentRouteId);
+                        createRouteForEndpoint(tuple.endpoint, 
tuple.parentRouteId, tuple.parentProcessorId);
                     } catch (Exception e) {
                         throw new VetoCamelContextStartException(
                                 "Failure creating route from template: " + 
tuple.endpoint.getTemplateId(), e, context);
@@ -505,16 +511,16 @@ public class KameletComponent extends DefaultComponent {
             this.initialized.set(initialized);
         }
 
-        public void track(KameletEndpoint endpoint, String parentRouteId) {
+        public void track(KameletEndpoint endpoint, String parentRouteId, 
String parentProcessorId) {
             if (this.initialized.get()) {
                 try {
-                    createRouteForEndpoint(endpoint, parentRouteId);
+                    createRouteForEndpoint(endpoint, parentRouteId, 
parentProcessorId);
                 } catch (Exception e) {
                     throw RuntimeCamelException.wrapRuntimeException(e);
                 }
             } else {
                 LOG.debug("Tracking route template={} and id={}", 
endpoint.getTemplateId(), endpoint.getRouteId());
-                this.endpoints.add(new Tuple(endpoint, parentRouteId));
+                this.endpoints.add(new Tuple(endpoint, parentRouteId, 
parentProcessorId));
             }
         }
     }
diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletNoErrorHandlerDirectTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTryCatchTest.java
similarity index 94%
rename from 
components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletNoErrorHandlerDirectTest.java
rename to 
components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTryCatchTest.java
index 371f1140308..f7aad8bf08d 100644
--- 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletNoErrorHandlerDirectTest.java
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletTryCatchTest.java
@@ -22,10 +22,10 @@ import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit5.CamelTestSupport;
 import org.junit.jupiter.api.Test;
 
-public class KameletNoErrorHandlerDirectTest extends CamelTestSupport {
+public class KameletTryCatchTest extends CamelTestSupport {
 
     @Test
-    public void testNoErrorHandler() throws Exception {
+    public void testTryCatch() throws Exception {
         getMockEndpoint("mock:catch").expectedMessageCount(1);
         getMockEndpoint("mock:dead").expectedMessageCount(0);
 
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
index 0f72c929411..c673400b442 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedCamelContext.java
@@ -122,17 +122,17 @@ public interface ExtendedCamelContext {
      * {@link CamelContext} itself is in started state.
      *
      * @return <tt>true</tt> if current thread is setting up route(s), or 
<tt>false</tt> if not.
-     * @see #getCreateRoutes()
+     * @see    #setupRoutes(boolean)
      */
     boolean isSetupRoutes();
 
     /**
      * Method to signal to {@link CamelContext} that the process to create 
routes is in progress.
      *
-     * @param routeId  the current id of the route being created
-     * @see        #getCreateRoutes()
+     * @param routeId the current id of the route being created
+     * @see           #getCreateRoute()
      */
-    void createRoutes(String routeId);
+    void createRoute(String routeId);
 
     /**
      * Indicates whether current thread is creating a route as part of 
starting Camel.
@@ -140,9 +140,27 @@ public interface ExtendedCamelContext {
      * This can be useful to know by {@link LifecycleStrategy} or the likes, 
in case they need to react differently.
      *
      * @return the route id currently being created/started, or <tt>null</tt> 
if not.
-     * @see #isSetupRoutes()
+     * @see    #createRoute(String)
      */
-    String getCreateRoutes();
+    String getCreateRoute();
+
+    /**
+     * Method to signal to {@link CamelContext} that creation of a given 
processor is in progress.
+     *
+     * @param processorId the current id of the processor being created
+     * @see               #getCreateProcessor()
+     */
+    void createProcessor(String processorId);
+
+    /**
+     * Indicates whether current thread is creating a processor as part of 
starting Camel.
+     * <p/>
+     * This can be useful to know by {@link LifecycleStrategy} or the likes, 
in case they need to react differently.
+     *
+     * @return the current id of the processor being created
+     * @see    #createProcessor(String)
+     */
+    String getCreateProcessor();
 
     /**
      * Registers a {@link org.apache.camel.spi.EndpointStrategy callback} to 
allow you to do custom logic when an
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
index 488d6b04bd9..b4752104bce 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultCamelContextExtension.java
@@ -91,7 +91,8 @@ import org.slf4j.LoggerFactory;
 class DefaultCamelContextExtension implements ExtendedCamelContext {
 
     private final AbstractCamelContext camelContext;
-    private final ThreadLocal<String> isCreateRoutes = new ThreadLocal<>();
+    private final ThreadLocal<String> isCreateRoute = new ThreadLocal<>();
+    private final ThreadLocal<String> isCreateProcessor = new ThreadLocal<>();
     private final ThreadLocal<Boolean> isSetupRoutes = new ThreadLocal<>();
     private final List<InterceptStrategy> interceptStrategies = new 
ArrayList<>();
     private final Map<String, FactoryFinder> factories = new 
ConcurrentHashMap<>();
@@ -309,8 +310,13 @@ class DefaultCamelContextExtension implements 
ExtendedCamelContext {
     }
 
     @Override
-    public String getCreateRoutes() {
-        return isCreateRoutes.get();
+    public String getCreateRoute() {
+        return isCreateRoute.get();
+    }
+
+    @Override
+    public String getCreateProcessor() {
+        return isCreateProcessor.get();
     }
 
     @Override
@@ -400,14 +406,23 @@ class DefaultCamelContextExtension implements 
ExtendedCamelContext {
     }
 
     @Override
-    public void createRoutes(String routeId) {
+    public void createRoute(String routeId) {
         if (routeId != null) {
-            isCreateRoutes.set(routeId);
+            isCreateRoute.set(routeId);
         } else {
             isSetupRoutes.remove();
         }
     }
 
+    @Override
+    public void createProcessor(String processorId) {
+        if (processorId != null) {
+            isCreateProcessor.set(processorId);
+        } else {
+            isCreateProcessor.remove();
+        }
+    }
+
     @Override
     public void setupRoutes(boolean done) {
         if (done) {
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
 
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 4ced97c424c..9c4a6c73c7e 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -724,7 +724,7 @@ public class DefaultCamelContext extends SimpleCamelContext 
implements ModelCame
                             = 
getCamelContextReference().getCamelContextExtension().getStartupStepRecorder();
                     StartupStep step = recorder.beginStep(Route.class, 
routeDefinition.getRouteId(), "Create Route");
 
-                    
getCamelContextExtension().createRoutes(routeDefinition.getRouteId());
+                    
getCamelContextExtension().createRoute(routeDefinition.getRouteId());
 
                     Route route = 
model.getModelReifierFactory().createRoute(this, routeDefinition);
                     recorder.endStep(step);
@@ -753,7 +753,7 @@ public class DefaultCamelContext extends SimpleCamelContext 
implements ModelCame
             if (!alreadyStartingRoutes) {
                 setStartingRoutes(false);
             }
-            getCamelContextExtension().createRoutes(null);
+            getCamelContextExtension().createRoute(null);
             pc.setLocalProperties(null);
             if (localBeans != null) {
                 localBeans.setLocalBeanRepository(null);
diff --git 
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java 
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
index b233de3b765..ab43c9370e8 100644
--- 
a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
+++ 
b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java
@@ -430,6 +430,7 @@ public class DefaultModel implements Model {
             throws Exception {
 
         String parentRouteId = (String) 
routeTemplateContext.getParameters().remove("_parentRouteId");
+        String parentProcessorId = (String) 
routeTemplateContext.getParameters().remove("_parentProcessorId");
 
         RouteTemplateDefinition target = null;
         for (RouteTemplateDefinition def : routeTemplateDefinitions) {
@@ -520,6 +521,9 @@ public class DefaultModel implements Model {
         if (parentRouteId != null) {
             prop.put("parentRouteId", parentRouteId);
         }
+        if (parentProcessorId != null) {
+            prop.put("parentProcessorId", parentProcessorId);
+        }
         RouteDefinition def = converter.apply(target, prop);
         if (routeId != null) {
             def.setId(routeId);
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 8df133008ff..04a9acd9370 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -851,17 +851,22 @@ public abstract class ProcessorReifier<T extends 
ProcessorDefinition<?>> extends
         StartupStep step = 
camelContext.getCamelContextExtension().getStartupStepRecorder().beginStep(ProcessorReifier.class,
                 outputId, "Create processor");
 
+        camelContext.getCamelContextExtension().createProcessor(outputId);
         Processor processor = null;
-        // at first use custom factory
-        final ProcessorFactory processorFactory = 
PluginHelper.getProcessorFactory(camelContext);
-        if (processorFactory != null) {
-            processor = processorFactory.createProcessor(route, output);
-        }
-        // fallback to default implementation if factory did not create the 
processor
-        if (processor == null) {
-            processor = reifier(route, output).createProcessor();
+        try {
+            // at first use custom factory
+            final ProcessorFactory processorFactory = 
PluginHelper.getProcessorFactory(camelContext);
+            if (processorFactory != null) {
+                processor = processorFactory.createProcessor(route, output);
+            }
+            // fallback to default implementation if factory did not create 
the processor
+            if (processor == null) {
+                processor = reifier(route, output).createProcessor();
+            }
+            
camelContext.getCamelContextExtension().getStartupStepRecorder().endStep(step);
+        } finally {
+            camelContext.getCamelContextExtension().createProcessor(null);
         }
-        
camelContext.getCamelContextExtension().getStartupStepRecorder().endStep(step);
         return processor;
     }
 

Reply via email to