This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch errorhandler-in-dsl in repository https://gitbox.apache.org/repos/asf/camel.git
commit cbe1e9855faf6fee0b1c38dd31c51027ef0ab487 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Apr 5 18:31:05 2022 +0200 CAMEL-16834: error handler in model DSL. WIP --- .../errorhandler/DeadLetterChannelProperties.java | 1 + .../DefaultErrorHandlerProperties.java | 1 + .../errorhandler/ErrorHandlerRefProperties.java | 1 + .../errorhandler/NoErrorHandlerProperties.java | 1 + .../reifier/errorhandler/ErrorHandlerReifier.java | 17 +-- .../NewDefaultErrorHandlerReifier.java | 134 +++++++++++++++++++++ .../errorhandler/NewNoErrorHandlerReifier.java} | 26 ++-- .../errorhandler/NewDeadLetterChannelTest.java | 1 + 8 files changed, 165 insertions(+), 17 deletions(-) diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelProperties.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelProperties.java index 8ce8e0483e9..3996624619e 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelProperties.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DeadLetterChannelProperties.java @@ -19,6 +19,7 @@ package org.apache.camel.model.errorhandler; import org.apache.camel.CamelContext; import org.apache.camel.Predicate; +@Deprecated public interface DeadLetterChannelProperties extends DefaultErrorHandlerProperties { // has no additional configurations diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerProperties.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerProperties.java index 4688c8c1f1c..ba2d352faf2 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerProperties.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/DefaultErrorHandlerProperties.java @@ -24,6 +24,7 @@ import org.apache.camel.Processor; import org.apache.camel.processor.errorhandler.RedeliveryPolicy; import org.apache.camel.spi.CamelLogger; +@Deprecated public interface DefaultErrorHandlerProperties extends ErrorHandlerFactory { boolean hasLogger(); diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java index d9ce7801c96..e1a57a28bf1 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java @@ -18,6 +18,7 @@ package org.apache.camel.model.errorhandler; import org.apache.camel.ErrorHandlerFactory; +@Deprecated public interface ErrorHandlerRefProperties extends ErrorHandlerFactory { String DEFAULT_ERROR_HANDLER_BUILDER = "CamelDefaultErrorHandlerBuilder"; diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/NoErrorHandlerProperties.java b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/NoErrorHandlerProperties.java index babb993df96..94a9ee886f6 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/NoErrorHandlerProperties.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/NoErrorHandlerProperties.java @@ -18,6 +18,7 @@ package org.apache.camel.model.errorhandler; import org.apache.camel.ErrorHandlerFactory; +@Deprecated public interface NoErrorHandlerProperties extends ErrorHandlerFactory { // no configuration diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java index 400d45a6d48..e48f144ab9c 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java @@ -36,8 +36,10 @@ import org.apache.camel.model.OnExceptionDefinition; import org.apache.camel.model.RedeliveryPolicyDefinition; import org.apache.camel.model.errorhandler.DeadLetterChannelDefinition; import org.apache.camel.model.errorhandler.DeadLetterChannelProperties; +import org.apache.camel.model.errorhandler.DefaultErrorHandlerDefinition; import org.apache.camel.model.errorhandler.DefaultErrorHandlerProperties; import org.apache.camel.model.errorhandler.ErrorHandlerRefProperties; +import org.apache.camel.model.errorhandler.NoErrorHandlerDefinition; import org.apache.camel.model.errorhandler.NoErrorHandlerProperties; import org.apache.camel.processor.errorhandler.ErrorHandlerSupport; import org.apache.camel.processor.errorhandler.ExceptionPolicy; @@ -92,6 +94,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends } private static ErrorHandlerReifier<? extends ErrorHandlerFactory> coreReifier(Route route, ErrorHandlerFactory definition) { + // TODO: legacy, should if (definition instanceof DeadLetterChannelProperties) { return new DeadLetterChannelReifier(route, definition); } else if (definition instanceof DefaultErrorHandlerProperties) { @@ -104,14 +107,14 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerFactory> extends if (definition instanceof DeadLetterChannelDefinition) { return new NewDeadLetterChannelReifier(route, (DeadLetterChannelDefinition) definition); - // } else if (definition instanceof DefaultErrorHandlerDefinition) { - // return new DefaultErrorHandlerReifier<>(route, definition); - // TODO: ref properties? - // } else if (definition instanceof ErrorHandlerRefProperties) { - // return new ErrorHandlerRefReifier(route, definition); - // } else if (definition instanceof NoErrorHandlerDefinition) { - // return new NoErrorHandlerReifier(route, definition); + } else if (definition instanceof DefaultErrorHandlerDefinition) { + return new NewDefaultErrorHandlerReifier(route, (DefaultErrorHandlerDefinition) definition); + } else if (definition instanceof NoErrorHandlerDefinition) { + return new NewNoErrorHandlerReifier(route, definition); } + // TODO: ref properties? + // } else if (definition instanceof ErrorHandlerRefProperties) { + // return new ErrorHandlerRefReifier(route, definition); return null; } diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/NewDefaultErrorHandlerReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/NewDefaultErrorHandlerReifier.java new file mode 100644 index 00000000000..cbdb97c1862 --- /dev/null +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/NewDefaultErrorHandlerReifier.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.reifier.errorhandler; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.CamelContext; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Predicate; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.RedeliveryPolicyDefinition; +import org.apache.camel.model.errorhandler.DeadLetterChannelDefinition; +import org.apache.camel.model.errorhandler.DefaultErrorHandlerDefinition; +import org.apache.camel.processor.FatalFallbackErrorHandler; +import org.apache.camel.processor.SendProcessor; +import org.apache.camel.processor.errorhandler.DefaultErrorHandler; +import org.apache.camel.processor.errorhandler.RedeliveryPolicy; +import org.apache.camel.spi.CamelLogger; +import org.apache.camel.spi.ExecutorServiceManager; +import org.apache.camel.spi.Language; +import org.apache.camel.spi.ThreadPoolProfile; + +public class NewDefaultErrorHandlerReifier extends ErrorHandlerReifier<DefaultErrorHandlerDefinition> { + + public NewDefaultErrorHandlerReifier(Route route, DefaultErrorHandlerDefinition definition) { + super(route, definition); + } + + @Override + public Processor createErrorHandler(Processor processor) throws Exception { + // optimize to use shared default instance if using out of the box settings + + RedeliveryPolicy redeliveryPolicy = resolveRedeliveryPolicy(definition, camelContext); + CamelLogger logger = resolveLogger(definition, camelContext); + + DefaultErrorHandler answer = new DefaultErrorHandler( + camelContext, processor, logger, + getProcessor(definition.getOnRedeliveryProcessor(), definition.getOnRedeliveryRef()), + redeliveryPolicy, + getPredicate(definition.getRetryWhilePredicate(), definition.getRetryWhileRef()), + getExecutorService(definition.getExecutorServiceBean(), definition.getExecutorServiceRef()), + getProcessor(definition.getOnPrepareFailureProcessor(), definition.getOnPrepareFailureRef()), + getProcessor(definition.getOnExceptionOccurredProcessor(), definition.getOnExceptionOccurredRef())); + // configure error handler before we can use it + configure(answer); + return answer; + } + + private Predicate resolveRetryWhilePolicy(DeadLetterChannelDefinition definition, CamelContext camelContext) { + Predicate answer = definition.getRetryWhilePredicate(); + + if (answer == null && definition.getRetryWhileRef() != null) { + // it is a bean expression + Language bean = camelContext.resolveLanguage("bean"); + answer = bean.createPredicate(definition.getRetryWhileRef()); + answer.initPredicate(camelContext); + } + + return answer; + } + + private CamelLogger resolveLogger(DefaultErrorHandlerDefinition definition, CamelContext camelContext) { + CamelLogger answer = definition.getLoggerBean(); + if (answer == null && definition.getLoggerRef() != null) { + answer = mandatoryLookup(definition.getLoggerRef(), CamelLogger.class); + } + return answer; + } + + private Processor createDeadLetterChannelProcessor(String uri) { + // wrap in our special safe fallback error handler if sending to + // dead letter channel fails + Processor child = new SendProcessor(camelContext.getEndpoint(uri), ExchangePattern.InOnly); + // force MEP to be InOnly so when sending to DLQ we would not expect + // a reply if the MEP was InOut + return new FatalFallbackErrorHandler(child, true); + } + + private RedeliveryPolicy resolveRedeliveryPolicy(DefaultErrorHandlerDefinition definition, CamelContext camelContext) { + RedeliveryPolicy answer = null; + RedeliveryPolicyDefinition def = definition.getRedeliveryPolicy(); + if (def != null) { + answer = ErrorHandlerReifier.createRedeliveryPolicy(def, camelContext, null); + } + if (def == null && definition.getRedeliveryPolicyRef() != null) { + answer = mandatoryLookup(definition.getRedeliveryPolicyRef(), RedeliveryPolicy.class); + } + if (answer == null) { + answer = RedeliveryPolicy.DEFAULT_POLICY; + } + return answer; + } + + protected synchronized ScheduledExecutorService getExecutorService( + ScheduledExecutorService executorService, String executorServiceRef) { + if (executorService == null || executorService.isShutdown()) { + // camel context will shutdown the executor when it shutdown so no + // need to shut it down when stopping + if (executorServiceRef != null) { + executorService = lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); + if (executorService == null) { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); + executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); + } + if (executorService == null) { + throw new IllegalArgumentException("ExecutorService " + executorServiceRef + " not found in registry."); + } + } else { + // no explicit configured thread pool, so leave it up to the + // error handler to decide if it need a default thread pool from + // CamelContext#getErrorHandlerExecutorService + executorService = null; + } + } + return executorService; + } + +} diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/NewNoErrorHandlerReifier.java similarity index 53% copy from core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java copy to core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/NewNoErrorHandlerReifier.java index d9ce7801c96..d75640ecb87 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/errorhandler/ErrorHandlerRefProperties.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/errorhandler/NewNoErrorHandlerReifier.java @@ -14,19 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.model.errorhandler; +package org.apache.camel.reifier.errorhandler; import org.apache.camel.ErrorHandlerFactory; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.model.errorhandler.NoErrorHandlerDefinition; +import org.apache.camel.processor.errorhandler.NoErrorHandler; +import org.apache.camel.spi.ErrorHandler; -public interface ErrorHandlerRefProperties extends ErrorHandlerFactory { +public class NewNoErrorHandlerReifier extends ErrorHandlerReifier<NoErrorHandlerDefinition> { - String DEFAULT_ERROR_HANDLER_BUILDER = "CamelDefaultErrorHandlerBuilder"; + public NewNoErrorHandlerReifier(Route route, ErrorHandlerFactory definition) { + super(route, (NoErrorHandlerDefinition) definition); + } - String getRef(); - - void setRef(String ref); - - boolean isSupportTransacted(); - - void setSupportTransacted(boolean supportTransacted); + @Override + public Processor createErrorHandler(Processor processor) throws Exception { + ErrorHandler answer = new NoErrorHandler(processor); + configure(answer); + return answer; + } } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/NewDeadLetterChannelTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/NewDeadLetterChannelTest.java index 2379ac835f8..6c65af6aa6f 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/NewDeadLetterChannelTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/NewDeadLetterChannelTest.java @@ -21,6 +21,7 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.errorhandler.DeadLetterChannelDefinition; import org.junit.jupiter.api.Test; +@Deprecated public class NewDeadLetterChannelTest extends ContextTestSupport { @Test
