This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kamelet2
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 2d35aa84a299e4bf7762b89e8d9adeb5ac32395c
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Jan 26 18:29:55 2025 +0100

    CAMEL-21599: camel-kamelet - Rework error handler for kamelets to be more 
standard Camel. WIP
---
 .../apache/camel/component/kamelet/Kamelet.java    | 25 ++++------
 .../camel/component/kamelet/KameletProcessor.java  | 22 ++-------
 .../camel/component/kamelet/KameletReifier.java    | 15 ++++--
 .../camel/model/ProcessorDefinitionHelper.java     | 55 ++++++++++++++++++++++
 .../org/apache/camel/reifier/ProcessorReifier.java | 47 +-----------------
 5 files changed, 82 insertions(+), 82 deletions(-)

diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
index a67401ce646..0dec0f1e009 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java
@@ -30,7 +30,6 @@ import org.apache.camel.model.ProcessorDefinitionHelper;
 import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RouteTemplateDefinition;
 import org.apache.camel.model.ToDefinition;
-import org.apache.camel.model.TryDefinition;
 import org.apache.camel.spi.PropertiesComponent;
 import org.apache.camel.spi.UuidGenerator;
 import org.apache.camel.support.CamelContextHelper;
@@ -204,25 +203,21 @@ public final class Kamelet {
         if (noErrorHandler) {
             def.setErrorHandlerFactory(new NoErrorHandlerBuilder());
         } else if (prid != null && ppid != null) {
+            ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext();
             // the kamelet are used from a processor, and we need to check if 
this processor
             // has any error handler or not (if not then we should also not 
use error handler in the kamelet)
-            ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext();
-            ProcessorDefinition<?> proc = mcc.getProcessorDefinition(ppid);
-            // TODO: Make API in ProcessorDefinitionHelper we can reuse that
-            // checks for this like in ProcessorReifer.wrapChannel
-            boolean tryBlock = proc != null && 
ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, proc, true);
-            if (tryBlock) {
+            ProcessorDefinition<?> pro = mcc.getProcessorDefinition(ppid);
+            boolean wrap = pro == null || 
ProcessorDefinitionHelper.shouldWrapInErrorHandler(def.getCamelContext(), pro, 
null,
+                    pro.getInheritErrorHandler());
+            if (wrap) {
+                RouteDefinition parent = mcc.getRouteDefinition(prid);
+                if (parent != null) {
+                    
def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder());
+                }
+            } else {
                 def.setErrorHandlerFactory(new NoErrorHandlerBuilder());
             }
         }
-        if (!def.isErrorHandlerFactorySet() && prid != null) {
-            // inherit the error handler from the parent route
-            ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext();
-            RouteDefinition parent = mcc.getRouteDefinition(prid);
-            if (parent != null) {
-                
def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder());
-            }
-        }
 
         if (def.getInput() == null) {
             throw new IllegalArgumentException("Camel route " + rid + " input 
does not exist.");
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
index 9bfab346d8f..d58fc2b1ff5 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java
@@ -49,10 +49,12 @@ public class KameletProcessor extends AsyncProcessorSupport
     private String id;
     private String routeId;
 
-    public KameletProcessor(CamelContext camelContext, String name, Processor 
processor) {
+    public KameletProcessor(CamelContext camelContext, String name, Processor 
processor) throws Exception {
         this.camelContext = camelContext;
         this.name = name;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
+        this.component = camelContext.getComponent("kamelet", 
KameletComponent.class);
+        this.producer = (KameletProducer) 
camelContext.getEndpoint("kamelet://" + name).createAsyncProducer();
     }
 
     @ManagedAttribute(description = "Kamelet name 
(templateId/routeId?options)")
@@ -116,17 +118,8 @@ public class KameletProcessor extends AsyncProcessorSupport
     }
 
     @Override
-    protected void doBuild() throws Exception {
-        if (component == null) {
-            component = camelContext.getComponent("kamelet", 
KameletComponent.class);
-        }
-        if (producer == null) {
-            producer = (KameletProducer) camelContext.getEndpoint("kamelet://" 
+ name).createAsyncProducer();
-        }
-        if (producer != null) {
-            ((RouteIdAware) producer).setRouteId(getRouteId());
-        }
-        ServiceHelper.buildService(processor, producer);
+    protected void doInit() throws Exception {
+        ServiceHelper.initService(processor, producer);
 
         // we use the kamelet component (producer) to call the kamelet
         // and to receive the reply we register ourselves to the kamelet 
component
@@ -134,11 +127,6 @@ public class KameletProcessor extends AsyncProcessorSupport
         component.addKameletEip(producer.getKey(), processor);
     }
 
-    @Override
-    protected void doInit() throws Exception {
-        ServiceHelper.initService(processor, producer);
-    }
-
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startService(processor, producer);
diff --git 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
index 0252e6f92d8..8b0106fac79 100644
--- 
a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
+++ 
b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java
@@ -20,6 +20,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.model.KameletDefinition;
 import org.apache.camel.reifier.ProcessorReifier;
+import org.apache.camel.spi.NodeIdFactory;
 import org.apache.camel.support.PluginHelper;
 
 public class KameletReifier extends ProcessorReifier<KameletDefinition> {
@@ -36,9 +37,15 @@ public class KameletReifier extends 
ProcessorReifier<KameletDefinition> {
             processor = new NoopProcessor();
         }
         // wrap in uow
-        Processor target = new KameletProcessor(camelContext, 
parseString(definition.getName()), processor);
-        target = PluginHelper.getInternalProcessorFactory(camelContext)
-                .addUnitOfWorkProcessorAdvice(camelContext, target, null);
-        return target;
+        String outputId = 
definition.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class));
+        camelContext.getCamelContextExtension().createProcessor(outputId);
+        try {
+            Processor target = new KameletProcessor(camelContext, 
parseString(definition.getName()), processor);
+            target = PluginHelper.getInternalProcessorFactory(camelContext)
+                    .addUnitOfWorkProcessorAdvice(camelContext, target, null);
+            return target;
+        } finally {
+            camelContext.getCamelContextExtension().createProcessor(null);
+        }
     }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
index 8d7a17f981c..5172592716d 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java
@@ -22,8 +22,10 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.NamedNode;
 import org.apache.camel.spi.Resource;
+import org.apache.camel.support.CamelContextHelper;
 import org.apache.camel.support.ResourceHelper;
 import org.apache.camel.util.FileUtil;
 
@@ -473,4 +475,57 @@ public final class ProcessorDefinitionHelper {
         return answer;
     }
 
+    /**
+     * Whether the model should be wrapped in an error handler or not.
+     *
+     * Some EIPs like try/catch, circuit breaker, multicast, and kamelets have 
impact on whether the model should be
+     * wrapped or not.
+     */
+    public static boolean shouldWrapInErrorHandler(
+            CamelContext context, ProcessorDefinition<?> definition,
+            ProcessorDefinition<?> child, Boolean inheritErrorHandler) {
+        boolean wrap = false;
+
+        // set the error handler, must be done after init as we can set the
+        // error handler as first in the chain
+        if (definition instanceof TryDefinition || definition instanceof 
CatchDefinition
+                || definition instanceof FinallyDefinition) {
+            // do not use error handler for try .. catch .. finally blocks as 
it
+            // will handle errors itself
+        } else if 
(ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, definition, true)
+                || 
ProcessorDefinitionHelper.isParentOfType(CatchDefinition.class, definition, 
true)
+                || 
ProcessorDefinitionHelper.isParentOfType(FinallyDefinition.class, definition, 
true)) {
+            // do not use error handler for try .. catch .. finally blocks as 
it
+            // will handle errors itself
+            // by checking that any of our parent(s) is not a try .. catch or
+            // finally type
+        } else if (definition instanceof OnExceptionDefinition
+                || 
ProcessorDefinitionHelper.isParentOfType(OnExceptionDefinition.class, 
definition, true)) {
+            // do not use error handler for onExceptions blocks as it will
+            // handle errors itself
+        } else if (definition instanceof CircuitBreakerDefinition
+                || 
ProcessorDefinitionHelper.isParentOfType(CircuitBreakerDefinition.class, 
definition, true)) {
+            // do not use error handler for circuit breaker
+            // however if inherit error handler is enabled, we need to wrap an 
error handler on the parent
+            if (inheritErrorHandler != null && inheritErrorHandler && child == 
null) {
+                // only wrap the parent (not the children of the circuit 
breaker)
+                wrap = true;
+            }
+        } else if (definition instanceof MulticastDefinition def) {
+            // do not use error handler for multicast as it offers fine-grained
+            // error handlers for its outputs
+            // however if share unit of work is enabled, we need to wrap an
+            // error handler on the multicast parent
+            Boolean isShareUnitOfWork = 
CamelContextHelper.parseBoolean(context, def.getShareUnitOfWork());
+            if (isShareUnitOfWork != null && isShareUnitOfWork && child == 
null) {
+                // only wrap the parent (not the children of the multicast)
+                wrap = true;
+            }
+        } else {
+            // use error handler by default or if configured to do so
+            wrap = true;
+        }
+        return wrap;
+    }
+
 }
diff --git 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
index 04a9acd9370..854a34a7c61 100644
--- 
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
+++ 
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java
@@ -694,54 +694,9 @@ public abstract class ProcessorReifier<T extends 
ProcessorDefinition<?>> extends
         // initialize the channel
         channel.initChannel(this.route, definition, child, interceptors, 
processor, route, first);
 
-        boolean wrap = false;
         // set the error handler, must be done after init as we can set the
         // error handler as first in the chain
-        if (definition instanceof TryDefinition || definition instanceof 
CatchDefinition
-                || definition instanceof FinallyDefinition) {
-            // do not use error handler for try .. catch .. finally blocks as 
it
-            // will handle errors itself
-            LOG.trace("{} is part of doTry .. doCatch .. doFinally so no error 
handler is applied", definition);
-        } else if 
(ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, definition, true)
-                || 
ProcessorDefinitionHelper.isParentOfType(CatchDefinition.class, definition, 
true)
-                || 
ProcessorDefinitionHelper.isParentOfType(FinallyDefinition.class, definition, 
true)) {
-            // do not use error handler for try .. catch .. finally blocks as 
it
-            // will handle errors itself
-            // by checking that any of our parent(s) is not a try .. catch or
-            // finally type
-            LOG.trace("{} is part of doTry .. doCatch .. doFinally so no error 
handler is applied", definition);
-        } else if (definition instanceof OnExceptionDefinition
-                || 
ProcessorDefinitionHelper.isParentOfType(OnExceptionDefinition.class, 
definition, true)) {
-            LOG.trace("{} is part of OnException so no error handler is 
applied", definition);
-            // do not use error handler for onExceptions blocks as it will
-            // handle errors itself
-        } else if (definition instanceof CircuitBreakerDefinition
-                || 
ProcessorDefinitionHelper.isParentOfType(CircuitBreakerDefinition.class, 
definition, true)) {
-            // do not use error handler for circuit breaker
-            // however if inherit error handler is enabled, we need to wrap an 
error handler on the parent
-            if (inheritErrorHandler != null && inheritErrorHandler && child == 
null) {
-                // only wrap the parent (not the children of the circuit 
breaker)
-                wrap = true;
-            } else {
-                LOG.trace("{} is part of CircuitBreaker so no error handler is 
applied", definition);
-            }
-        } else if (definition instanceof MulticastDefinition def) {
-            // do not use error handler for multicast as it offers fine-grained
-            // error handlers for its outputs
-            // however if share unit of work is enabled, we need to wrap an
-            // error handler on the multicast parent
-            boolean isShareUnitOfWork = parseBoolean(def.getShareUnitOfWork(), 
false);
-            if (isShareUnitOfWork && child == null) {
-                // only wrap the parent (not the children of the multicast)
-                wrap = true;
-            } else {
-                LOG.trace("{} is part of multicast which have special error 
handling so no error handler is applied",
-                        definition);
-            }
-        } else {
-            // use error handler by default or if configured to do so
-            wrap = true;
-        }
+        boolean wrap = 
ProcessorDefinitionHelper.shouldWrapInErrorHandler(camelContext, definition, 
child, inheritErrorHandler);
         if (wrap) {
             wrapChannelInErrorHandler(channel, inheritErrorHandler);
         }

Reply via email to