This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4aff4a43aff21f3da611a6b336f44b3c8be80460 Author: Claus Ibsen <[email protected]> AuthorDate: Sat Feb 20 15:08:37 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../java/org/apache/camel/ExtendedExchange.java | 36 ----- .../main/java/org/apache/camel/PooledExchange.java | 65 ++++++++ .../camel/impl/engine/DefaultUnitOfWork.java | 20 +-- .../camel/impl/engine/PooledExchangeFactory.java | 22 +-- .../org/apache/camel/support/DefaultConsumer.java | 12 +- .../org/apache/camel/support/DefaultExchange.java | 154 +++++-------------- .../camel/support/DefaultPooledExchange.java | 165 +++++++++++++++++++++ 7 files changed, 284 insertions(+), 190 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index dec68a9..bb0b523 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -18,7 +18,6 @@ package org.apache.camel; import java.util.List; import java.util.Map; -import java.util.function.Function; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.UnitOfWork; @@ -30,41 +29,6 @@ import org.apache.camel.spi.UnitOfWork; public interface ExtendedExchange extends Exchange { /** - * Registers a task to run when this exchange is done. - * <p/> - * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. - */ - void onDone(Function<Exchange, Boolean> task); - - /** - * When the exchange is done being used. - * <p/> - * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. - */ - void done(boolean forced); - - /** - * Resets the exchange for reuse with the given created timestamp; - * <p/> - * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. - */ - void reset(long created); - - /** - * Whether this exchange was created to auto release when its unit of work is done - * <p/> - * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. - */ - void setAutoRelease(boolean autoRelease); - - /** - * Whether this exchange was created to auto release when its unit of work is done - * <p/> - * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. - */ - boolean isAutoRelease(); - - /** * Sets the endpoint which originated this message exchange. This method should typically only be called by * {@link Endpoint} implementations */ diff --git a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java new file mode 100644 index 0000000..de0ad66 --- /dev/null +++ b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +import java.util.function.Function; + +import org.apache.camel.spi.ExchangeFactory; + +/** + * Pooled {@link Exchange} which contains the methods and APIs that are not intended for Camel end users but used + * internally by Camel for optimizing memory footprint by reusing exchanges created by {@link Consumer}s via + * {@link ExchangeFactory}. + */ +public interface PooledExchange extends ExtendedExchange { + + /** + * Registers a task to run when this exchange is done. + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + void onDone(Function<Exchange, Boolean> task); + + /** + * When the exchange is done being used. + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + void done(boolean forced); + + /** + * Resets the exchange for reuse with the given created timestamp; + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + void reset(long created); + + /** + * Whether this exchange was created to auto release when its unit of work is done + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + void setAutoRelease(boolean autoRelease); + + /** + * Whether this exchange was created to auto release when its unit of work is done + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + boolean isAutoRelease(); + +} diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index 372072a..ee9bf54 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -25,13 +25,7 @@ import java.util.List; import java.util.Set; import java.util.function.Predicate; -import org.apache.camel.AsyncCallback; -import org.apache.camel.Exchange; -import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.ExtendedExchange; -import org.apache.camel.Message; -import org.apache.camel.Processor; -import org.apache.camel.Route; +import org.apache.camel.*; import org.apache.camel.spi.InflightRepository; import org.apache.camel.spi.Synchronization; import org.apache.camel.spi.SynchronizationVetoable; @@ -248,11 +242,13 @@ public class DefaultUnitOfWork implements UnitOfWork { } // the exchange is now done - try { - exchange.adapt(ExtendedExchange.class).done(false); - } catch (Throwable e) { - // must catch exceptions to ensure synchronizations is also invoked - log.warn("Exception occurred during exchange done. This exception will be ignored.", e); + if (exchange instanceof PooledExchange) { + try { + ((PooledExchange) exchange).done(false); + } catch (Throwable e) { + // must catch exceptions to ensure synchronizations is also invoked + log.warn("Exception occurred during exchange done. This exception will be ignored.", e); + } } } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index 83e619b..c5b51b4 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -19,17 +19,9 @@ package org.apache.camel.impl.engine; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicLong; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Consumer; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.Experimental; -import org.apache.camel.ExtendedExchange; -import org.apache.camel.NonManagedService; -import org.apache.camel.StaticService; +import org.apache.camel.*; import org.apache.camel.spi.ExchangeFactory; -import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.DefaultPooledExchange; import org.apache.camel.support.SynchronizationAdapter; import org.apache.camel.support.service.ServiceSupport; import org.apache.camel.util.URISupport; @@ -99,7 +91,7 @@ public class PooledExchangeFactory extends ServiceSupport created.incrementAndGet(); } // create a new exchange as there was no free from the pool - ExtendedExchange answer = new DefaultExchange(camelContext); + PooledExchange answer = new DefaultPooledExchange(camelContext); answer.setAutoRelease(autoRelease); if (autoRelease) { // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created @@ -111,7 +103,7 @@ public class PooledExchangeFactory extends ServiceSupport acquired.incrementAndGet(); } // reset exchange for reuse - ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); + PooledExchange ee = exchange.adapt(PooledExchange.class); ee.reset(System.currentTimeMillis()); } return exchange; @@ -125,7 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport created.incrementAndGet(); } // create a new exchange as there was no free from the pool - ExtendedExchange answer = new DefaultExchange(fromEndpoint); + PooledExchange answer = new DefaultPooledExchange(fromEndpoint); answer.setAutoRelease(autoRelease); if (autoRelease) { // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created @@ -137,7 +129,7 @@ public class PooledExchangeFactory extends ServiceSupport acquired.incrementAndGet(); } // reset exchange for reuse - ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); + PooledExchange ee = exchange.adapt(PooledExchange.class); ee.reset(System.currentTimeMillis()); } return exchange; @@ -147,7 +139,7 @@ public class PooledExchangeFactory extends ServiceSupport public boolean release(Exchange exchange) { try { // done exchange before returning back to pool - ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); + PooledExchange ee = exchange.adapt(PooledExchange.class); boolean force = !ee.isAutoRelease(); ee.done(force); ee.onDone(null); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index 2c00dc1..685f8c1 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -16,15 +16,7 @@ */ package org.apache.camel.support; -import org.apache.camel.AsyncProcessor; -import org.apache.camel.Consumer; -import org.apache.camel.Endpoint; -import org.apache.camel.Exchange; -import org.apache.camel.ExtendedCamelContext; -import org.apache.camel.ExtendedExchange; -import org.apache.camel.Processor; -import org.apache.camel.Route; -import org.apache.camel.RouteAware; +import org.apache.camel.*; import org.apache.camel.spi.ExceptionHandler; import org.apache.camel.spi.ExchangeFactory; import org.apache.camel.spi.RouteIdAware; @@ -137,7 +129,7 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw if (exchange != null) { if (!autoRelease) { // if not auto release we must manually force done - exchange.adapt(ExtendedExchange.class).done(true); + exchange.adapt(PooledExchange.class).done(true); } exchangeFactory.release(exchange); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index a6955b1..adb804c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -24,7 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Function; import org.apache.camel.CamelContext; import org.apache.camel.CamelExecutionException; @@ -43,51 +42,46 @@ import org.apache.camel.util.ObjectHelper; /** * The default and only implementation of {@link Exchange}. */ -public final class DefaultExchange implements ExtendedExchange { +public class DefaultExchange implements ExtendedExchange { + + // TODO: AbstractExchange and move somewhere, and have DefaultExchange as thin private final CamelContext context; - private Function<Exchange, Boolean> onDone; - private long created; + long created; // optimize to create properties always and with a reasonable small size - private final Map<String, Object> properties = new ConcurrentHashMap<>(8); - private Class originalInClassType; - private Message in; - private Message originalOut; - private Message out; - private Exception exception; - private String exchangeId; - private UnitOfWork unitOfWork; - private final ExchangePattern originalPattern; - private ExchangePattern pattern; - private Endpoint fromEndpoint; - private String fromRouteId; - private List<Synchronization> onCompletions; - private Boolean externalRedelivered; - private String historyNodeId; - private String historyNodeLabel; - private boolean transacted; - private boolean routeStop; - private boolean rollbackOnly; - private boolean rollbackOnlyLast; - private boolean notifyEvent; - private boolean interrupted; - private boolean interruptable = true; - private boolean redeliveryExhausted; - private Boolean errorHandlerHandled; - private boolean autoRelease; + final Map<String, Object> properties = new ConcurrentHashMap<>(8); + Message in; + Message out; + Exception exception; + String exchangeId; + UnitOfWork unitOfWork; + ExchangePattern pattern; + Endpoint fromEndpoint; + String fromRouteId; + List<Synchronization> onCompletions; + Boolean externalRedelivered; + String historyNodeId; + String historyNodeLabel; + boolean transacted; + boolean routeStop; + boolean rollbackOnly; + boolean rollbackOnlyLast; + boolean notifyEvent; + boolean interrupted; + boolean interruptable = true; + boolean redeliveryExhausted; + Boolean errorHandlerHandled; public DefaultExchange(CamelContext context) { this.context = context; this.pattern = ExchangePattern.InOnly; this.created = System.currentTimeMillis(); - this.originalPattern = this.pattern; } public DefaultExchange(CamelContext context, ExchangePattern pattern) { this.context = context; this.pattern = pattern; this.created = System.currentTimeMillis(); - this.originalPattern = this.pattern; } public DefaultExchange(Exchange parent) { @@ -97,7 +91,6 @@ public final class DefaultExchange implements ExtendedExchange { this.fromEndpoint = parent.getFromEndpoint(); this.fromRouteId = parent.getFromRouteId(); this.unitOfWork = parent.getUnitOfWork(); - this.originalPattern = this.pattern; } public DefaultExchange(Endpoint fromEndpoint) { @@ -105,7 +98,6 @@ public final class DefaultExchange implements ExtendedExchange { this.pattern = ExchangePattern.InOnly; this.created = System.currentTimeMillis(); this.fromEndpoint = fromEndpoint; - this.originalPattern = this.pattern; } public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) { @@ -113,7 +105,6 @@ public final class DefaultExchange implements ExtendedExchange { this.pattern = pattern; this.created = System.currentTimeMillis(); this.fromEndpoint = fromEndpoint; - this.originalPattern = this.pattern; } @Override @@ -126,68 +117,6 @@ public final class DefaultExchange implements ExtendedExchange { } } - public boolean isAutoRelease() { - return autoRelease; - } - - public void setAutoRelease(boolean autoRelease) { - this.autoRelease = autoRelease; - } - - @Override - public void onDone(Function<Exchange, Boolean> task) { - this.onDone = task; - } - - public void done(boolean forced) { - if (created > 0 && (forced || autoRelease)) { - this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again - this.properties.clear(); - this.exchangeId = null; - if (in != null && in.getClass() == originalInClassType) { - // okay we can reuse in - in.reset(); - } else { - this.in = null; - } - if (out != null) { - out.reset(); - this.out = null; - } - if (this.unitOfWork != null) { - this.unitOfWork.reset(); - } - this.exception = null; - // reset pattern to original - this.pattern = originalPattern; - if (this.onCompletions != null) { - this.onCompletions.clear(); - } - // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again - this.externalRedelivered = null; - this.historyNodeId = null; - this.historyNodeLabel = null; - this.transacted = false; - this.routeStop = false; - this.rollbackOnly = false; - this.rollbackOnlyLast = false; - this.notifyEvent = false; - this.interrupted = false; - this.interruptable = true; - this.redeliveryExhausted = false; - this.errorHandlerHandled = null; - - if (onDone != null) { - onDone.apply(this); - } - } - } - - @Override - public void reset(long created) { - this.created = created; - } - @Override public long getCreated() { return created; @@ -404,7 +333,6 @@ public final class DefaultExchange implements ExtendedExchange { public Message getIn() { if (in == null) { in = new DefaultMessage(getContext()); - originalInClassType = in.getClass(); configureMessage(in); } return in; @@ -428,23 +356,15 @@ public final class DefaultExchange implements ExtendedExchange { public void setIn(Message in) { this.in = in; configureMessage(in); - if (in != null) { - this.originalInClassType = in.getClass(); - } } @Override public Message getOut() { // lazy create if (out == null) { - if (originalOut != null) { - out = originalOut; - } else { - // we can only optimize OUT when its using a default message instance - out = new DefaultMessage(this); - configureMessage(out); - originalOut = out; - } + out = (in instanceof MessageSupport) + ? ((MessageSupport) in).newInstance() : new DefaultMessage(getContext()); + configureMessage(out); } return out; } @@ -475,10 +395,7 @@ public final class DefaultExchange implements ExtendedExchange { @Override public void setOut(Message out) { this.out = out; - if (out != null) { - configureMessage(out); - this.originalOut = null; // we use custom out - } + configureMessage(out); } @Override @@ -645,7 +562,7 @@ public final class DefaultExchange implements ExtendedExchange { @Override public void setUnitOfWork(UnitOfWork unitOfWork) { this.unitOfWork = unitOfWork; - if (unitOfWork != null && onCompletions != null && !onCompletions.isEmpty()) { + if (unitOfWork != null && onCompletions != null) { // now an unit of work has been assigned so add the on completions // we might have registered already for (Synchronization onCompletion : onCompletions) { @@ -654,6 +571,7 @@ public final class DefaultExchange implements ExtendedExchange { // cleanup the temporary on completion list as they now have been registered // on the unit of work onCompletions.clear(); + onCompletions = null; } } @@ -667,7 +585,7 @@ public final class DefaultExchange implements ExtendedExchange { } onCompletions.add(onCompletion); } else { - unitOfWork.addSynchronization(onCompletion); + getUnitOfWork().addSynchronization(onCompletion); } } @@ -684,12 +602,13 @@ public final class DefaultExchange implements ExtendedExchange { @Override public void handoverCompletions(Exchange target) { - if (onCompletions != null && !onCompletions.isEmpty()) { + if (onCompletions != null) { for (Synchronization onCompletion : onCompletions) { target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion); } // cleanup the temporary on completion list as they have been handed over onCompletions.clear(); + onCompletions = null; } else if (unitOfWork != null) { // let unit of work handover unitOfWork.handoverSynchronization(target); @@ -699,9 +618,10 @@ public final class DefaultExchange implements ExtendedExchange { @Override public List<Synchronization> handoverCompletions() { List<Synchronization> answer = null; - if (onCompletions != null && !onCompletions.isEmpty()) { + if (onCompletions != null) { answer = new ArrayList<>(onCompletions); onCompletions.clear(); + onCompletions = null; } return answer; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java new file mode 100644 index 0000000..17ec269 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support; + +import java.util.function.Function; + +import org.apache.camel.*; + +/** + * The default {@link PooledExchange}. + */ +public final class DefaultPooledExchange extends DefaultExchange implements PooledExchange { + + private Function<Exchange, Boolean> onDone; + private Class originalInClassType; + private Message originalOut; + private final ExchangePattern originalPattern; + private boolean autoRelease; + + public DefaultPooledExchange(CamelContext context) { + super(context); + this.originalPattern = getPattern(); + } + + public DefaultPooledExchange(CamelContext context, ExchangePattern pattern) { + super(context, pattern); + this.originalPattern = pattern; + } + + public DefaultPooledExchange(Exchange parent) { + super(parent); + this.originalPattern = parent.getPattern(); + } + + public DefaultPooledExchange(Endpoint fromEndpoint) { + super(fromEndpoint); + this.originalPattern = getPattern(); + } + + public DefaultPooledExchange(Endpoint fromEndpoint, ExchangePattern pattern) { + super(fromEndpoint, pattern); + this.originalPattern = pattern; + } + + public boolean isAutoRelease() { + return autoRelease; + } + + public void setAutoRelease(boolean autoRelease) { + this.autoRelease = autoRelease; + } + + @Override + public void onDone(Function<Exchange, Boolean> task) { + this.onDone = task; + } + + public void done(boolean forced) { + if (created > 0 && (forced || autoRelease)) { + this.created = 0; // by setting to 0 we also flag that this exchange is done and needs to be reset to use again + this.properties.clear(); + this.exchangeId = null; + if (in != null && in.getClass() == originalInClassType) { + // okay we can reuse in + in.reset(); + } else { + this.in = null; + } + if (out != null) { + out.reset(); + this.out = null; + } + if (this.unitOfWork != null) { + this.unitOfWork.reset(); + } + this.exception = null; + // reset pattern to original + this.pattern = originalPattern; + if (this.onCompletions != null) { + this.onCompletions.clear(); + } + // do not reset endpoint/fromRouteId as it would be the same consumer/endpoint again + this.externalRedelivered = null; + this.historyNodeId = null; + this.historyNodeLabel = null; + this.transacted = false; + this.routeStop = false; + this.rollbackOnly = false; + this.rollbackOnlyLast = false; + this.notifyEvent = false; + this.interrupted = false; + this.interruptable = true; + this.redeliveryExhausted = false; + this.errorHandlerHandled = null; + + if (onDone != null) { + onDone.apply(this); + } + } + } + + @Override + public void reset(long created) { + this.created = created; + } + + @Override + public Message getIn() { + if (in == null) { + in = new DefaultMessage(getContext()); + originalInClassType = in.getClass(); + configureMessage(in); + } + return in; + } + + @Override + public void setIn(Message in) { + this.in = in; + configureMessage(in); + if (in != null) { + this.originalInClassType = in.getClass(); + } + } + + @Override + public Message getOut() { + // lazy create + if (out == null) { + if (originalOut != null) { + out = originalOut; + } else { + // we can only optimize OUT when its using a default message instance + out = new DefaultMessage(this); + configureMessage(out); + originalOut = out; + } + } + return out; + } + + @Override + public void setOut(Message out) { + this.out = out; + if (out != null) { + configureMessage(out); + this.originalOut = null; // we use custom out + } + } + +}
