This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kamelet2 in repository https://gitbox.apache.org/repos/asf/camel.git
commit 2d35aa84a299e4bf7762b89e8d9adeb5ac32395c Author: Claus Ibsen <[email protected]> AuthorDate: Sun Jan 26 18:29:55 2025 +0100 CAMEL-21599: camel-kamelet - Rework error handler for kamelets to be more standard Camel. WIP --- .../apache/camel/component/kamelet/Kamelet.java | 25 ++++------ .../camel/component/kamelet/KameletProcessor.java | 22 ++------- .../camel/component/kamelet/KameletReifier.java | 15 ++++-- .../camel/model/ProcessorDefinitionHelper.java | 55 ++++++++++++++++++++++ .../org/apache/camel/reifier/ProcessorReifier.java | 47 +----------------- 5 files changed, 82 insertions(+), 82 deletions(-) diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java index a67401ce646..0dec0f1e009 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/Kamelet.java @@ -30,7 +30,6 @@ import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.RouteTemplateDefinition; import org.apache.camel.model.ToDefinition; -import org.apache.camel.model.TryDefinition; import org.apache.camel.spi.PropertiesComponent; import org.apache.camel.spi.UuidGenerator; import org.apache.camel.support.CamelContextHelper; @@ -204,25 +203,21 @@ public final class Kamelet { if (noErrorHandler) { def.setErrorHandlerFactory(new NoErrorHandlerBuilder()); } else if (prid != null && ppid != null) { + ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext(); // the kamelet are used from a processor, and we need to check if this processor // has any error handler or not (if not then we should also not use error handler in the kamelet) - ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext(); - ProcessorDefinition<?> proc = mcc.getProcessorDefinition(ppid); - // TODO: Make API in ProcessorDefinitionHelper we can reuse that - // checks for this like in ProcessorReifer.wrapChannel - boolean tryBlock = proc != null && ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, proc, true); - if (tryBlock) { + ProcessorDefinition<?> pro = mcc.getProcessorDefinition(ppid); + boolean wrap = pro == null || ProcessorDefinitionHelper.shouldWrapInErrorHandler(def.getCamelContext(), pro, null, + pro.getInheritErrorHandler()); + if (wrap) { + RouteDefinition parent = mcc.getRouteDefinition(prid); + if (parent != null) { + def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder()); + } + } else { def.setErrorHandlerFactory(new NoErrorHandlerBuilder()); } } - if (!def.isErrorHandlerFactorySet() && prid != null) { - // inherit the error handler from the parent route - ModelCamelContext mcc = (ModelCamelContext) in.getCamelContext(); - RouteDefinition parent = mcc.getRouteDefinition(prid); - if (parent != null) { - def.setErrorHandlerFactory(parent.getErrorHandlerFactory().cloneBuilder()); - } - } if (def.getInput() == null) { throw new IllegalArgumentException("Camel route " + rid + " input does not exist."); diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java index 9bfab346d8f..d58fc2b1ff5 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletProcessor.java @@ -49,10 +49,12 @@ public class KameletProcessor extends AsyncProcessorSupport private String id; private String routeId; - public KameletProcessor(CamelContext camelContext, String name, Processor processor) { + public KameletProcessor(CamelContext camelContext, String name, Processor processor) throws Exception { this.camelContext = camelContext; this.name = name; this.processor = AsyncProcessorConverterHelper.convert(processor); + this.component = camelContext.getComponent("kamelet", KameletComponent.class); + this.producer = (KameletProducer) camelContext.getEndpoint("kamelet://" + name).createAsyncProducer(); } @ManagedAttribute(description = "Kamelet name (templateId/routeId?options)") @@ -116,17 +118,8 @@ public class KameletProcessor extends AsyncProcessorSupport } @Override - protected void doBuild() throws Exception { - if (component == null) { - component = camelContext.getComponent("kamelet", KameletComponent.class); - } - if (producer == null) { - producer = (KameletProducer) camelContext.getEndpoint("kamelet://" + name).createAsyncProducer(); - } - if (producer != null) { - ((RouteIdAware) producer).setRouteId(getRouteId()); - } - ServiceHelper.buildService(processor, producer); + protected void doInit() throws Exception { + ServiceHelper.initService(processor, producer); // we use the kamelet component (producer) to call the kamelet // and to receive the reply we register ourselves to the kamelet component @@ -134,11 +127,6 @@ public class KameletProcessor extends AsyncProcessorSupport component.addKameletEip(producer.getKey(), processor); } - @Override - protected void doInit() throws Exception { - ServiceHelper.initService(processor, producer); - } - @Override protected void doStart() throws Exception { ServiceHelper.startService(processor, producer); diff --git a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java index 0252e6f92d8..8b0106fac79 100644 --- a/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java +++ b/components/camel-kamelet/src/main/java/org/apache/camel/component/kamelet/KameletReifier.java @@ -20,6 +20,7 @@ import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.model.KameletDefinition; import org.apache.camel.reifier.ProcessorReifier; +import org.apache.camel.spi.NodeIdFactory; import org.apache.camel.support.PluginHelper; public class KameletReifier extends ProcessorReifier<KameletDefinition> { @@ -36,9 +37,15 @@ public class KameletReifier extends ProcessorReifier<KameletDefinition> { processor = new NoopProcessor(); } // wrap in uow - Processor target = new KameletProcessor(camelContext, parseString(definition.getName()), processor); - target = PluginHelper.getInternalProcessorFactory(camelContext) - .addUnitOfWorkProcessorAdvice(camelContext, target, null); - return target; + String outputId = definition.idOrCreate(camelContext.getCamelContextExtension().getContextPlugin(NodeIdFactory.class)); + camelContext.getCamelContextExtension().createProcessor(outputId); + try { + Processor target = new KameletProcessor(camelContext, parseString(definition.getName()), processor); + target = PluginHelper.getInternalProcessorFactory(camelContext) + .addUnitOfWorkProcessorAdvice(camelContext, target, null); + return target; + } finally { + camelContext.getCamelContextExtension().createProcessor(null); + } } } diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java index 8d7a17f981c..5172592716d 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinitionHelper.java @@ -22,8 +22,10 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import org.apache.camel.CamelContext; import org.apache.camel.NamedNode; import org.apache.camel.spi.Resource; +import org.apache.camel.support.CamelContextHelper; import org.apache.camel.support.ResourceHelper; import org.apache.camel.util.FileUtil; @@ -473,4 +475,57 @@ public final class ProcessorDefinitionHelper { return answer; } + /** + * Whether the model should be wrapped in an error handler or not. + * + * Some EIPs like try/catch, circuit breaker, multicast, and kamelets have impact on whether the model should be + * wrapped or not. + */ + public static boolean shouldWrapInErrorHandler( + CamelContext context, ProcessorDefinition<?> definition, + ProcessorDefinition<?> child, Boolean inheritErrorHandler) { + boolean wrap = false; + + // set the error handler, must be done after init as we can set the + // error handler as first in the chain + if (definition instanceof TryDefinition || definition instanceof CatchDefinition + || definition instanceof FinallyDefinition) { + // do not use error handler for try .. catch .. finally blocks as it + // will handle errors itself + } else if (ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, definition, true) + || ProcessorDefinitionHelper.isParentOfType(CatchDefinition.class, definition, true) + || ProcessorDefinitionHelper.isParentOfType(FinallyDefinition.class, definition, true)) { + // do not use error handler for try .. catch .. finally blocks as it + // will handle errors itself + // by checking that any of our parent(s) is not a try .. catch or + // finally type + } else if (definition instanceof OnExceptionDefinition + || ProcessorDefinitionHelper.isParentOfType(OnExceptionDefinition.class, definition, true)) { + // do not use error handler for onExceptions blocks as it will + // handle errors itself + } else if (definition instanceof CircuitBreakerDefinition + || ProcessorDefinitionHelper.isParentOfType(CircuitBreakerDefinition.class, definition, true)) { + // do not use error handler for circuit breaker + // however if inherit error handler is enabled, we need to wrap an error handler on the parent + if (inheritErrorHandler != null && inheritErrorHandler && child == null) { + // only wrap the parent (not the children of the circuit breaker) + wrap = true; + } + } else if (definition instanceof MulticastDefinition def) { + // do not use error handler for multicast as it offers fine-grained + // error handlers for its outputs + // however if share unit of work is enabled, we need to wrap an + // error handler on the multicast parent + Boolean isShareUnitOfWork = CamelContextHelper.parseBoolean(context, def.getShareUnitOfWork()); + if (isShareUnitOfWork != null && isShareUnitOfWork && child == null) { + // only wrap the parent (not the children of the multicast) + wrap = true; + } + } else { + // use error handler by default or if configured to do so + wrap = true; + } + return wrap; + } + } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index 04a9acd9370..854a34a7c61 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -694,54 +694,9 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends // initialize the channel channel.initChannel(this.route, definition, child, interceptors, processor, route, first); - boolean wrap = false; // set the error handler, must be done after init as we can set the // error handler as first in the chain - if (definition instanceof TryDefinition || definition instanceof CatchDefinition - || definition instanceof FinallyDefinition) { - // do not use error handler for try .. catch .. finally blocks as it - // will handle errors itself - LOG.trace("{} is part of doTry .. doCatch .. doFinally so no error handler is applied", definition); - } else if (ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, definition, true) - || ProcessorDefinitionHelper.isParentOfType(CatchDefinition.class, definition, true) - || ProcessorDefinitionHelper.isParentOfType(FinallyDefinition.class, definition, true)) { - // do not use error handler for try .. catch .. finally blocks as it - // will handle errors itself - // by checking that any of our parent(s) is not a try .. catch or - // finally type - LOG.trace("{} is part of doTry .. doCatch .. doFinally so no error handler is applied", definition); - } else if (definition instanceof OnExceptionDefinition - || ProcessorDefinitionHelper.isParentOfType(OnExceptionDefinition.class, definition, true)) { - LOG.trace("{} is part of OnException so no error handler is applied", definition); - // do not use error handler for onExceptions blocks as it will - // handle errors itself - } else if (definition instanceof CircuitBreakerDefinition - || ProcessorDefinitionHelper.isParentOfType(CircuitBreakerDefinition.class, definition, true)) { - // do not use error handler for circuit breaker - // however if inherit error handler is enabled, we need to wrap an error handler on the parent - if (inheritErrorHandler != null && inheritErrorHandler && child == null) { - // only wrap the parent (not the children of the circuit breaker) - wrap = true; - } else { - LOG.trace("{} is part of CircuitBreaker so no error handler is applied", definition); - } - } else if (definition instanceof MulticastDefinition def) { - // do not use error handler for multicast as it offers fine-grained - // error handlers for its outputs - // however if share unit of work is enabled, we need to wrap an - // error handler on the multicast parent - boolean isShareUnitOfWork = parseBoolean(def.getShareUnitOfWork(), false); - if (isShareUnitOfWork && child == null) { - // only wrap the parent (not the children of the multicast) - wrap = true; - } else { - LOG.trace("{} is part of multicast which have special error handling so no error handler is applied", - definition); - } - } else { - // use error handler by default or if configured to do so - wrap = true; - } + boolean wrap = ProcessorDefinitionHelper.shouldWrapInErrorHandler(camelContext, definition, child, inheritErrorHandler); if (wrap) { wrapChannelInErrorHandler(channel, inheritErrorHandler); }
