This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d4d4cdc4bc31e6c35ce8437c2b2d23c3b31f14f8 Author: Claus Ibsen <[email protected]> AuthorDate: Thu Jan 30 10:15:19 2020 +0100 CAMEL-14354: camel-core optimize --- .../camel/impl/engine/DefaultProducerCache.java | 2 +- .../camel/impl/engine/DefaultRouteContext.java | 2 +- .../impl/engine/SubscribeMethodProcessor.java | 2 +- .../camel/processor/CamelInternalProcessor.java | 12 +++++------ .../apache/camel/processor/MulticastProcessor.java | 4 ++-- .../apache/camel/processor/UnitOfWorkProducer.java | 2 +- .../org/apache/camel/reifier/AggregateReifier.java | 2 +- .../apache/camel/reifier/OnCompletionReifier.java | 2 +- .../apache/camel/reifier/ResequenceReifier.java | 4 ++-- .../org/apache/camel/reifier/WireTapReifier.java | 2 +- .../org/apache/camel/support/DefaultMessage.java | 25 +++++++++++++++++----- 11 files changed, 37 insertions(+), 22 deletions(-) diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java index e0b15ea..d0c1798 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultProducerCache.java @@ -74,7 +74,7 @@ public class DefaultProducerCache extends ServiceSupport implements ProducerCach } // internal processor used for sending - internalProcessor = new SharedCamelInternalProcessor(camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + internalProcessor = new SharedCamelInternalProcessor(camelContext, new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, camelContext)); } protected ProducerServicePool createServicePool(CamelContext camelContext, int cacheSize) { diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java index e7739d6..5397988 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultRouteContext.java @@ -177,7 +177,7 @@ public class DefaultRouteContext implements RouteContext { // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW CamelInternalProcessor internal = new CamelInternalProcessor(getCamelContext(), target); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(this, getCamelContext())); // and then optionally add route policy processor if a custom policy is set List<RoutePolicy> routePolicyList = getRoutePolicyList(); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java index 14d2fc9..0953720 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/SubscribeMethodProcessor.java @@ -58,7 +58,7 @@ public final class SubscribeMethodProcessor extends AsyncProcessorSupport implem .getBeanProcessorFactory().createBeanProcessor(endpoint.getCamelContext(), pojo, method); // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked CamelInternalProcessor internal = new CamelInternalProcessor(endpoint.getCamelContext(), answer); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, endpoint.getCamelContext())); Predicate p; if (ObjectHelper.isEmpty(predicate)) { diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index e0f4fa1..f2a7c7e 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -589,14 +589,14 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { private String routeId; private UnitOfWorkFactory uowFactory; - public UnitOfWorkProcessorAdvice(RouteContext routeContext) { + public UnitOfWorkProcessorAdvice(RouteContext routeContext, CamelContext camelContext) { this.routeContext = routeContext; if (routeContext != null) { this.routeId = routeContext.getRouteId(); - this.uowFactory = routeContext.getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory(); - // optimize uow factory to initialize it early and once per advice - this.uowFactory.afterPropertiesConfigured(routeContext.getCamelContext()); } + this.uowFactory = camelContext.adapt(ExtendedCamelContext.class).getUnitOfWorkFactory(); + // optimize uow factory to initialize it early and once per advice + this.uowFactory.afterPropertiesConfigured(camelContext); } @Override @@ -666,8 +666,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { private final UnitOfWork parent; - public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, UnitOfWork parent) { - super(routeContext); + public ChildUnitOfWorkProcessorAdvice(RouteContext routeContext, CamelContext camelContext, UnitOfWork parent) { + super(routeContext, camelContext); this.parent = parent; } diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 95af8ad..dd2ae22 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -776,9 +776,9 @@ public class MulticastProcessor extends AsyncProcessorSupport implements Navigat // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); if (parent != null) { - internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent)); + internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, exchange.getContext(), parent)); } else { - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, exchange.getContext())); } return internal; diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java index fecac02..0214e16 100644 --- a/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java +++ b/core/camel-base/src/main/java/org/apache/camel/processor/UnitOfWorkProducer.java @@ -42,7 +42,7 @@ public final class UnitOfWorkProducer extends DefaultAsyncProducer { this.producer = producer; // wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(producer.getEndpoint().getCamelContext(), producer); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(null, producer.getEndpoint().getCamelContext())); this.processor = internal; } diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java index 791747e..0a51722 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/AggregateReifier.java @@ -53,7 +53,7 @@ public class AggregateReifier extends ProcessorReifier<AggregateDefinition> { // wrap the aggregate route in a unit of work processor CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), childProcessor); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, routeContext.getCamelContext())); Expression correlation = definition.getExpression().createExpression(routeContext); AggregationStrategy strategy = createAggregationStrategy(routeContext); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java index b2d9a3c..12222c9 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/OnCompletionReifier.java @@ -67,7 +67,7 @@ public class OnCompletionReifier extends ProcessorReifier<OnCompletionDefinition // wrap the on completion route in a unit of work processor CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), childProcessor); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, routeContext.getCamelContext())); routeContext.setOnCompletion(getId(definition, routeContext), internal); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java index a1e33ef..c2243ac 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/ResequenceReifier.java @@ -76,7 +76,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> { // and wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), processor); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, routeContext.getCamelContext())); ObjectHelper.notNull(config, "config", this); ObjectHelper.notNull(expression, "expression", this); @@ -109,7 +109,7 @@ public class ResequenceReifier extends ProcessorReifier<ResequenceDefinition> { Expression expression = definition.getExpression().createExpression(routeContext); CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), processor); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, routeContext.getCamelContext())); ObjectHelper.notNull(config, "config", this); ObjectHelper.notNull(expression, "expression", this); diff --git a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java index 1f16c8d..18a12dc 100644 --- a/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-engine/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -55,7 +55,7 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { // and wrap in unit of work CamelInternalProcessor internal = new CamelInternalProcessor(routeContext.getCamelContext(), target); - internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); + internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext, routeContext.getCamelContext())); // is true by default boolean isCopy = definition.getCopy() == null || parseBoolean(routeContext, definition.getCopy()); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java index a37af68..b6c729b 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java @@ -16,6 +16,7 @@ */ package org.apache.camel.support; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -221,11 +222,17 @@ public class DefaultMessage extends MessageSupport { @Override public void setHeaders(Map<String, Object> headers) { - if (camelContext.getHeadersMapFactory().isInstanceOf(headers)) { - this.headers = headers; + HeadersMapFactory factory = camelContext.getHeadersMapFactory(); + if (factory != null) { + if (factory.isInstanceOf(headers)) { + this.headers = headers; + } else { + // create a new map + this.headers = camelContext.getHeadersMapFactory().newMap(headers); + } } else { - // create a new map - this.headers = camelContext.getHeadersMapFactory().newMap(headers); + // should not really happen but some tests rely on using camel context that is not started + this.headers = headers; } } @@ -252,7 +259,15 @@ public class DefaultMessage extends MessageSupport { * the underlying inbound transport */ protected Map<String, Object> createHeaders() { - Map<String, Object> map = camelContext.getHeadersMapFactory().newMap(); + Map<String, Object> map; + + HeadersMapFactory factory = camelContext.getHeadersMapFactory(); + if (factory != null) { + map = factory.newMap(); + } else { + // should not really happen but some tests rely on using camel context that is not started + map = new HashMap<>(); + } populateInitialHeaders(map); return map; }
