This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 97b9aaf618e491901b2414725e8fd96d307c2941 Author: Claus Ibsen <[email protected]> AuthorDate: Sat Feb 20 15:22:02 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- ...{DefaultExchange.java => AbstractExchange.java} | 46 +- .../org/apache/camel/support/DefaultConsumer.java | 11 +- .../org/apache/camel/support/DefaultExchange.java | 683 +-------------------- .../camel/support/DefaultPooledExchange.java | 11 +- .../camel/support/PollingConsumerSupport.java | 5 +- 5 files changed, 53 insertions(+), 703 deletions(-) diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java similarity index 96% copy from core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java copy to core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java index adb804c..6e5daa6 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java @@ -40,16 +40,20 @@ import org.apache.camel.spi.UnitOfWork; import org.apache.camel.util.ObjectHelper; /** - * The default and only implementation of {@link Exchange}. + * Base class for the two official and only implementations of {@link Exchange}, the {@link DefaultExchange} and + * {@link DefaultPooledExchange}. + * + * Camel end users should use {@link DefaultExchange} if creating an {@link Exchange} manually. However that is more + * seldom to use, as exchanges are created via {@link Endpoint}. + * + * @see DefaultExchange */ -public class DefaultExchange implements ExtendedExchange { +class AbstractExchange implements ExtendedExchange { - // TODO: AbstractExchange and move somewhere, and have DefaultExchange as thin - - private final CamelContext context; - long created; + final CamelContext context; // optimize to create properties always and with a reasonable small size final Map<String, Object> properties = new ConcurrentHashMap<>(8); + long created; Message in; Message out; Exception exception; @@ -72,19 +76,19 @@ public class DefaultExchange implements ExtendedExchange { boolean redeliveryExhausted; Boolean errorHandlerHandled; - public DefaultExchange(CamelContext context) { + public AbstractExchange(CamelContext context) { this.context = context; this.pattern = ExchangePattern.InOnly; this.created = System.currentTimeMillis(); } - public DefaultExchange(CamelContext context, ExchangePattern pattern) { + public AbstractExchange(CamelContext context, ExchangePattern pattern) { this.context = context; this.pattern = pattern; this.created = System.currentTimeMillis(); } - public DefaultExchange(Exchange parent) { + public AbstractExchange(Exchange parent) { this.context = parent.getContext(); this.pattern = parent.getPattern(); this.created = parent.getCreated(); @@ -93,14 +97,14 @@ public class DefaultExchange implements ExtendedExchange { this.unitOfWork = parent.getUnitOfWork(); } - public DefaultExchange(Endpoint fromEndpoint) { + public AbstractExchange(Endpoint fromEndpoint) { this.context = fromEndpoint.getCamelContext(); this.pattern = ExchangePattern.InOnly; this.created = System.currentTimeMillis(); this.fromEndpoint = fromEndpoint; } - public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) { + public AbstractExchange(Endpoint fromEndpoint, ExchangePattern pattern) { this.context = fromEndpoint.getCamelContext(); this.pattern = pattern; this.created = System.currentTimeMillis(); @@ -108,16 +112,6 @@ public class DefaultExchange implements ExtendedExchange { } @Override - public String toString() { - // do not output information about the message as it may contain sensitive information - if (exchangeId != null) { - return "Exchange[" + exchangeId + "]"; - } else { - return "Exchange[]"; - } - } - - @Override public long getCreated() { return created; } @@ -717,4 +711,14 @@ public class DefaultExchange implements ExtendedExchange { return context.getUuidGenerator().generateUuid(); } + @Override + public String toString() { + // do not output information about the message as it may contain sensitive information + if (exchangeId != null) { + return "Exchange[" + exchangeId + "]"; + } else { + return "Exchange[]"; + } + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index 685f8c1..2bd8cc2 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -16,7 +16,16 @@ */ package org.apache.camel.support; -import org.apache.camel.*; +import org.apache.camel.AsyncProcessor; +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExtendedCamelContext; +import org.apache.camel.ExtendedExchange; +import org.apache.camel.PooledExchange; +import org.apache.camel.Processor; +import org.apache.camel.Route; +import org.apache.camel.RouteAware; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.RouteIdAware; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index adb804c..198484a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -16,705 +16,34 @@ */ package org.apache.camel.support; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.camel.CamelContext; -import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.ExtendedExchange; -import org.apache.camel.Message; -import org.apache.camel.MessageHistory; -import org.apache.camel.spi.HeadersMapFactory; -import org.apache.camel.spi.Synchronization; -import org.apache.camel.spi.UnitOfWork; -import org.apache.camel.util.ObjectHelper; /** * The default and only implementation of {@link Exchange}. */ -public class DefaultExchange implements ExtendedExchange { - - // TODO: AbstractExchange and move somewhere, and have DefaultExchange as thin - - private final CamelContext context; - long created; - // optimize to create properties always and with a reasonable small size - final Map<String, Object> properties = new ConcurrentHashMap<>(8); - Message in; - Message out; - Exception exception; - String exchangeId; - UnitOfWork unitOfWork; - ExchangePattern pattern; - Endpoint fromEndpoint; - String fromRouteId; - List<Synchronization> onCompletions; - Boolean externalRedelivered; - String historyNodeId; - String historyNodeLabel; - boolean transacted; - boolean routeStop; - boolean rollbackOnly; - boolean rollbackOnlyLast; - boolean notifyEvent; - boolean interrupted; - boolean interruptable = true; - boolean redeliveryExhausted; - Boolean errorHandlerHandled; +public final class DefaultExchange extends AbstractExchange { public DefaultExchange(CamelContext context) { - this.context = context; - this.pattern = ExchangePattern.InOnly; - this.created = System.currentTimeMillis(); + super(context); } public DefaultExchange(CamelContext context, ExchangePattern pattern) { - this.context = context; - this.pattern = pattern; - this.created = System.currentTimeMillis(); + super(context, pattern); } public DefaultExchange(Exchange parent) { - this.context = parent.getContext(); - this.pattern = parent.getPattern(); - this.created = parent.getCreated(); - this.fromEndpoint = parent.getFromEndpoint(); - this.fromRouteId = parent.getFromRouteId(); - this.unitOfWork = parent.getUnitOfWork(); + super(parent); } public DefaultExchange(Endpoint fromEndpoint) { - this.context = fromEndpoint.getCamelContext(); - this.pattern = ExchangePattern.InOnly; - this.created = System.currentTimeMillis(); - this.fromEndpoint = fromEndpoint; + super(fromEndpoint); } public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) { - this.context = fromEndpoint.getCamelContext(); - this.pattern = pattern; - this.created = System.currentTimeMillis(); - this.fromEndpoint = fromEndpoint; - } - - @Override - public String toString() { - // do not output information about the message as it may contain sensitive information - if (exchangeId != null) { - return "Exchange[" + exchangeId + "]"; - } else { - return "Exchange[]"; - } - } - - @Override - public long getCreated() { - return created; - } - - @Override - public Exchange copy() { - DefaultExchange exchange = new DefaultExchange(this); - - exchange.setIn(getIn().copy()); - exchange.getIn().setBody(getIn().getBody()); - if (getIn().hasHeaders()) { - exchange.getIn().setHeaders(safeCopyHeaders(getIn().getHeaders())); - } - if (hasOut()) { - exchange.setOut(getOut().copy()); - exchange.getOut().setBody(getOut().getBody()); - if (getOut().hasHeaders()) { - exchange.getOut().setHeaders(safeCopyHeaders(getOut().getHeaders())); - } - } - - exchange.setException(exception); - exchange.setRouteStop(routeStop); - exchange.setRollbackOnly(rollbackOnly); - exchange.setRollbackOnlyLast(rollbackOnlyLast); - exchange.setNotifyEvent(notifyEvent); - exchange.setRedeliveryExhausted(redeliveryExhausted); - exchange.setErrorHandlerHandled(errorHandlerHandled); - - // copy properties after body as body may trigger lazy init - if (hasProperties()) { - safeCopyProperties(getProperties(), exchange.getProperties()); - } - - return exchange; - } - - private Map<String, Object> safeCopyHeaders(Map<String, Object> headers) { - if (headers == null) { - return null; - } - - if (context != null) { - ExtendedCamelContext ecc = (ExtendedCamelContext) context; - HeadersMapFactory factory = ecc.getHeadersMapFactory(); - if (factory != null) { - return factory.newMap(headers); - } - } - // should not really happen but some tests dont start camel context - return new HashMap<>(headers); - } - - @SuppressWarnings("unchecked") - private void safeCopyProperties(Map<String, Object> source, Map<String, Object> target) { - target.putAll(source); - if (getContext().isMessageHistory()) { - // safe copy message history using a defensive copy - List<MessageHistory> history = (List<MessageHistory>) target.remove(Exchange.MESSAGE_HISTORY); - if (history != null) { - // use thread-safe list as message history may be accessed concurrently - target.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history)); - } - } - } - - @Override - public CamelContext getContext() { - return context; - } - - @Override - public Object getProperty(String name) { - return properties.get(name); - } - - @Override - public Object getProperty(String name, Object defaultValue) { - Object answer = getProperty(name); - return answer != null ? answer : defaultValue; - } - - @Override - @SuppressWarnings("unchecked") - public <T> T getProperty(String name, Class<T> type) { - Object value = getProperty(name); - if (value == null) { - // lets avoid NullPointerException when converting to boolean for null values - if (boolean.class == type) { - return (T) Boolean.FALSE; - } - return null; - } - - // eager same instance type test to avoid the overhead of invoking the type converter - // if already same type - if (type.isInstance(value)) { - return (T) value; - } - - return ExchangeHelper.convertToType(this, type, value); - } - - @Override - @SuppressWarnings("unchecked") - public <T> T getProperty(String name, Object defaultValue, Class<T> type) { - Object value = getProperty(name); - if (value == null) { - value = defaultValue; - } - if (value == null) { - // lets avoid NullPointerException when converting to boolean for null values - if (boolean.class == type) { - return (T) Boolean.FALSE; - } - return null; - } - - // eager same instance type test to avoid the overhead of invoking the type converter - // if already same type - if (type.isInstance(value)) { - return (T) value; - } - - return ExchangeHelper.convertToType(this, type, value); - } - - @Override - public void setProperty(String name, Object value) { - if (value != null) { - // avoid the NullPointException - properties.put(name, value); - } else { - // if the value is null, we just remove the key from the map - if (name != null) { - properties.remove(name); - } - } - } - - @Override - public void setProperties(Map<String, Object> properties) { - this.properties.clear(); - this.properties.putAll(properties); - } - - @Override - public Object removeProperty(String name) { - if (!hasProperties()) { - return null; - } - return properties.remove(name); - } - - @Override - public boolean removeProperties(String pattern) { - return removeProperties(pattern, (String[]) null); - } - - @Override - public boolean removeProperties(String pattern, String... excludePatterns) { - if (!hasProperties()) { - return false; - } - - // special optimized - if (excludePatterns == null && "*".equals(pattern)) { - properties.clear(); - return true; - } - - // store keys to be removed as we cannot loop and remove at the same time in implementations such as HashMap - Set<String> toBeRemoved = null; - boolean matches = false; - for (String key : properties.keySet()) { - if (PatternHelper.matchPattern(key, pattern)) { - if (excludePatterns != null && PatternHelper.isExcludePatternMatch(key, excludePatterns)) { - continue; - } - matches = true; - if (toBeRemoved == null) { - toBeRemoved = new HashSet<>(); - } - toBeRemoved.add(key); - } - } - - if (matches) { - if (toBeRemoved.size() == properties.size()) { - // special optimization when all should be removed - properties.clear(); - } else { - for (String key : toBeRemoved) { - properties.remove(key); - } - } - } - - return matches; - } - - @Override - public Map<String, Object> getProperties() { - return properties; - } - - @Override - public boolean hasProperties() { - return !properties.isEmpty(); - } - - @Override - public Message getIn() { - if (in == null) { - in = new DefaultMessage(getContext()); - configureMessage(in); - } - return in; - } - - @Override - public <T> T getIn(Class<T> type) { - Message in = getIn(); - - // eager same instance type test to avoid the overhead of invoking the type converter - // if already same type - if (type.isInstance(in)) { - return type.cast(in); - } - - // fallback to use type converter - return context.getTypeConverter().convertTo(type, this, in); - } - - @Override - public void setIn(Message in) { - this.in = in; - configureMessage(in); - } - - @Override - public Message getOut() { - // lazy create - if (out == null) { - out = (in instanceof MessageSupport) - ? ((MessageSupport) in).newInstance() : new DefaultMessage(getContext()); - configureMessage(out); - } - return out; - } - - @Override - public <T> T getOut(Class<T> type) { - if (!hasOut()) { - return null; - } - - Message out = getOut(); - - // eager same instance type test to avoid the overhead of invoking the type converter - // if already same type - if (type.isInstance(out)) { - return type.cast(out); - } - - // fallback to use type converter - return context.getTypeConverter().convertTo(type, this, out); - } - - @Override - public boolean hasOut() { - return out != null; - } - - @Override - public void setOut(Message out) { - this.out = out; - configureMessage(out); - } - - @Override - public Message getMessage() { - return hasOut() ? getOut() : getIn(); - } - - @Override - public <T> T getMessage(Class<T> type) { - return hasOut() ? getOut(type) : getIn(type); - } - - @Override - public void setMessage(Message message) { - if (hasOut()) { - setOut(message); - } else { - setIn(message); - } - } - - @Override - public Exception getException() { - return exception; - } - - @Override - public <T> T getException(Class<T> type) { - return ObjectHelper.getException(type, exception); - } - - @Override - public void setException(Throwable t) { - if (t == null) { - this.exception = null; - } else if (t instanceof Exception) { - this.exception = (Exception) t; - } else { - // wrap throwable into an exception - this.exception = CamelExecutionException.wrapCamelExecutionException(this, t); - } - if (t instanceof InterruptedException) { - // mark the exchange as interrupted due to the interrupt exception - setInterrupted(true); - } - } - - @Override - public <T extends Exchange> T adapt(Class<T> type) { - return type.cast(this); - } - - @Override - public ExchangePattern getPattern() { - return pattern; - } - - @Override - public void setPattern(ExchangePattern pattern) { - this.pattern = pattern; - } - - @Override - public Endpoint getFromEndpoint() { - return fromEndpoint; - } - - @Override - public void setFromEndpoint(Endpoint fromEndpoint) { - this.fromEndpoint = fromEndpoint; - } - - @Override - public String getFromRouteId() { - return fromRouteId; - } - - @Override - public void setFromRouteId(String fromRouteId) { - this.fromRouteId = fromRouteId; - } - - @Override - public String getExchangeId() { - if (exchangeId == null) { - exchangeId = createExchangeId(); - } - return exchangeId; - } - - @Override - public void setExchangeId(String id) { - this.exchangeId = id; - } - - @Override - public boolean isFailed() { - return exception != null; - } - - @Override - public boolean isTransacted() { - return transacted; - } - - @Override - public void setTransacted(boolean transacted) { - this.transacted = true; - } - - @Override - public boolean isRouteStop() { - return routeStop; - } - - @Override - public void setRouteStop(boolean routeStop) { - this.routeStop = routeStop; - } - - @Override - public boolean isExternalRedelivered() { - if (externalRedelivered == null) { - // lets avoid adding methods to the Message API, so we use the - // DefaultMessage to allow component specific messages to extend - // and implement the isExternalRedelivered method. - Message msg = getIn(); - if (msg instanceof DefaultMessage) { - externalRedelivered = ((DefaultMessage) msg).isTransactedRedelivered(); - } - // not from a transactional resource so mark it as false by default - if (externalRedelivered == null) { - externalRedelivered = false; - } - } - return externalRedelivered; - } - - @Override - public boolean isRollbackOnly() { - return rollbackOnly; - } - - @Override - public void setRollbackOnly(boolean rollbackOnly) { - this.rollbackOnly = rollbackOnly; - } - - @Override - public boolean isRollbackOnlyLast() { - return rollbackOnlyLast; - } - - @Override - public void setRollbackOnlyLast(boolean rollbackOnlyLast) { - this.rollbackOnlyLast = rollbackOnlyLast; - } - - @Override - public UnitOfWork getUnitOfWork() { - return unitOfWork; - } - - @Override - public void setUnitOfWork(UnitOfWork unitOfWork) { - this.unitOfWork = unitOfWork; - if (unitOfWork != null && onCompletions != null) { - // now an unit of work has been assigned so add the on completions - // we might have registered already - for (Synchronization onCompletion : onCompletions) { - unitOfWork.addSynchronization(onCompletion); - } - // cleanup the temporary on completion list as they now have been registered - // on the unit of work - onCompletions.clear(); - onCompletions = null; - } - } - - @Override - public void addOnCompletion(Synchronization onCompletion) { - if (unitOfWork == null) { - // unit of work not yet registered so we store the on completion temporary - // until the unit of work is assigned to this exchange by the unit of work - if (onCompletions == null) { - onCompletions = new ArrayList<>(); - } - onCompletions.add(onCompletion); - } else { - getUnitOfWork().addSynchronization(onCompletion); - } - } - - @Override - public boolean containsOnCompletion(Synchronization onCompletion) { - if (unitOfWork != null) { - // if there is an unit of work then the completions is moved there - return unitOfWork.containsSynchronization(onCompletion); - } else { - // check temporary completions if no unit of work yet - return onCompletions != null && onCompletions.contains(onCompletion); - } - } - - @Override - public void handoverCompletions(Exchange target) { - if (onCompletions != null) { - for (Synchronization onCompletion : onCompletions) { - target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); - } - // cleanup the temporary on completion list as they have been handed over - onCompletions.clear(); - onCompletions = null; - } else if (unitOfWork != null) { - // let unit of work handover - unitOfWork.handoverSynchronization(target); - } - } - - @Override - public List<Synchronization> handoverCompletions() { - List<Synchronization> answer = null; - if (onCompletions != null) { - answer = new ArrayList<>(onCompletions); - onCompletions.clear(); - onCompletions = null; - } - return answer; - } - - @Override - public String getHistoryNodeId() { - return historyNodeId; - } - - @Override - public void setHistoryNodeId(String historyNodeId) { - this.historyNodeId = historyNodeId; - } - - @Override - public String getHistoryNodeLabel() { - return historyNodeLabel; - } - - @Override - public void setHistoryNodeLabel(String historyNodeLabel) { - this.historyNodeLabel = historyNodeLabel; - } - - @Override - public boolean isNotifyEvent() { - return notifyEvent; - } - - @Override - public void setNotifyEvent(boolean notifyEvent) { - this.notifyEvent = notifyEvent; - } - - @Override - public boolean isInterrupted() { - return interrupted; - } - - @Override - public void setInterrupted(boolean interrupted) { - if (interruptable) { - this.interrupted = interrupted; - } - } - - @Override - public void setInterruptable(boolean interruptable) { - this.interruptable = interruptable; - } - - @Override - public boolean isRedeliveryExhausted() { - return redeliveryExhausted; - } - - @Override - public void setRedeliveryExhausted(boolean redeliveryExhausted) { - this.redeliveryExhausted = redeliveryExhausted; - } - - public Boolean getErrorHandlerHandled() { - return errorHandlerHandled; - } - - @Override - public boolean isErrorHandlerHandledSet() { - return errorHandlerHandled != null; - } - - @Override - public boolean isErrorHandlerHandled() { - return errorHandlerHandled; - } - - @Override - public void setErrorHandlerHandled(Boolean errorHandlerHandled) { - this.errorHandlerHandled = errorHandlerHandled; - } - - /** - * Configures the message after it has been set on the exchange - */ - protected void configureMessage(Message message) { - if (message instanceof MessageSupport) { - MessageSupport messageSupport = (MessageSupport) message; - messageSupport.setExchange(this); - messageSupport.setCamelContext(getContext()); - } - } - - protected String createExchangeId() { - return context.getUuidGenerator().generateUuid(); + super(fromEndpoint, pattern); } } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java index 17ec269..fc4492f 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java @@ -18,12 +18,17 @@ package org.apache.camel.support; import java.util.function.Function; -import org.apache.camel.*; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; +import org.apache.camel.PooledExchange; /** - * The default {@link PooledExchange}. + * The default and only implementation of {@link PooledExchange}. */ -public final class DefaultPooledExchange extends DefaultExchange implements PooledExchange { +public final class DefaultPooledExchange extends AbstractExchange implements PooledExchange { private Function<Exchange, Boolean> onDone; private Class originalInClassType; diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java index a0ae0de..6749f6a 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PollingConsumerSupport.java @@ -16,7 +16,10 @@ */ package org.apache.camel.support; -import org.apache.camel.*; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.PollingConsumer; +import org.apache.camel.Processor; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.support.service.ServiceSupport;
