This is an automated email from the ASF dual-hosted git repository. gnodet pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4275688e8e36e7c17e7bd5bd6dac56b32744184a Author: Guillaume Nodet <[email protected]> AuthorDate: Thu Feb 20 13:37:20 2020 +0100 Move ErrorHandler all reification to the reifiers # Conflicts: # core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java # core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java # core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java # core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java --- .../JtaTransactionErrorHandlerBuilder.java | 99 +++------------- ...java => JtaTransactionErrorHandlerReifier.java} | 129 +++++++++------------ .../cdi/transaction/JtaTransactionPolicy.java | 10 +- .../processor/HystrixHierarchicalConfigTest.java | 2 +- .../SpringHystrixRouteHierarchicalConfigTest.java | 2 +- .../spring/spi/TransactionErrorHandlerBuilder.java | 61 ---------- .../apache/camel/spring/EndpointReferenceTest.java | 2 - .../spring/config/DummyErrorHandlerBuilder.java | 42 ++++--- .../java/org/apache/camel/ErrorHandlerFactory.java | 23 ---- .../java/org/apache/camel/spi/RouteContext.java | 2 + ...RouteContext.java => AbstractRouteContext.java} | 16 ++- .../apache/camel/processor/MulticastProcessor.java | 21 +--- .../camel/processor/RecipientListProcessor.java | 20 +--- .../camel/builder/AdviceWithRouteBuilder.java | 25 ++-- .../camel/builder/DeadLetterChannelBuilder.java | 33 ------ .../camel/builder/DefaultErrorHandlerBuilder.java | 38 ------ .../camel/builder/ErrorHandlerBuilderRef.java | 29 ----- .../camel/builder/ErrorHandlerBuilderSupport.java | 103 +--------------- .../camel/builder/NoErrorHandlerBuilder.java | 30 ----- .../java/org/apache/camel/impl/DefaultModel.java | 1 - .../DefaultRouteContext.java} | 20 ++-- .../apache/camel/reifier/DynamicRouterReifier.java | 6 +- .../org/apache/camel/reifier/ProcessorReifier.java | 14 ++- .../apache/camel/reifier/RecipientListReifier.java | 2 +- .../org/apache/camel/reifier/RouteReifier.java | 1 - .../apache/camel/reifier/RoutingSlipReifier.java | 9 +- .../org/apache/camel/reifier/WireTapReifier.java | 2 +- .../errorhandler/ErrorHandlerRefReifier.java | 12 +- .../reifier/errorhandler/ErrorHandlerReifier.java | 76 ++++++++---- .../errorhandler/ErrorHandlerSupportTest.java | 19 +-- .../DefaultExceptionPolicyStrategyTest.java | 10 +- .../camel/reifier/DataFormatReifierTest.java | 8 +- .../apache/camel/reifier/ProcessorReifierTest.java | 2 +- .../apache/camel/support/CamelContextHelper.java | 1 - 34 files changed, 262 insertions(+), 608 deletions(-) diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java index bb15f57..f56598a 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java @@ -16,22 +16,11 @@ */ package org.apache.camel.cdi.transaction; -import java.util.Map; - -import org.apache.camel.CamelContext; import org.apache.camel.LoggingLevel; -import org.apache.camel.Processor; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.builder.DefaultErrorHandlerBuilder; import org.apache.camel.builder.ErrorHandlerBuilder; -import org.apache.camel.model.TransactedDefinition; -import org.apache.camel.reifier.TransactedReifier; +import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.CamelLogger; -import org.apache.camel.spi.Policy; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.TransactedPolicy; -import org.apache.camel.util.ObjectHelper; -import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** @@ -40,12 +29,9 @@ import org.slf4j.LoggerFactory; */ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { - public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = - JtaTransactionErrorHandlerBuilder.class.getName() + "#rollbackLoggingLevel"; - - private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class); - - private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED"; + static { + ErrorHandlerReifier.registerReifier(JtaTransactionErrorHandlerBuilder.class, JtaTransactionErrorHandlerReifier::new); + } private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN; @@ -75,73 +61,8 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde } } - @Override - public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception { - CamelContext camelContext = routeContext.getCamelContext(); - // resolve policy reference, if given - if (transactionPolicy == null) { - if (policyRef != null) { - final TransactedDefinition transactedDefinition = new TransactedDefinition(); - transactedDefinition.setRef(policyRef); - final Policy policy = new TransactedReifier(camelContext, transactedDefinition).resolvePolicy(); - if (policy != null) { - if (!(policy instanceof JtaTransactionPolicy)) { - throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '" - + policyRef.getClass().getName() + "' but an instance of '" - + JtaTransactionPolicy.class.getName() + "' is required!"); - } - transactionPolicy = (JtaTransactionPolicy) policy; - } - } - } - - // try to lookup default policy - if (transactionPolicy == null) { - LOG.debug( - "No transaction policy configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); - - Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class); - if (mapPolicy != null && mapPolicy.size() == 1) { - TransactedPolicy policy = mapPolicy.values().iterator().next(); - if (policy instanceof JtaTransactionPolicy) { - transactionPolicy = (JtaTransactionPolicy) policy; - } - } - - if (transactionPolicy == null) { - TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class); - if (policy instanceof JtaTransactionPolicy) { - transactionPolicy = (JtaTransactionPolicy) policy; - } - } - - if (transactionPolicy != null) { - LOG.debug("Found TransactionPolicy in registry to use: {}", transactionPolicy); - } - } - - ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this); - - final Map<String, String> properties = camelContext.getGlobalOptions(); - if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) { - rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY)); - } - - JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext, - processor, - getLogger(), - getOnRedelivery(), - getRedeliveryPolicy(), - getExceptionPolicyStrategy(), - transactionPolicy, - getRetryWhilePolicy(camelContext), - getExecutorService(camelContext), - rollbackLoggingLevel, - getOnExceptionOccurred()); - - // configure error handler before we can use it - configure(routeContext, answer); - return answer; + public String getPolicyRef() { + return policyRef; } public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) { @@ -149,11 +70,19 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde return this; } + public JtaTransactionPolicy getTransactionPolicy() { + return transactionPolicy; + } + public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) { this.transactionPolicy = transactionPolicy; return this; } + public LoggingLevel getRollbackLoggingLevel() { + return rollbackLoggingLevel; + } + public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) { this.rollbackLoggingLevel = rollbackLoggingLevel; return this; diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerReifier.java similarity index 51% copy from components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java copy to components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerReifier.java index bb15f57..252e3ee 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerBuilder.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionErrorHandlerReifier.java @@ -17,77 +17,51 @@ package org.apache.camel.cdi.transaction; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; -import org.apache.camel.CamelContext; +import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.LoggingLevel; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.builder.DefaultErrorHandlerBuilder; -import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.model.TransactedDefinition; import org.apache.camel.reifier.TransactedReifier; -import org.apache.camel.spi.CamelLogger; +import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; +import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.Policy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.spi.TransactedPolicy; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Builds transactional error handlers. This class is based on - * {@link org.apache.camel.spring.spi.TransactionErrorHandlerBuilder}. - */ -public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { +public class JtaTransactionErrorHandlerReifier extends ErrorHandlerReifier<JtaTransactionErrorHandlerBuilder> { public static final String ROLLBACK_LOGGING_LEVEL_PROPERTY = - JtaTransactionErrorHandlerBuilder.class.getName() + "#rollbackLoggingLevel"; - - private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerBuilder.class); + JtaTransactionErrorHandlerBuilder.class.getName() + "#rollbackLoggingLevel"; private static final String PROPAGATION_REQUIRED = "PROPAGATION_REQUIRED"; - private LoggingLevel rollbackLoggingLevel = LoggingLevel.WARN; - - private JtaTransactionPolicy transactionPolicy; - - private String policyRef; + private static final Logger LOG = LoggerFactory.getLogger(JtaTransactionErrorHandlerReifier.class); - @Override - public boolean supportTransacted() { - return true; + public JtaTransactionErrorHandlerReifier(RouteContext routeContext, ErrorHandlerFactory definition) { + super(routeContext, (JtaTransactionErrorHandlerBuilder) definition); } @Override - public ErrorHandlerBuilder cloneBuilder() { - final JtaTransactionErrorHandlerBuilder answer = new JtaTransactionErrorHandlerBuilder(); - cloneBuilder(answer); - return answer; - } + public Processor createErrorHandler(final Processor processor) throws Exception { + JtaTransactionPolicy transactionPolicy = definition.getTransactionPolicy(); - @Override - protected void cloneBuilder(DefaultErrorHandlerBuilder other) { - super.cloneBuilder(other); - if (other instanceof JtaTransactionErrorHandlerBuilder) { - final JtaTransactionErrorHandlerBuilder otherTx = (JtaTransactionErrorHandlerBuilder) other; - transactionPolicy = otherTx.transactionPolicy; - rollbackLoggingLevel = otherTx.rollbackLoggingLevel; - } - } - - @Override - public Processor createErrorHandler(final RouteContext routeContext, final Processor processor) throws Exception { - CamelContext camelContext = routeContext.getCamelContext(); // resolve policy reference, if given if (transactionPolicy == null) { - if (policyRef != null) { + if (definition.getPolicyRef() != null) { final TransactedDefinition transactedDefinition = new TransactedDefinition(); - transactedDefinition.setRef(policyRef); + transactedDefinition.setRef(definition.getPolicyRef()); final Policy policy = new TransactedReifier(camelContext, transactedDefinition).resolvePolicy(); if (policy != null) { if (!(policy instanceof JtaTransactionPolicy)) { - throw new RuntimeCamelException("The configured policy '" + policyRef + "' is of type '" - + policyRef.getClass().getName() + "' but an instance of '" + throw new RuntimeCamelException("The configured policy '" + definition.getPolicyRef() + + "' is of type '" + policy.getClass().getName() + "' but an instance of '" + JtaTransactionPolicy.class.getName() + "' is required!"); } transactionPolicy = (JtaTransactionPolicy) policy; @@ -97,10 +71,10 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde // try to lookup default policy if (transactionPolicy == null) { - LOG.debug( - "No transaction policy configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); + LOG.debug("No transaction policy configured on TransactionErrorHandlerBuilder. " + + "Will try find it in the registry."); - Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class); + Map<String, TransactedPolicy> mapPolicy = findByTypeWithName(TransactedPolicy.class); if (mapPolicy != null && mapPolicy.size() == 1) { TransactedPolicy policy = mapPolicy.values().iterator().next(); if (policy instanceof JtaTransactionPolicy) { @@ -109,7 +83,7 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde } if (transactionPolicy == null) { - TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class); + TransactedPolicy policy = lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class); if (policy instanceof JtaTransactionPolicy) { transactionPolicy = (JtaTransactionPolicy) policy; } @@ -123,49 +97,52 @@ public class JtaTransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilde ObjectHelper.notNull(transactionPolicy, "transactionPolicy", this); final Map<String, String> properties = camelContext.getGlobalOptions(); + LoggingLevel rollbackLoggingLevel = definition.getRollbackLoggingLevel(); if ((properties != null) && properties.containsKey(ROLLBACK_LOGGING_LEVEL_PROPERTY)) { rollbackLoggingLevel = LoggingLevel.valueOf(properties.get(ROLLBACK_LOGGING_LEVEL_PROPERTY)); } JtaTransactionErrorHandler answer = new JtaTransactionErrorHandler(camelContext, processor, - getLogger(), - getOnRedelivery(), - getRedeliveryPolicy(), - getExceptionPolicyStrategy(), + definition.getLogger(), + definition.getOnRedelivery(), + definition.getRedeliveryPolicy(), + definition.getExceptionPolicyStrategy(), transactionPolicy, - getRetryWhilePolicy(camelContext), - getExecutorService(camelContext), + definition.getRetryWhilePolicy(camelContext), + getExecutorService(), rollbackLoggingLevel, - getOnExceptionOccurred()); + definition.getOnExceptionOccurred()); // configure error handler before we can use it - configure(routeContext, answer); + configure(answer); return answer; } - public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final String ref) { - policyRef = ref; - return this; - } - - public JtaTransactionErrorHandlerBuilder setTransactionPolicy(final JtaTransactionPolicy transactionPolicy) { - this.transactionPolicy = transactionPolicy; - return this; - } - - public JtaTransactionErrorHandlerBuilder setRollbackLoggingLevel(final LoggingLevel rollbackLoggingLevel) { - this.rollbackLoggingLevel = rollbackLoggingLevel; - return this; - } - - @Override - protected CamelLogger createLogger() { - return new CamelLogger(LoggerFactory.getLogger(TransactionErrorHandler.class), LoggingLevel.ERROR); + protected synchronized ScheduledExecutorService getExecutorService() { + ScheduledExecutorService executorService = definition.getExecutorService(); + if (executorService == null || executorService.isShutdown()) { + // camel context will shutdown the executor when it shutdown so no + // need to shut it down when stopping + if (definition.getExecutorServiceRef() != null) { + executorService = lookupByNameAndType(definition.getExecutorServiceRef(), ScheduledExecutorService.class); + if (executorService == null) { + ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); + ThreadPoolProfile profile = manager.getThreadPoolProfile(definition.getExecutorServiceRef()); + executorService = manager.newScheduledThreadPool(this, definition.getExecutorServiceRef(), profile); + } + if (executorService == null) { + throw new IllegalArgumentException("ExecutorServiceRef " + definition.getExecutorServiceRef() + " not found in registry."); + } + } else { + // no explicit configured thread pool, so leave it up to the + // error handler to decide if it need + // a default thread pool from + // CamelContext#getErrorHandlerExecutorService + executorService = null; + } + } + return executorService; } - @Override - public String toString() { - return "JtaTransactionErrorHandlerBuilder"; - } } diff --git a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java index e19d650..124a483 100644 --- a/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java +++ b/components/camel-cdi/src/main/java/org/apache/camel/cdi/transaction/JtaTransactionPolicy.java @@ -65,13 +65,11 @@ public abstract class JtaTransactionPolicy implements TransactedPolicy { public Processor wrap(RouteContext routeContext, Processor processor) { JtaTransactionErrorHandler answer; // the goal is to configure the error handler builder on the route as a - // transacted error handler, - // either its already a transacted or if not we replace it with a - // transacted one that we configure here + // transacted error handler. If the configured builder is not transacted, + // we replace it with a transacted one that we configure here // and wrap the processor in the transacted error handler as we can have - // transacted routes that change - // propagation behavior, eg: from A required -> B -> requiresNew C - // (advanced use-case) + // transacted routes that change propagation behavior, + // eg: from A required -> B -> requiresNew C (advanced use-case) // if we should not support this we do not need to wrap the processor as // we only need one transacted error handler diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java index 11f8415..765360a 100644 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/HystrixHierarchicalConfigTest.java @@ -18,7 +18,7 @@ package org.apache.camel.component.hystrix.processor; import org.apache.camel.CamelContext; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.impl.engine.DefaultRouteContext; +import org.apache.camel.impl.DefaultRouteContext; import org.apache.camel.model.CircuitBreakerDefinition; import org.apache.camel.model.HystrixConfigurationDefinition; import org.apache.camel.model.Model; diff --git a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java index b191a31..105b3e2 100644 --- a/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java +++ b/components/camel-hystrix/src/test/java/org/apache/camel/component/hystrix/processor/SpringHystrixRouteHierarchicalConfigTest.java @@ -17,7 +17,7 @@ package org.apache.camel.component.hystrix.processor; import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.impl.engine.DefaultRouteContext; +import org.apache.camel.impl.DefaultRouteContext; import org.apache.camel.model.CircuitBreakerDefinition; import org.apache.camel.model.HystrixConfigurationDefinition; import org.apache.camel.model.RouteDefinition; diff --git a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java index 1c7fc85..3aa044a 100644 --- a/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java +++ b/components/camel-spring/src/main/java/org/apache/camel/spring/spi/TransactionErrorHandlerBuilder.java @@ -59,67 +59,6 @@ public class TransactionErrorHandlerBuilder extends DefaultErrorHandlerBuilder { return true; } - @Override - public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { - CamelContext camelContext = routeContext.getCamelContext(); - if (transactionTemplate == null) { - // lookup in context if no transaction template has been configured - LOG.debug("No TransactionTemplate configured on TransactionErrorHandlerBuilder. Will try find it in the registry."); - - Map<String, TransactedPolicy> mapPolicy = camelContext.getRegistry().findByTypeWithName(TransactedPolicy.class); - if (mapPolicy != null && mapPolicy.size() == 1) { - TransactedPolicy policy = mapPolicy.values().iterator().next(); - if (policy instanceof SpringTransactionPolicy) { - transactionTemplate = ((SpringTransactionPolicy) policy).getTransactionTemplate(); - } - } - - if (transactionTemplate == null) { - TransactedPolicy policy = camelContext.getRegistry().lookupByNameAndType(PROPAGATION_REQUIRED, TransactedPolicy.class); - if (policy instanceof SpringTransactionPolicy) { - transactionTemplate = ((SpringTransactionPolicy) policy).getTransactionTemplate(); - } - } - - if (transactionTemplate == null) { - Map<String, TransactionTemplate> mapTemplate = camelContext.getRegistry().findByTypeWithName(TransactionTemplate.class); - if (mapTemplate == null || mapTemplate.isEmpty()) { - LOG.trace("No TransactionTemplate found in registry."); - } else if (mapTemplate.size() == 1) { - transactionTemplate = mapTemplate.values().iterator().next(); - } else { - LOG.debug("Found {} TransactionTemplate in registry. Cannot determine which one to use. " - + "Please configure a TransactionTemplate on the TransactionErrorHandlerBuilder", mapTemplate.size()); - } - } - - if (transactionTemplate == null) { - Map<String, PlatformTransactionManager> mapManager = camelContext.getRegistry().findByTypeWithName(PlatformTransactionManager.class); - if (mapManager == null || mapManager.isEmpty()) { - LOG.trace("No PlatformTransactionManager found in registry."); - } else if (mapManager.size() == 1) { - transactionTemplate = new TransactionTemplate(mapManager.values().iterator().next()); - } else { - LOG.debug("Found {} PlatformTransactionManager in registry. Cannot determine which one to use for TransactionTemplate. " - + "Please configure a TransactionTemplate on the TransactionErrorHandlerBuilder", mapManager.size()); - } - } - - if (transactionTemplate != null) { - LOG.debug("Found TransactionTemplate in registry to use: {}", transactionTemplate); - } - } - - ObjectHelper.notNull(transactionTemplate, "transactionTemplate", this); - - TransactionErrorHandler answer = new TransactionErrorHandler(camelContext, processor, - getLogger(), getOnRedelivery(), getRedeliveryPolicy(), getExceptionPolicyStrategy(), transactionTemplate, - getRetryWhilePolicy(camelContext), getExecutorService(camelContext), getRollbackLoggingLevel(), getOnExceptionOccurred()); - // configure error handler before we can use it - configure(routeContext, answer); - return answer; - } - public void setTransactionTemplate(TransactionTemplate transactionTemplate) { this.transactionTemplate = transactionTemplate; } diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java index 23c80cc..42db380 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/EndpointReferenceTest.java @@ -21,9 +21,7 @@ import org.apache.camel.Endpoint; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.NoSuchEndpointException; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.impl.engine.DefaultRouteContext; import org.apache.camel.model.RouteDefinition; -import org.apache.camel.spi.RouteContext; import org.apache.camel.spring.example.DummyBean; import org.apache.camel.support.CamelContextHelper; import org.junit.Test; diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java b/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java index 3ad380e..c7d4764 100644 --- a/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/config/DummyErrorHandlerBuilder.java @@ -16,23 +16,27 @@ */ package org.apache.camel.spring.config; +import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.builder.ErrorHandlerBuilder; import org.apache.camel.builder.ErrorHandlerBuilderSupport; +import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.processor.DelegateProcessor; import org.springframework.beans.factory.BeanNameAware; public class DummyErrorHandlerBuilder extends ErrorHandlerBuilderSupport implements BeanNameAware { + public static final String PROPERTY_NAME = "DummyErrorHandler"; - private String beanName; - public DummyErrorHandlerBuilder() { + static { + ErrorHandlerReifier.registerReifier(DummyErrorHandlerBuilder.class, DummyErrorHandlerReifier::new); } - public DummyErrorHandlerBuilder(String beanName) { - this.beanName = beanName; + private String beanName; + + public DummyErrorHandlerBuilder() { } @Override @@ -40,9 +44,8 @@ public class DummyErrorHandlerBuilder extends ErrorHandlerBuilderSupport impleme this.beanName = beanName; } - @Override - public boolean supportTransacted() { - return false; + public String getBeanName() { + return beanName; } @Override @@ -53,15 +56,22 @@ public class DummyErrorHandlerBuilder extends ErrorHandlerBuilderSupport impleme return answer; } - @Override - public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { - return new DelegateProcessor(processor) { - @Override - public void process(Exchange exchange) throws Exception { - exchange.setProperty(PROPERTY_NAME, beanName); - super.process(exchange); - } - }; + public static class DummyErrorHandlerReifier extends ErrorHandlerReifier<DummyErrorHandlerBuilder> { + + public DummyErrorHandlerReifier(RouteContext routeContext, ErrorHandlerFactory definition) { + super(routeContext, (DummyErrorHandlerBuilder) definition); + } + + @Override + public Processor createErrorHandler(Processor processor) throws Exception { + return new DelegateProcessor(processor) { + @Override + public void process(Exchange exchange) throws Exception { + exchange.setProperty(PROPERTY_NAME, definition.getBeanName()); + super.process(exchange); + } + }; + } } } diff --git a/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java b/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java index 62f5918..9e35c8a 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java +++ b/core/camel-api/src/main/java/org/apache/camel/ErrorHandlerFactory.java @@ -16,32 +16,9 @@ */ package org.apache.camel; -import org.apache.camel.spi.RouteContext; - /** * Factory for creating {@link org.apache.camel.processor.ErrorHandler}s. */ public interface ErrorHandlerFactory { - /** - * Creates the error handler - * - * @param routeContext the route context - * @param processor the outer processor - * @return the error handler - * @throws Exception is thrown if the error handler could not be created - */ - Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception; - - /** - * Gets or lookup the target error handler factory. - * - * Will either get this current as the error handler factory, or in case this is a reference lookup - * for a actual error handler, then this method will perform a lookup to return the actual error handler factory. - * - * @param routeContext the route context - * @return the error handler factory. - */ - ErrorHandlerFactory getOrLookupErrorHandlerFactory(RouteContext routeContext); - } diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java index 8513863..39e9b2c 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/RouteContext.java @@ -164,6 +164,8 @@ public interface RouteContext extends RuntimeConfiguration, EndpointAware { ErrorHandlerFactory getErrorHandlerFactory(); + Processor createErrorHandler(Processor processor) throws Exception; + void addAdvice(CamelInternalProcessorAdvice<?> advice); void addProperty(String key, Object value); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractRouteContext.java similarity index 97% rename from core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java rename to core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractRouteContext.java index 0ee844b..5bf48b0 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/AbstractRouteContext.java @@ -48,7 +48,8 @@ import org.apache.camel.util.ObjectHelper; /** * The context used to activate new routing rules */ -public class DefaultRouteContext implements RouteContext { +public abstract class AbstractRouteContext implements RouteContext { + private NamedNode route; private String routeId; private Route runtimeRoute; @@ -81,7 +82,7 @@ public class DefaultRouteContext implements RouteContext { // must be concurrent as error handlers can be mutated concurrently via multicast/recipientlist EIPs private ConcurrentMap<ErrorHandlerFactory, Set<NamedNode>> errorHandlers = new ConcurrentHashMap<>(); - public DefaultRouteContext(CamelContext camelContext, NamedNode route, String routeId) { + public AbstractRouteContext(CamelContext camelContext, NamedNode route, String routeId) { this.camelContext = camelContext; this.route = route; this.routeId = routeId; @@ -508,12 +509,12 @@ public class DefaultRouteContext implements RouteContext { @Override public void addErrorHandler(ErrorHandlerFactory factory, NamedNode onException) { - getErrorHandlers(factory).add(onException); + doGetErrorHandlers(factory).add(onException); } @Override public Set<NamedNode> getErrorHandlers(ErrorHandlerFactory factory) { - return errorHandlers.computeIfAbsent(factory, f -> new LinkedHashSet<>()); + return doGetErrorHandlers(factory); } public void removeErrorHandlers(ErrorHandlerFactory factory) { @@ -522,10 +523,15 @@ public class DefaultRouteContext implements RouteContext { @Override public void addErrorHandlerFactoryReference(ErrorHandlerFactory source, ErrorHandlerFactory target) { - Set<NamedNode> list = getErrorHandlers(source); + Set<NamedNode> list = doGetErrorHandlers(source); Set<NamedNode> previous = errorHandlers.put(target, list); if (list != previous && ObjectHelper.isNotEmpty(previous) && ObjectHelper.isNotEmpty(list)) { throw new IllegalStateException("Multiple references with different handlers"); } } + + public Set<NamedNode> doGetErrorHandlers(ErrorHandlerFactory factory) { + return errorHandlers.computeIfAbsent(factory, f -> new LinkedHashSet<>()); + } + } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 7cadfdc..65f1587 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -41,7 +41,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; -import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.ExtendedExchange; @@ -138,9 +137,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat * <p/> * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. */ - static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { + static final class ErrorHandlerKey extends KeyValueHolder<RouteContext, Processor> { - PreparedErrorHandler(RouteContext key, Processor value) { + ErrorHandlerKey(RouteContext key, Processor value) { super(key, value); } @@ -163,7 +162,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat private ExecutorService aggregateExecutorService; private boolean shutdownAggregateExecutorService; private final long timeout; - private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<>(); + private final ConcurrentMap<ErrorHandlerKey, Processor> errorHandlers = new ConcurrentHashMap<>(); private final boolean shareUnitOfWork; public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) { @@ -722,7 +721,7 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat // for the entire multicast block again which will start from scratch again // create key for cache - final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); + final ErrorHandlerKey key = new ErrorHandlerKey(routeContext, processor); // lookup cached first to reuse and preserve memory answer = errorHandlers.get(key); @@ -732,11 +731,10 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } LOG.trace("Creating error handler for: {}", processor); - ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory(); // create error handler (create error handler directly to keep it light weight, - // instead of using ProcessorDefinition.wrapInErrorHandler) + // instead of using ProcessorReifier.wrapInErrorHandler) try { - processor = createErrorHandler(routeContext, builder, exchange, processor); + processor = routeContext.createErrorHandler(processor); // and wrap in unit of work processor so the copy exchange also can run under UoW answer = createUnitOfWorkProcessor(routeContext, processor, exchange); @@ -764,13 +762,6 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat } /** - * Strategy to create the error handler from the builder - */ - protected Processor createErrorHandler(RouteContext routeContext, ErrorHandlerFactory builder, Exchange exchange, Processor processor) throws Exception { - return builder.createErrorHandler(routeContext, processor); - } - - /** * Strategy to create the unit of work to be used for the sub route * * @param routeContext the route context diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java index 66a22d3..a031939 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/RecipientListProcessor.java @@ -16,9 +16,6 @@ */ package org.apache.camel.processor; -import java.io.UnsupportedEncodingException; -import java.net.MalformedURLException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -28,7 +25,6 @@ import org.apache.camel.AggregationStrategy; import org.apache.camel.AsyncProducer; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; -import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.ExtendedCamelContext; @@ -46,7 +42,6 @@ import org.apache.camel.support.MessageHelper; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.service.ServiceHelper; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -247,8 +242,6 @@ public class RecipientListProcessor extends MulticastProcessor { */ protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint, Producer producer, Exchange exchange, ExchangePattern pattern, boolean prototypeEndpoint) { - Processor prepared = producer; - // copy exchange, and do not share the unit of work Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); @@ -258,11 +251,11 @@ public class RecipientListProcessor extends MulticastProcessor { } // set property which endpoint we send to - setToEndpoint(copy, prepared); + setToEndpoint(copy, producer); // rework error handling to support fine grained error handling RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; - prepared = createErrorHandler(routeContext, copy, prepared); + Processor prepared = createErrorHandler(routeContext, copy, producer); // invoke on prepare on the exchange if specified if (onPrepare != null) { @@ -278,17 +271,14 @@ public class RecipientListProcessor extends MulticastProcessor { } @Override - protected Processor createErrorHandler(RouteContext routeContext, ErrorHandlerFactory builder, Exchange exchange, Processor processor) throws Exception { - // in case its a reference to another builder then we want the real builder - final ErrorHandlerFactory ehBuilder = builder.getOrLookupErrorHandlerFactory(routeContext); - - Processor answer = super.createErrorHandler(routeContext, ehBuilder, exchange, processor); + protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { + Processor answer = super.createErrorHandler(routeContext, exchange, processor); exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() { @Override public void onDone(Exchange exchange) { // remove error handler builder from route context as we are done with the recipient list // and we cannot reuse this and must remove it to avoid leaking the error handler on the route context - routeContext.removeErrorHandlers(ehBuilder); + routeContext.removeErrorHandlers(routeContext.getErrorHandlerFactory()); } }); return answer; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java index c9fe6a5..074e796 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/AdviceWithRouteBuilder.java @@ -88,6 +88,20 @@ public abstract class AdviceWithRouteBuilder extends RouteBuilder { * @throws Exception can be thrown from the route builder */ public static RouteDefinition adviceWith(CamelContext camelContext, Object routeId, ThrowingConsumer<AdviceWithRouteBuilder, Exception> builder) throws Exception { + RouteDefinition rd = findRouteDefinition(camelContext, routeId); + + return RouteReifier.adviceWith(rd, camelContext, new AdviceWithRouteBuilder() { + @Override + public void configure() throws Exception { + if (builder instanceof AdviceWithRouteBuilder) { + setLogRouteAsXml(((AdviceWithRouteBuilder) builder).isLogRouteAsXml()); + } + builder.accept(this); + } + }); + } + + protected static RouteDefinition findRouteDefinition(CamelContext camelContext, Object routeId) { ModelCamelContext mcc = camelContext.adapt(ModelCamelContext.class); if (mcc.getRouteDefinitions().isEmpty()) { throw new IllegalArgumentException("Cannot advice route as there are no routes"); @@ -115,16 +129,7 @@ public abstract class AdviceWithRouteBuilder extends RouteBuilder { rd = mcc.getRouteDefinitions().get(0); } } - - return RouteReifier.adviceWith(rd, camelContext, new AdviceWithRouteBuilder() { - @Override - public void configure() throws Exception { - if (builder instanceof AdviceWithRouteBuilder) { - setLogRouteAsXml(((AdviceWithRouteBuilder) builder).isLogRouteAsXml()); - } - builder.accept(this); - } - }); + return rd; } /** diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java index 3fdd549..108ba2b 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java @@ -16,18 +16,14 @@ */ package org.apache.camel.builder; -import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.ExchangePattern; import org.apache.camel.LoggingLevel; -import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Processor; import org.apache.camel.processor.FatalFallbackErrorHandler; import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.errorhandler.DeadLetterChannel; import org.apache.camel.spi.CamelLogger; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.StringHelper; import org.slf4j.LoggerFactory; /** @@ -52,25 +48,6 @@ public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder { } @Override - public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { - validateDeadLetterUri(routeContext); - - CamelContext camelContext = routeContext.getCamelContext(); - DeadLetterChannel answer = new DeadLetterChannel(camelContext, processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(), - getExceptionPolicyStrategy(), getFailureProcessor(), getDeadLetterUri(), isDeadLetterHandleNewException(), - isUseOriginalMessage(), isUseOriginalBody(), getRetryWhilePolicy(camelContext), - getExecutorService(camelContext), getOnPrepareFailure(), getOnExceptionOccurred()); - // configure error handler before we can use it - configure(routeContext, answer); - return answer; - } - - @Override - public boolean supportTransacted() { - return false; - } - - @Override public ErrorHandlerBuilder cloneBuilder() { DeadLetterChannelBuilder answer = new DeadLetterChannelBuilder(); super.cloneBuilder(answer); @@ -93,16 +70,6 @@ public class DeadLetterChannelBuilder extends DefaultErrorHandlerBuilder { return failureProcessor; } - protected void validateDeadLetterUri(RouteContext routeContext) { - if (deadLetter == null) { - StringHelper.notEmpty(deadLetterUri, "deadLetterUri", this); - deadLetter = routeContext.getCamelContext().getEndpoint(deadLetterUri); - if (deadLetter == null) { - throw new NoSuchEndpointException(deadLetterUri); - } - } - } - @Override protected CamelLogger createLogger() { return new CamelLogger(LoggerFactory.getLogger(DeadLetterChannel.class), LoggingLevel.ERROR); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java index 67f5171..f893756 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/DefaultErrorHandlerBuilder.java @@ -27,10 +27,7 @@ import org.apache.camel.Processor; import org.apache.camel.processor.errorhandler.DefaultErrorHandler; import org.apache.camel.processor.errorhandler.RedeliveryPolicy; import org.apache.camel.spi.CamelLogger; -import org.apache.camel.spi.ExecutorServiceManager; import org.apache.camel.spi.Language; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.ThreadPoolProfile; import org.apache.camel.support.ExpressionToPredicateAdapter; import org.slf4j.LoggerFactory; @@ -60,16 +57,6 @@ public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport { } @Override - public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { - DefaultErrorHandler answer = new DefaultErrorHandler(routeContext.getCamelContext(), processor, getLogger(), getOnRedelivery(), getRedeliveryPolicy(), - getExceptionPolicyStrategy(), getRetryWhilePolicy(routeContext.getCamelContext()), - getExecutorService(routeContext.getCamelContext()), getOnPrepareFailure(), getOnExceptionOccurred()); - // configure error handler before we can use it - configure(routeContext, answer); - return answer; - } - - @Override public boolean supportTransacted() { return false; } @@ -663,31 +650,6 @@ public class DefaultErrorHandlerBuilder extends ErrorHandlerBuilderSupport { return new CamelLogger(LoggerFactory.getLogger(DefaultErrorHandler.class), LoggingLevel.ERROR); } - protected synchronized ScheduledExecutorService getExecutorService(CamelContext camelContext) { - if (executorService == null || executorService.isShutdown()) { - // camel context will shutdown the executor when it shutdown so no - // need to shut it down when stopping - if (executorServiceRef != null) { - executorService = camelContext.getRegistry().lookupByNameAndType(executorServiceRef, ScheduledExecutorService.class); - if (executorService == null) { - ExecutorServiceManager manager = camelContext.getExecutorServiceManager(); - ThreadPoolProfile profile = manager.getThreadPoolProfile(executorServiceRef); - executorService = manager.newScheduledThreadPool(this, executorServiceRef, profile); - } - if (executorService == null) { - throw new IllegalArgumentException("ExecutorServiceRef " + executorServiceRef + " not found in registry."); - } - } else { - // no explicit configured thread pool, so leave it up to the - // error handler to decide if it need - // a default thread pool from - // CamelContext#getErrorHandlerExecutorService - executorService = null; - } - } - return executorService; - } - @Override public String toString() { return "DefaultErrorHandlerBuilder"; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java index f4c2eaf..0a7a6bc 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java @@ -16,12 +16,6 @@ */ package org.apache.camel.builder; -import org.apache.camel.ErrorHandlerFactory; -import org.apache.camel.Processor; -import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.ObjectHelper; - /** * Represents a proxy to an error handler builder which is resolved by named * reference @@ -35,17 +29,6 @@ public class ErrorHandlerBuilderRef extends ErrorHandlerBuilderSupport { } @Override - public Processor createErrorHandler(RouteContext routeContext, Processor processor) throws Exception { - ErrorHandlerFactory handler = lookupErrorHandler(routeContext); - return ErrorHandlerReifier.reifier(routeContext, handler).createErrorHandler(processor); - } - - @Override - public ErrorHandlerFactory getOrLookupErrorHandlerFactory(RouteContext routeContext) { - return lookupErrorHandler(routeContext); - } - - @Override public boolean supportTransacted() { return supportTransacted; } @@ -69,18 +52,6 @@ public class ErrorHandlerBuilderRef extends ErrorHandlerBuilderSupport { return ref; } - private ErrorHandlerBuilder lookupErrorHandler(RouteContext routeContext) { - ErrorHandlerBuilder handler = (ErrorHandlerBuilder)ErrorHandlerReifier.lookupErrorHandlerFactory(routeContext, getRef()); - ObjectHelper.notNull(handler, "error handler '" + ref + "'"); - - // configure if the handler support transacted - supportTransacted = handler.supportTransacted(); - - routeContext.addErrorHandlerFactoryReference(this, handler); - - return handler; - } - @Override public String toString() { return "ErrorHandlerBuilderRef[" + ref + "]"; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java index d5b5ae8..66a042b 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderSupport.java @@ -16,27 +16,8 @@ */ package org.apache.camel.builder; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; - import org.apache.camel.ErrorHandlerFactory; -import org.apache.camel.NamedNode; -import org.apache.camel.Predicate; -import org.apache.camel.Processor; -import org.apache.camel.RuntimeCamelException; -import org.apache.camel.model.OnExceptionDefinition; -import org.apache.camel.model.ProcessorDefinitionHelper; -import org.apache.camel.model.RouteDefinition; -import org.apache.camel.processor.ErrorHandler; -import org.apache.camel.processor.errorhandler.ErrorHandlerSupport; -import org.apache.camel.processor.errorhandler.ExceptionPolicy; -import org.apache.camel.processor.errorhandler.ExceptionPolicyKey; import org.apache.camel.processor.errorhandler.ExceptionPolicyStrategy; -import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler; -import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; -import org.apache.camel.spi.ClassResolver; -import org.apache.camel.spi.RouteContext; import org.apache.camel.util.ObjectHelper; /** @@ -45,89 +26,13 @@ import org.apache.camel.util.ObjectHelper; public abstract class ErrorHandlerBuilderSupport implements ErrorHandlerBuilder { private ExceptionPolicyStrategy exceptionPolicyStrategy; - protected void cloneBuilder(ErrorHandlerBuilderSupport other) { - other.exceptionPolicyStrategy = exceptionPolicyStrategy; - } - @Override - public ErrorHandlerFactory getOrLookupErrorHandlerFactory(RouteContext routeContext) { - // only ErrorHandlerRef should override to lookup - return this; + public boolean supportTransacted() { + return false; } - /** - * Configures the other error handler based on this error handler. - * - * @param routeContext the route context - * @param handler the other error handler - */ - public void configure(RouteContext routeContext, ErrorHandler handler) { - if (handler instanceof ErrorHandlerSupport) { - ErrorHandlerSupport handlerSupport = (ErrorHandlerSupport)handler; - - Set<NamedNode> list = routeContext.getErrorHandlers(this); - for (NamedNode exception : list) { - addExceptionPolicy(handlerSupport, routeContext, (OnExceptionDefinition)exception); - } - } - if (handler instanceof RedeliveryErrorHandler) { - RedeliveryErrorHandler reh = (RedeliveryErrorHandler)handler; - boolean original = reh.isUseOriginalMessagePolicy() || reh.isUseOriginalBodyPolicy(); - if (original) { - if (reh.isUseOriginalMessagePolicy() && reh.isUseOriginalBodyPolicy()) { - throw new IllegalArgumentException("Cannot set both useOriginalMessage and useOriginalBody on the error handler"); - } - // ensure allow original is turned on - routeContext.setAllowUseOriginalMessage(true); - } - } - } - - public static void addExceptionPolicy(ErrorHandlerSupport handlerSupport, RouteContext routeContext, OnExceptionDefinition exceptionType) { - if (routeContext != null) { - // add error handler as child service so they get lifecycle handled - Processor errorHandler = routeContext.getOnException(exceptionType.getId()); - handlerSupport.addErrorHandler(errorHandler); - - // load exception classes - List<Class<? extends Throwable>> list; - if (ObjectHelper.isNotEmpty(exceptionType.getExceptions())) { - list = createExceptionClasses(exceptionType, routeContext.getCamelContext().getClassResolver()); - for (Class<? extends Throwable> clazz : list) { - String routeId = null; - // only get the route id, if the exception type is route - // scoped - if (exceptionType.isRouteScoped()) { - RouteDefinition route = ProcessorDefinitionHelper.getRoute(exceptionType); - if (route != null) { - routeId = route.getId(); - } - } - Predicate when = exceptionType.getOnWhen() != null ? exceptionType.getOnWhen().getExpression() : null; - ExceptionPolicyKey key = new ExceptionPolicyKey(routeId, clazz, when); - ExceptionPolicy policy = toExceptionPolicy(exceptionType, routeContext); - handlerSupport.addExceptionPolicy(key, policy); - } - } - } - } - - protected static ExceptionPolicy toExceptionPolicy(OnExceptionDefinition exceptionType, RouteContext routeContext) { - return ErrorHandlerReifier.createExceptionPolicy(exceptionType, routeContext.getCamelContext()); - } - - protected static List<Class<? extends Throwable>> createExceptionClasses(OnExceptionDefinition exceptionType, ClassResolver resolver) { - List<String> list = exceptionType.getExceptions(); - List<Class<? extends Throwable>> answer = new ArrayList<>(list.size()); - for (String name : list) { - try { - Class<? extends Throwable> type = resolver.resolveMandatoryClass(name, Throwable.class); - answer.add(type); - } catch (ClassNotFoundException e) { - throw RuntimeCamelException.wrapRuntimeCamelException(e); - } - } - return answer; + protected void cloneBuilder(ErrorHandlerBuilderSupport other) { + other.exceptionPolicyStrategy = exceptionPolicyStrategy; } /** diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java index 3565d35..85aa900 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/builder/NoErrorHandlerBuilder.java @@ -16,12 +16,7 @@ */ package org.apache.camel.builder; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.ExtendedExchange; import org.apache.camel.Processor; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.support.processor.DelegateAsyncProcessor; /** * A builder to disable the use of an error handler so that any exceptions are @@ -34,31 +29,6 @@ import org.apache.camel.support.processor.DelegateAsyncProcessor; public class NoErrorHandlerBuilder extends ErrorHandlerBuilderSupport { @Override - public Processor createErrorHandler(RouteContext routeContext, Processor processor) { - return new DelegateAsyncProcessor(processor) { - @Override - public boolean process(final Exchange exchange, final AsyncCallback callback) { - return super.process(exchange, new AsyncCallback() { - @Override - public void done(boolean doneSync) { - exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(false); - callback.done(doneSync); - } - }); - } - - @Override - public String toString() { - if (processor == null) { - // if no output then dont do any description - return ""; - } - return "NoErrorHandler[" + processor + "]"; - } - }; - } - - @Override public boolean supportTransacted() { return false; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java index ebcfe43..1593bb3 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultModel.java @@ -31,7 +31,6 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.FailedToStartRouteException; import org.apache.camel.Route; import org.apache.camel.impl.engine.AbstractCamelContext; -import org.apache.camel.impl.engine.DefaultRouteContext; import org.apache.camel.model.DataFormatDefinition; import org.apache.camel.model.HystrixConfigurationDefinition; import org.apache.camel.model.Model; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultRouteContext.java similarity index 60% copy from core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java copy to core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultRouteContext.java index c7c466c..821f74a 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/impl/DefaultRouteContext.java @@ -14,22 +14,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.reifier.errorhandler; +package org.apache.camel.impl; -import org.apache.camel.ErrorHandlerFactory; +import org.apache.camel.CamelContext; +import org.apache.camel.NamedNode; import org.apache.camel.Processor; -import org.apache.camel.builder.ErrorHandlerBuilderRef; +import org.apache.camel.impl.engine.AbstractRouteContext; +import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.RouteContext; -public class ErrorHandlerRefReifier extends ErrorHandlerReifier<ErrorHandlerBuilderRef> { +/** + * The context used to activate new routing rules + */ +public class DefaultRouteContext extends AbstractRouteContext implements RouteContext { - public ErrorHandlerRefReifier(RouteContext routeContext, ErrorHandlerFactory definition) { - super(routeContext, (ErrorHandlerBuilderRef)definition); + public DefaultRouteContext(CamelContext camelContext, NamedNode route, String routeId) { + super(camelContext, route, routeId); } @Override public Processor createErrorHandler(Processor processor) throws Exception { - return definition.createErrorHandler(routeContext, processor); + return ErrorHandlerReifier.reifier(this, getErrorHandlerFactory()) + .createErrorHandler(processor); } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java index bbf1f22..b84af32 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/DynamicRouterReifier.java @@ -17,13 +17,11 @@ package org.apache.camel.reifier; import org.apache.camel.AsyncProcessor; -import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.DynamicRouterDefinition; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.processor.DynamicRouter; -import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.RouteContext; public class DynamicRouterReifier extends ExpressionReifier<DynamicRouterDefinition<?>> { @@ -45,12 +43,10 @@ public class DynamicRouterReifier extends ExpressionReifier<DynamicRouterDefinit dynamicRouter.setCacheSize(parseInt(definition.getCacheSize())); } - // and wrap this in an error handler - ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory(); // create error handler (create error handler directly to keep it light // weight, // instead of using ProcessorReifier.wrapInErrorHandler) - AsyncProcessor errorHandler = (AsyncProcessor)ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(dynamicRouter.newRoutingSlipProcessorForErrorHandler()); + AsyncProcessor errorHandler = (AsyncProcessor) routeContext.createErrorHandler(dynamicRouter.newRoutingSlipProcessorForErrorHandler()); dynamicRouter.setErrorHandler(errorHandler); return dynamicRouter; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java index c693938..8012339 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ProcessorReifier.java @@ -661,7 +661,7 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends if (inheritErrorHandler == null || inheritErrorHandler) { log.trace("{} is configured to inheritErrorHandler", definition); Processor output = channel.getOutput(); - Processor errorHandler = wrapInErrorHandler(output); + Processor errorHandler = wrapInErrorHandler(output, true); // set error handler on channel channel.setErrorHandler(errorHandler); } else { @@ -673,17 +673,21 @@ public abstract class ProcessorReifier<T extends ProcessorDefinition<?>> extends * Wraps the given output in an error handler * * @param output the output + * @param longLived if the processor is longLived or not * @return the output wrapped with the error handler * @throws Exception can be thrown if failed to create error handler builder */ - protected Processor wrapInErrorHandler(Processor output) throws Exception { + protected Processor wrapInErrorHandler(Processor output, boolean longLived) throws Exception { ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory(); + // create error handler Processor errorHandler = ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(output); - // invoke lifecycles so we can manage this error handler builder - for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { - strategy.onErrorHandlerAdd(routeContext, errorHandler, builder); + if (longLived) { + // invoke lifecycles so we can manage this error handler builder + for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { + strategy.onErrorHandlerAdd(routeContext, errorHandler, builder); + } } return errorHandler; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java index 81aac98..9ad47d8 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RecipientListReifier.java @@ -99,7 +99,7 @@ public class RecipientListReifier extends ProcessorReifier<RecipientListDefiniti // special error handling // when sending to the recipients individually Processor evalProcessor = new EvaluateExpressionProcessor(expression); - evalProcessor = super.wrapInErrorHandler(evalProcessor); + evalProcessor = wrapInErrorHandler(evalProcessor, true); pipe.add(evalProcessor); pipe.add(answer); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java index 7b5b098..c403cc1 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RouteReifier.java @@ -24,7 +24,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.ExtendedCamelContext; import org.apache.camel.FailedToCreateRouteException; -import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.RuntimeCamelException; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java index 29db4da..611392d 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/RoutingSlipReifier.java @@ -17,13 +17,11 @@ package org.apache.camel.reifier; import org.apache.camel.AsyncProcessor; -import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RoutingSlipDefinition; import org.apache.camel.processor.RoutingSlip; -import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; import org.apache.camel.spi.RouteContext; import static org.apache.camel.model.RoutingSlipDefinition.DEFAULT_DELIMITER; @@ -51,11 +49,8 @@ public class RoutingSlipReifier extends ExpressionReifier<RoutingSlipDefinition< } // and wrap this in an error handler - ErrorHandlerFactory builder = routeContext.getErrorHandlerFactory(); - // create error handler (create error handler directly to keep it light - // weight, - // instead of using ProcessorDefinition.wrapInErrorHandler) - AsyncProcessor errorHandler = (AsyncProcessor)ErrorHandlerReifier.reifier(routeContext, builder).createErrorHandler(routingSlip.newRoutingSlipProcessorForErrorHandler()); + AsyncProcessor processor = routingSlip.newRoutingSlipProcessorForErrorHandler(); + AsyncProcessor errorHandler = (AsyncProcessor) wrapInErrorHandler(processor, false); routingSlip.setErrorHandler(errorHandler); return routingSlip; diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 9a69586..063b543 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -49,7 +49,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { SendDynamicProcessor dynamicTo = (SendDynamicProcessor)super.createProcessor(); // create error handler we need to use for processing the wire tapped - Processor target = wrapInErrorHandler(dynamicTo); + Processor target = wrapInErrorHandler(dynamicTo, true); // and wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(camelContext, target); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java index c7c466c..df793fb 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerRefReifier.java @@ -20,6 +20,7 @@ import org.apache.camel.ErrorHandlerFactory; import org.apache.camel.Processor; import org.apache.camel.builder.ErrorHandlerBuilderRef; import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.ObjectHelper; public class ErrorHandlerRefReifier extends ErrorHandlerReifier<ErrorHandlerBuilderRef> { @@ -29,7 +30,16 @@ public class ErrorHandlerRefReifier extends ErrorHandlerReifier<ErrorHandlerBuil @Override public Processor createErrorHandler(Processor processor) throws Exception { - return definition.createErrorHandler(routeContext, processor); + ErrorHandlerFactory handler = lookupErrorHandler(routeContext); + return ErrorHandlerReifier.reifier(routeContext, handler).createErrorHandler(processor); + } + + private ErrorHandlerFactory lookupErrorHandler(RouteContext routeContext) { + ErrorHandlerFactory handler = + ErrorHandlerReifier.lookupErrorHandlerFactory(routeContext, definition.getRef()); + ObjectHelper.notNull(handler, "error handler '" + definition.getRef() + "'"); + routeContext.addErrorHandlerFactoryReference(definition, handler); + return handler; } } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java index c769767..f58379c 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/errorhandler/ErrorHandlerReifier.java @@ -16,7 +16,9 @@ */ package org.apache.camel.reifier.errorhandler; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.function.BiFunction; @@ -35,16 +37,17 @@ import org.apache.camel.builder.ErrorHandlerBuilderRef; import org.apache.camel.builder.ErrorHandlerBuilderSupport; import org.apache.camel.builder.NoErrorHandlerBuilder; import org.apache.camel.model.OnExceptionDefinition; +import org.apache.camel.model.ProcessorDefinitionHelper; import org.apache.camel.model.RedeliveryPolicyDefinition; import org.apache.camel.model.RouteDefinition; import org.apache.camel.processor.ErrorHandler; import org.apache.camel.processor.errorhandler.ErrorHandlerSupport; import org.apache.camel.processor.errorhandler.ExceptionPolicy; import org.apache.camel.processor.errorhandler.ExceptionPolicy.RedeliveryOption; +import org.apache.camel.processor.errorhandler.ExceptionPolicyKey; import org.apache.camel.processor.errorhandler.RedeliveryErrorHandler; import org.apache.camel.processor.errorhandler.RedeliveryPolicy; import org.apache.camel.reifier.AbstractReifier; -import org.apache.camel.reifier.language.ExpressionReifier; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.util.ObjectHelper; @@ -80,44 +83,34 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> BiFunction<RouteContext, ErrorHandlerFactory, ErrorHandlerReifier<? extends ErrorHandlerFactory>> reifier = ERROR_HANDLERS.get(definition.getClass()); if (reifier != null) { return reifier.apply(routeContext, definition); - } else if (definition instanceof ErrorHandlerBuilderSupport) { - return new ErrorHandlerReifier<ErrorHandlerBuilderSupport>(routeContext, (ErrorHandlerBuilderSupport)definition) { - @Override - public Processor createErrorHandler(Processor processor) throws Exception { - return definition.createErrorHandler(routeContext, processor); - } - }; - } else { - throw new IllegalStateException("Unsupported definition: " + definition); } + throw new IllegalStateException("Unsupported definition: " + definition); } - public static ExceptionPolicy createExceptionPolicy(OnExceptionDefinition def, CamelContext camelContext) { + public ExceptionPolicy createExceptionPolicy(OnExceptionDefinition def) { Predicate handled = def.getHandledPolicy(); if (handled == null && def.getHandled() != null) { - handled = ExpressionReifier.reifier(camelContext, def.getHandled()).createPredicate(); + handled = createPredicate(def.getHandled()); } Predicate continued = def.getContinuedPolicy(); if (continued == null && def.getContinued() != null) { - continued = ExpressionReifier.reifier(camelContext, def.getContinued()).createPredicate(); + continued = createPredicate(def.getContinued()); } Predicate retryWhile = def.getRetryWhilePolicy(); if (retryWhile == null && def.getRetryWhile() != null) { - retryWhile = ExpressionReifier.reifier(camelContext, def.getRetryWhile()).createPredicate(); + retryWhile = createPredicate(def.getRetryWhile()); } Processor onRedelivery = def.getOnRedelivery(); if (onRedelivery == null && def.getOnRedeliveryRef() != null) { - onRedelivery = CamelContextHelper.mandatoryLookup(camelContext, - CamelContextHelper.parseText(camelContext, def.getOnRedeliveryRef()), Processor.class); + onRedelivery = mandatoryLookup(parseString(def.getOnRedeliveryRef()), Processor.class); } Processor onExceptionOccurred = def.getOnExceptionOccurred(); if (onExceptionOccurred == null && def.getOnExceptionOccurredRef() != null) { - onExceptionOccurred = CamelContextHelper.mandatoryLookup(camelContext, - CamelContextHelper.parseText(camelContext, def.getOnExceptionOccurredRef()), Processor.class); + onExceptionOccurred = mandatoryLookup(parseString(def.getOnExceptionOccurredRef()), Processor.class); } return new ExceptionPolicy(def.getId(), CamelContextHelper.getRouteId(def), - def.getUseOriginalMessage() != null && CamelContextHelper.parseBoolean(camelContext, def.getUseOriginalMessage()), - def.getUseOriginalBody() != null && CamelContextHelper.parseBoolean(camelContext, def.getUseOriginalBody()), + parseBoolean(def.getUseOriginalMessage(), false), + parseBoolean(def.getUseOriginalBody(), false), ObjectHelper.isNotEmpty(def.getOutputs()), handled, continued, retryWhile, onRedelivery, onExceptionOccurred, def.getRedeliveryPolicyRef(), @@ -262,6 +255,47 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> return !DEFAULT_ERROR_HANDLER_BUILDER.equals(ref); } + public void addExceptionPolicy(ErrorHandlerSupport handlerSupport, OnExceptionDefinition exceptionType) { + // add error handler as child service so they get lifecycle handled + Processor errorHandler = routeContext.getOnException(exceptionType.getId()); + handlerSupport.addErrorHandler(errorHandler); + + // load exception classes + List<Class<? extends Throwable>> list; + if (ObjectHelper.isNotEmpty(exceptionType.getExceptions())) { + list = createExceptionClasses(exceptionType); + for (Class<? extends Throwable> clazz : list) { + String routeId = null; + // only get the route id, if the exception type is route + // scoped + if (exceptionType.isRouteScoped()) { + RouteDefinition route = ProcessorDefinitionHelper.getRoute(exceptionType); + if (route != null) { + routeId = route.getId(); + } + } + Predicate when = exceptionType.getOnWhen() != null ? exceptionType.getOnWhen().getExpression() : null; + ExceptionPolicyKey key = new ExceptionPolicyKey(routeId, clazz, when); + ExceptionPolicy policy = createExceptionPolicy(exceptionType); + handlerSupport.addExceptionPolicy(key, policy); + } + } + } + + protected List<Class<? extends Throwable>> createExceptionClasses(OnExceptionDefinition exceptionType) { + List<String> list = exceptionType.getExceptions(); + List<Class<? extends Throwable>> answer = new ArrayList<>(list.size()); + for (String name : list) { + try { + Class<? extends Throwable> type = camelContext.getClassResolver().resolveMandatoryClass(name, Throwable.class); + answer.add(type); + } catch (ClassNotFoundException e) { + throw RuntimeCamelException.wrapRuntimeCamelException(e); + } + } + return answer; + } + /** * Creates the error handler * @@ -276,7 +310,7 @@ public abstract class ErrorHandlerReifier<T extends ErrorHandlerBuilderSupport> ErrorHandlerSupport handlerSupport = (ErrorHandlerSupport)handler; for (NamedNode exception : routeContext.getErrorHandlers(definition)) { - ErrorHandlerBuilderSupport.addExceptionPolicy(handlerSupport, routeContext, (OnExceptionDefinition)exception); + addExceptionPolicy(handlerSupport, (OnExceptionDefinition) exception); } } if (handler instanceof RedeliveryErrorHandler) { diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java index 87922b6..9ecc356 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/errorhandler/ErrorHandlerSupportTest.java @@ -22,9 +22,10 @@ import java.util.List; import org.apache.camel.ContextTestSupport; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.builder.ErrorHandlerBuilderSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.model.OnExceptionDefinition; +import org.apache.camel.reifier.errorhandler.DefaultErrorHandlerReifier; +import org.apache.camel.spi.RouteContext; import org.junit.Test; public class ErrorHandlerSupportTest extends ContextTestSupport { @@ -36,7 +37,7 @@ public class ErrorHandlerSupportTest extends ContextTestSupport { exceptions.add(ParentException.class); ErrorHandlerSupport support = new ShuntErrorHandlerSupport(); - ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions)); + addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions)); assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 0)); assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 1)); @@ -49,7 +50,7 @@ public class ErrorHandlerSupportTest extends ContextTestSupport { exceptions.add(ChildException.class); ErrorHandlerSupport support = new ShuntErrorHandlerSupport(); - ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions)); + addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(exceptions)); assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 1)); assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 0)); @@ -58,8 +59,8 @@ public class ErrorHandlerSupportTest extends ContextTestSupport { @Test public void testTwoPolicyChildFirst() { ErrorHandlerSupport support = new ShuntErrorHandlerSupport(); - ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class)); - ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class)); + addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class)); + addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class)); assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 0)); assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 0)); @@ -68,13 +69,17 @@ public class ErrorHandlerSupportTest extends ContextTestSupport { @Test public void testTwoPolicyChildLast() { ErrorHandlerSupport support = new ShuntErrorHandlerSupport(); - ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class)); - ErrorHandlerBuilderSupport.addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class)); + addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ParentException.class)); + addExceptionPolicy(support, context.getRoute("foo").getRouteContext(), new OnExceptionDefinition(ChildException.class)); assertEquals(ChildException.class.getName(), getExceptionPolicyFor(support, new ChildException(), 0)); assertEquals(ParentException.class.getName(), getExceptionPolicyFor(support, new ParentException(), 0)); } + private static void addExceptionPolicy(ErrorHandlerSupport handlerSupport, RouteContext routeContext, OnExceptionDefinition exceptionType) { + new DefaultErrorHandlerReifier<>(routeContext, null).addExceptionPolicy(handlerSupport, exceptionType); + } + private static String getExceptionPolicyFor(ErrorHandlerSupport support, Throwable childException, int index) { return support.getExceptionPolicy(null, childException).getExceptions().get(index); } diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java index 24bd5b1..62ee77c 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/exceptionpolicy/DefaultExceptionPolicyStrategyTest.java @@ -24,15 +24,20 @@ import java.net.SocketException; import java.util.HashMap; import org.apache.camel.AlreadyStoppedException; +import org.apache.camel.CamelContext; import org.apache.camel.CamelExchangeException; import org.apache.camel.ExchangeTimedOutException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.ValidationException; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.impl.DefaultRouteContext; import org.apache.camel.model.OnExceptionDefinition; import org.apache.camel.processor.errorhandler.DefaultExceptionPolicyStrategy; import org.apache.camel.processor.errorhandler.ExceptionPolicy; import org.apache.camel.processor.errorhandler.ExceptionPolicyKey; +import org.apache.camel.reifier.errorhandler.DefaultErrorHandlerReifier; import org.apache.camel.reifier.errorhandler.ErrorHandlerReifier; +import org.apache.camel.spi.RouteContext; import org.junit.Assert; import org.junit.Test; @@ -48,7 +53,10 @@ public class DefaultExceptionPolicyStrategyTest extends Assert { private ExceptionPolicy type3; private ExceptionPolicy exceptionPolicy(Class<? extends Throwable> exceptionClass) { - return ErrorHandlerReifier.createExceptionPolicy(new OnExceptionDefinition(exceptionClass), null); + CamelContext cc = new DefaultCamelContext(); + RouteContext context = new DefaultRouteContext(cc, null, null); + return new DefaultErrorHandlerReifier<>(context, null) + .createExceptionPolicy(new OnExceptionDefinition(exceptionClass)); } private void setupPolicies() { diff --git a/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java b/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java index e98248d..9d7d481 100644 --- a/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/reifier/DataFormatReifierTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.reifier; +import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.model.dataformat.CustomDataFormat; import org.apache.camel.reifier.dataformat.CustomDataFormatReifier; import org.apache.camel.reifier.dataformat.DataFormatReifier; @@ -27,17 +28,18 @@ import static junit.framework.TestCase.fail; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class DataFormatReifierTest { + @Test public void testHandleCustomDataFormat() { + DefaultCamelContext context = new DefaultCamelContext(); try { - DataFormatReifier.reifier(null, new MyDataFormat()); - + DataFormatReifier.reifier(context, new MyDataFormat()); fail("Should throw IllegalStateException instead"); } catch (IllegalStateException e) { } DataFormatReifier.registerReifier(MyDataFormat.class, CustomDataFormatReifier::new); - DataFormatReifier.reifier(null, new MyDataFormat()); + DataFormatReifier.reifier(context, new MyDataFormat()); } public static class MyDataFormat extends CustomDataFormat { diff --git a/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java b/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java index d6a0c86..1deeae8 100644 --- a/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/reifier/ProcessorReifierTest.java @@ -16,7 +16,7 @@ */ package org.apache.camel.reifier; -import org.apache.camel.impl.engine.DefaultRouteContext; +import org.apache.camel.impl.DefaultRouteContext; import org.apache.camel.model.ProcessDefinition; import org.apache.camel.spi.RouteContext; import org.junit.Test; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java index 20589ee..75bb00b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/CamelContextHelper.java @@ -27,7 +27,6 @@ import org.apache.camel.ExtendedCamelContext; import org.apache.camel.NamedNode; import org.apache.camel.NoSuchBeanException; import org.apache.camel.NoSuchEndpointException; -import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.RuntimeCamelException; import org.apache.camel.spi.RouteStartupOrder;
