This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch exchange-factory in repository https://gitbox.apache.org/repos/asf/camel.git
commit 497e4c53ae4f0144ff60eae8532f95c3f4593bbf Author: Claus Ibsen <[email protected]> AuthorDate: Fri Feb 19 16:09:29 2021 +0100 CAMEL-16222: PooledExchangeFactory experiment --- .../main/java/org/apache/camel/ExtendedExchange.java | 20 +++++++++++++++++++- .../apache/camel/impl/engine/DefaultUnitOfWork.java | 2 +- .../camel/impl/engine/PooledExchangeFactory.java | 7 +++++-- .../org/apache/camel/support/DefaultConsumer.java | 6 ++---- .../org/apache/camel/support/DefaultExchange.java | 19 ++++++++++++++----- .../ROOT/pages/camel-3x-upgrade-guide-3_9.adoc | 17 +++++++++++++++++ 6 files changed, 58 insertions(+), 13 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java index 36fb8b9..dec68a9 100644 --- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java +++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java @@ -31,6 +31,8 @@ public interface ExtendedExchange extends Exchange { /** * Registers a task to run when this exchange is done. + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. */ void onDone(Function<Exchange, Boolean> task); @@ -39,14 +41,30 @@ public interface ExtendedExchange extends Exchange { * <p/> * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. */ - void done(); + void done(boolean forced); /** * Resets the exchange for reuse with the given created timestamp; + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. */ void reset(long created); /** + * Whether this exchange was created to auto release when its unit of work is done + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. 
+ */ + void setAutoRelease(boolean autoRelease); + + /** + * Whether this exchange was created to auto release when its unit of work is done + * <p/> + * <b>Important:</b> This API is NOT intended for Camel end users, but used internally by Camel itself. + */ + boolean isAutoRelease(); + + /** * Sets the endpoint which originated this message exchange. This method should typically only be called by * {@link Endpoint} implementations */ diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java index d501526..372072a 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java @@ -249,7 +249,7 @@ public class DefaultUnitOfWork implements UnitOfWork { // the exchange is now done try { - exchange.adapt(ExtendedExchange.class).done(); + exchange.adapt(ExtendedExchange.class).done(false); } catch (Throwable e) { // must catch exceptions to ensure synchronizations is also invoked log.warn("Exception occurred during exchange done. 
This exception will be ignored.", e); diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java index e4fe5e0..83e619b 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java @@ -100,6 +100,7 @@ public class PooledExchangeFactory extends ServiceSupport } // create a new exchange as there was no free from the pool ExtendedExchange answer = new DefaultExchange(camelContext); + answer.setAutoRelease(autoRelease); if (autoRelease) { // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created answer.onDone(this::release); @@ -125,6 +126,7 @@ public class PooledExchangeFactory extends ServiceSupport } // create a new exchange as there was no free from the pool ExtendedExchange answer = new DefaultExchange(fromEndpoint); + answer.setAutoRelease(autoRelease); if (autoRelease) { // the consumer will either always be in auto release mode or not, so its safe to initialize the task only once when the exchange is created answer.onDone(this::release); @@ -143,10 +145,11 @@ public class PooledExchangeFactory extends ServiceSupport @Override public boolean release(Exchange exchange) { - // reset exchange before returning to pool try { + // done exchange before returning back to pool ExtendedExchange ee = exchange.adapt(ExtendedExchange.class); - ee.done(); + boolean force = !ee.isAutoRelease(); + ee.done(force); ee.onDone(null); // only release back in pool if reset was success diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java index 378f015..2c00dc1 100644 --- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java @@ -136,10 +136,8 @@ public class DefaultConsumer extends ServiceSupport implements Consumer, RouteAw public void releaseExchange(Exchange exchange, boolean autoRelease) { if (exchange != null) { if (!autoRelease) { - // we must manually done the exchange - // TODO: hack - exchange.adapt(ExtendedExchange.class).onDone(e -> true); - exchange.adapt(ExtendedExchange.class).done(); + // if not auto release we must manually force done + exchange.adapt(ExtendedExchange.class).done(true); } exchangeFactory.release(exchange); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java index ae08318..4ddeb65 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java @@ -72,6 +72,7 @@ public final class DefaultExchange implements ExtendedExchange { private boolean interruptable = true; private boolean redeliveryExhausted; private Boolean errorHandlerHandled; + private boolean autoRelease; public DefaultExchange(CamelContext context) { this.context = context; @@ -123,15 +124,21 @@ public final class DefaultExchange implements ExtendedExchange { } } + public boolean isAutoRelease() { + return autoRelease; + } + + public void setAutoRelease(boolean autoRelease) { + this.autoRelease = autoRelease; + } + @Override public void onDone(Function<Exchange, Boolean> task) { this.onDone = task; } - public void done() { - // only need to do this if there is an onDone task - // and use created flag to avoid doing done more than once - if (created > 0 && onDone != null) { + public void done(boolean forced) { + if (created > 0 && (forced || autoRelease)) { this.created = 0; // by setting to 0 we 
also flag that this exchange is done and needs to be reset to use again this.properties.clear(); this.exchangeId = null; @@ -162,7 +169,9 @@ public final class DefaultExchange implements ExtendedExchange { this.redeliveryExhausted = false; this.errorHandlerHandled = null; - onDone.apply(this); + if (onDone != null) { + onDone.apply(this); + } } } diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc index e78e559..3bcfd00 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_9.adoc @@ -6,6 +6,23 @@ from both 3.0 to 3.1 and 3.1 to 3.2. == Upgrading Camel 3.8 to 3.9 +=== API changes + +The `Consumer` API in `camel-api` has been enhanced to help support Camel reducing the footprint during routing. +One aspect is that we allow recycling of `Exchange` instances created by the consumers. This avoids creating new `Exchange` +instances in the memory for each incoming message the consumers process. By recycling `Exchange`s we reduce the overhead +on the JVM garbage collector. However, this requires Camel to know whether or not the `Exchange` should be recycled, +and some API changes took place. + +The `Consumer` API has two new methods which a consumer must use to create an `Exchange` with `createExchange`. +By default the exchange is auto released when it is no longer in use, but some consumers need custom control, +and can turn off auto release, which then requires the consumer to manually release the exchange by calling `releaseExchange` +when the consumer is done with the exchange. + +The default implementation in `DefaultConsumer` has adopted this API and 3rd party components can continue as is, by using +the older APIs. However, for these 3rd party components to support recycling exchanges, they must be updated to use this new API. 
+ + === Modularized camel-spring The `camel-spring` component has been modularized into:
