This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4aff4a43aff21f3da611a6b336f44b3c8be80460
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Feb 20 15:08:37 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/ExtendedExchange.java    |  36 -----
 .../main/java/org/apache/camel/PooledExchange.java |  65 ++++++++
 .../camel/impl/engine/DefaultUnitOfWork.java       |  20 +--
 .../camel/impl/engine/PooledExchangeFactory.java   |  22 +--
 .../org/apache/camel/support/DefaultConsumer.java  |  12 +-
 .../org/apache/camel/support/DefaultExchange.java  | 154 +++++--------------
 .../camel/support/DefaultPooledExchange.java       | 165 +++++++++++++++++++++
 7 files changed, 284 insertions(+), 190 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index dec68a9..bb0b523 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -18,7 +18,6 @@ package org.apache.camel;
 
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
 
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
@@ -30,41 +29,6 @@ import org.apache.camel.spi.UnitOfWork;
 public interface ExtendedExchange extends Exchange {
 
     /**
-     * Registers a task to run when this exchange is done.
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
-     */
-    void onDone(Function<Exchange, Boolean> task);
-
-    /**
-     * When the exchange is done being used.
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
-     */
-    void done(boolean forced);
-
-    /**
-     * Resets the exchange for reuse with the given created timestamp;
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
-     */
-    void reset(long created);
-
-    /**
-     * Whether this exchange was created to auto release when its unit of work 
is done
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
-     */
-    void setAutoRelease(boolean autoRelease);
-
-    /**
-     * Whether this exchange was created to auto release when its unit of work 
is done
-     * <p/>
-     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
-     */
-    boolean isAutoRelease();
-
-    /**
      * Sets the endpoint which originated this message exchange. This method 
should typically only be called by
      * {@link Endpoint} implementations
      */
diff --git a/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java 
b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
new file mode 100644
index 0000000..de0ad66
--- /dev/null
+++ b/core/camel-api/src/main/java/org/apache/camel/PooledExchange.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.function.Function;
+
+import org.apache.camel.spi.ExchangeFactory;
+
+/**
+ * Pooled {@link Exchange} which contains the methods and APIs that are not 
intended for Camel end users but used
+ * internally by Camel for optimizing memory footprint by reusing exchanges 
created by {@link Consumer}s via
+ * {@link ExchangeFactory}.
+ */
+public interface PooledExchange extends ExtendedExchange {
+
+    /**
+     * Registers a task to run when this exchange is done.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
+     */
+    void onDone(Function<Exchange, Boolean> task);
+
+    /**
+     * When the exchange is done being used.
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
+     */
+    void done(boolean forced);
+
+    /**
+     * Resets the exchange for reuse with the given created timestamp;
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
+     */
+    void reset(long created);
+
+    /**
+     * Whether this exchange was created to auto release when its unit of work 
is done
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
+     */
+    void setAutoRelease(boolean autoRelease);
+
+    /**
+     * Whether this exchange was created to auto release when its unit of work 
is done
+     * <p/>
+     * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
+     */
+    boolean isAutoRelease();
+
+}
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 372072a..ee9bf54 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -25,13 +25,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.function.Predicate;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
-import org.apache.camel.Route;
+import org.apache.camel.*;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationVetoable;
@@ -248,11 +242,13 @@ public class DefaultUnitOfWork implements UnitOfWork {
         }
 
         // the exchange is now done
-        try {
-            exchange.adapt(ExtendedExchange.class).done(false);
-        } catch (Throwable e) {
-            // must catch exceptions to ensure synchronizations is also invoked
-            log.warn("Exception occurred during exchange done. This exception 
will be ignored.", e);
+        if (exchange instanceof PooledExchange) {
+            try {
+                ((PooledExchange) exchange).done(false);
+            } catch (Throwable e) {
+                // must catch exceptions to ensure synchronizations is also 
invoked
+                log.warn("Exception occurred during exchange done. This 
exception will be ignored.", e);
+            }
         }
     }
 
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index 83e619b..c5b51b4 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,17 +19,9 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.CamelContext;
-import org.apache.camel.CamelContextAware;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Experimental;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.NonManagedService;
-import org.apache.camel.StaticService;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExchangeFactory;
-import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.DefaultPooledExchange;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.URISupport;
@@ -99,7 +91,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            ExtendedExchange answer = new DefaultExchange(camelContext);
+            PooledExchange answer = new DefaultPooledExchange(camelContext);
             answer.setAutoRelease(autoRelease);
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or 
not, so its safe to initialize the task only once when the exchange is created
@@ -111,7 +103,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 acquired.incrementAndGet();
             }
             // reset exchange for reuse
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            PooledExchange ee = exchange.adapt(PooledExchange.class);
             ee.reset(System.currentTimeMillis());
         }
         return exchange;
@@ -125,7 +117,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            ExtendedExchange answer = new DefaultExchange(fromEndpoint);
+            PooledExchange answer = new DefaultPooledExchange(fromEndpoint);
             answer.setAutoRelease(autoRelease);
             if (autoRelease) {
                 // the consumer will either always be in auto release mode or 
not, so its safe to initialize the task only once when the exchange is created
@@ -137,7 +129,7 @@ public class PooledExchangeFactory extends ServiceSupport
                 acquired.incrementAndGet();
             }
             // reset exchange for reuse
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            PooledExchange ee = exchange.adapt(PooledExchange.class);
             ee.reset(System.currentTimeMillis());
         }
         return exchange;
@@ -147,7 +139,7 @@ public class PooledExchangeFactory extends ServiceSupport
     public boolean release(Exchange exchange) {
         try {
             // done exchange before returning back to pool
-            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            PooledExchange ee = exchange.adapt(PooledExchange.class);
             boolean force = !ee.isAutoRelease();
             ee.done(force);
             ee.onDone(null);
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index 2c00dc1..685f8c1 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -16,15 +16,7 @@
  */
 package org.apache.camel.support;
 
-import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedCamelContext;
-import org.apache.camel.ExtendedExchange;
-import org.apache.camel.Processor;
-import org.apache.camel.Route;
-import org.apache.camel.RouteAware;
+import org.apache.camel.*;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.spi.RouteIdAware;
@@ -137,7 +129,7 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
         if (exchange != null) {
             if (!autoRelease) {
                 // if not auto release we must manually force done
-                exchange.adapt(ExtendedExchange.class).done(true);
+                exchange.adapt(PooledExchange.class).done(true);
             }
             exchangeFactory.release(exchange);
         }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index a6955b1..adb804c 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
@@ -43,51 +42,46 @@ import org.apache.camel.util.ObjectHelper;
 /**
  * The default and only implementation of {@link Exchange}.
  */
-public final class DefaultExchange implements ExtendedExchange {
+public class DefaultExchange implements ExtendedExchange {
+
+    // TODO: AbstractExchange and move somewhere, and have DefaultExchange as 
thin
 
     private final CamelContext context;
-    private Function<Exchange, Boolean> onDone;
-    private long created;
+    long created;
     // optimize to create properties always and with a reasonable small size
-    private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
-    private Class originalInClassType;
-    private Message in;
-    private Message originalOut;
-    private Message out;
-    private Exception exception;
-    private String exchangeId;
-    private UnitOfWork unitOfWork;
-    private final ExchangePattern originalPattern;
-    private ExchangePattern pattern;
-    private Endpoint fromEndpoint;
-    private String fromRouteId;
-    private List<Synchronization> onCompletions;
-    private Boolean externalRedelivered;
-    private String historyNodeId;
-    private String historyNodeLabel;
-    private boolean transacted;
-    private boolean routeStop;
-    private boolean rollbackOnly;
-    private boolean rollbackOnlyLast;
-    private boolean notifyEvent;
-    private boolean interrupted;
-    private boolean interruptable = true;
-    private boolean redeliveryExhausted;
-    private Boolean errorHandlerHandled;
-    private boolean autoRelease;
+    final Map<String, Object> properties = new ConcurrentHashMap<>(8);
+    Message in;
+    Message out;
+    Exception exception;
+    String exchangeId;
+    UnitOfWork unitOfWork;
+    ExchangePattern pattern;
+    Endpoint fromEndpoint;
+    String fromRouteId;
+    List<Synchronization> onCompletions;
+    Boolean externalRedelivered;
+    String historyNodeId;
+    String historyNodeLabel;
+    boolean transacted;
+    boolean routeStop;
+    boolean rollbackOnly;
+    boolean rollbackOnlyLast;
+    boolean notifyEvent;
+    boolean interrupted;
+    boolean interruptable = true;
+    boolean redeliveryExhausted;
+    Boolean errorHandlerHandled;
 
     public DefaultExchange(CamelContext context) {
         this.context = context;
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(CamelContext context, ExchangePattern pattern) {
         this.context = context;
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Exchange parent) {
@@ -97,7 +91,6 @@ public final class DefaultExchange implements 
ExtendedExchange {
         this.fromEndpoint = parent.getFromEndpoint();
         this.fromRouteId = parent.getFromRouteId();
         this.unitOfWork = parent.getUnitOfWork();
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Endpoint fromEndpoint) {
@@ -105,7 +98,6 @@ public final class DefaultExchange implements 
ExtendedExchange {
         this.pattern = ExchangePattern.InOnly;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
-        this.originalPattern = this.pattern;
     }
 
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
@@ -113,7 +105,6 @@ public final class DefaultExchange implements 
ExtendedExchange {
         this.pattern = pattern;
         this.created = System.currentTimeMillis();
         this.fromEndpoint = fromEndpoint;
-        this.originalPattern = this.pattern;
     }
 
     @Override
@@ -126,68 +117,6 @@ public final class DefaultExchange implements 
ExtendedExchange {
         }
     }
 
-    public boolean isAutoRelease() {
-        return autoRelease;
-    }
-
-    public void setAutoRelease(boolean autoRelease) {
-        this.autoRelease = autoRelease;
-    }
-
-    @Override
-    public void onDone(Function<Exchange, Boolean> task) {
-        this.onDone = task;
-    }
-
-    public void done(boolean forced) {
-        if (created > 0 && (forced || autoRelease)) {
-            this.created = 0; // by setting to 0 we also flag that this 
exchange is done and needs to be reset to use again
-            this.properties.clear();
-            this.exchangeId = null;
-            if (in != null && in.getClass() == originalInClassType) {
-                // okay we can reuse in
-                in.reset();
-            } else {
-                this.in = null;
-            }
-            if (out != null) {
-                out.reset();
-                this.out = null;
-            }
-            if (this.unitOfWork != null) {
-                this.unitOfWork.reset();
-            }
-            this.exception = null;
-            // reset pattern to original
-            this.pattern = originalPattern;
-            if (this.onCompletions != null) {
-                this.onCompletions.clear();
-            }
-            // do not reset endpoint/fromRouteId as it would be the same 
consumer/endpoint again
-            this.externalRedelivered = null;
-            this.historyNodeId = null;
-            this.historyNodeLabel = null;
-            this.transacted = false;
-            this.routeStop = false;
-            this.rollbackOnly = false;
-            this.rollbackOnlyLast = false;
-            this.notifyEvent = false;
-            this.interrupted = false;
-            this.interruptable = true;
-            this.redeliveryExhausted = false;
-            this.errorHandlerHandled = null;
-
-            if (onDone != null) {
-                onDone.apply(this);
-            }
-        }
-    }
-
-    @Override
-    public void reset(long created) {
-        this.created = created;
-    }
-
     @Override
     public long getCreated() {
         return created;
@@ -404,7 +333,6 @@ public final class DefaultExchange implements 
ExtendedExchange {
     public Message getIn() {
         if (in == null) {
             in = new DefaultMessage(getContext());
-            originalInClassType = in.getClass();
             configureMessage(in);
         }
         return in;
@@ -428,23 +356,15 @@ public final class DefaultExchange implements 
ExtendedExchange {
     public void setIn(Message in) {
         this.in = in;
         configureMessage(in);
-        if (in != null) {
-            this.originalInClassType = in.getClass();
-        }
     }
 
     @Override
     public Message getOut() {
         // lazy create
         if (out == null) {
-            if (originalOut != null) {
-                out = originalOut;
-            } else {
-                // we can only optimize OUT when its using a default message 
instance
-                out = new DefaultMessage(this);
-                configureMessage(out);
-                originalOut = out;
-            }
+            out = (in instanceof MessageSupport)
+                    ? ((MessageSupport) in).newInstance() : new 
DefaultMessage(getContext());
+            configureMessage(out);
         }
         return out;
     }
@@ -475,10 +395,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
     @Override
     public void setOut(Message out) {
         this.out = out;
-        if (out != null) {
-            configureMessage(out);
-            this.originalOut = null; // we use custom out
-        }
+        configureMessage(out);
     }
 
     @Override
@@ -645,7 +562,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
     @Override
     public void setUnitOfWork(UnitOfWork unitOfWork) {
         this.unitOfWork = unitOfWork;
-        if (unitOfWork != null && onCompletions != null && 
!onCompletions.isEmpty()) {
+        if (unitOfWork != null && onCompletions != null) {
             // now an unit of work has been assigned so add the on completions
             // we might have registered already
             for (Synchronization onCompletion : onCompletions) {
@@ -654,6 +571,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
             // cleanup the temporary on completion list as they now have been 
registered
             // on the unit of work
             onCompletions.clear();
+            onCompletions = null;
         }
     }
 
@@ -667,7 +585,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
             }
             onCompletions.add(onCompletion);
         } else {
-            unitOfWork.addSynchronization(onCompletion);
+            getUnitOfWork().addSynchronization(onCompletion);
         }
     }
 
@@ -684,12 +602,13 @@ public final class DefaultExchange implements 
ExtendedExchange {
 
     @Override
     public void handoverCompletions(Exchange target) {
-        if (onCompletions != null && !onCompletions.isEmpty()) {
+        if (onCompletions != null) {
             for (Synchronization onCompletion : onCompletions) {
                 
target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
             }
             // cleanup the temporary on completion list as they have been 
handed over
             onCompletions.clear();
+            onCompletions = null;
         } else if (unitOfWork != null) {
             // let unit of work handover
             unitOfWork.handoverSynchronization(target);
@@ -699,9 +618,10 @@ public final class DefaultExchange implements 
ExtendedExchange {
     @Override
     public List<Synchronization> handoverCompletions() {
         List<Synchronization> answer = null;
-        if (onCompletions != null && !onCompletions.isEmpty()) {
+        if (onCompletions != null) {
             answer = new ArrayList<>(onCompletions);
             onCompletions.clear();
+            onCompletions = null;
         }
         return answer;
     }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
new file mode 100644
index 0000000..17ec269
--- /dev/null
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.function.Function;
+
+import org.apache.camel.*;
+
+/**
+ * The default {@link PooledExchange}.
+ */
+public final class DefaultPooledExchange extends DefaultExchange implements 
PooledExchange {
+
+    private Function<Exchange, Boolean> onDone;
+    private Class originalInClassType;
+    private Message originalOut;
+    private final ExchangePattern originalPattern;
+    private boolean autoRelease;
+
+    public DefaultPooledExchange(CamelContext context) {
+        super(context);
+        this.originalPattern = getPattern();
+    }
+
+    public DefaultPooledExchange(CamelContext context, ExchangePattern 
pattern) {
+        super(context, pattern);
+        this.originalPattern = pattern;
+    }
+
+    public DefaultPooledExchange(Exchange parent) {
+        super(parent);
+        this.originalPattern = parent.getPattern();
+    }
+
+    public DefaultPooledExchange(Endpoint fromEndpoint) {
+        super(fromEndpoint);
+        this.originalPattern = getPattern();
+    }
+
+    public DefaultPooledExchange(Endpoint fromEndpoint, ExchangePattern 
pattern) {
+        super(fromEndpoint, pattern);
+        this.originalPattern = pattern;
+    }
+
+    public boolean isAutoRelease() {
+        return autoRelease;
+    }
+
+    public void setAutoRelease(boolean autoRelease) {
+        this.autoRelease = autoRelease;
+    }
+
+    @Override
+    public void onDone(Function<Exchange, Boolean> task) {
+        this.onDone = task;
+    }
+
+    public void done(boolean forced) {
+        if (created > 0 && (forced || autoRelease)) {
+            this.created = 0; // by setting to 0 we also flag that this 
exchange is done and needs to be reset to use again
+            this.properties.clear();
+            this.exchangeId = null;
+            if (in != null && in.getClass() == originalInClassType) {
+                // okay we can reuse in
+                in.reset();
+            } else {
+                this.in = null;
+            }
+            if (out != null) {
+                out.reset();
+                this.out = null;
+            }
+            if (this.unitOfWork != null) {
+                this.unitOfWork.reset();
+            }
+            this.exception = null;
+            // reset pattern to original
+            this.pattern = originalPattern;
+            if (this.onCompletions != null) {
+                this.onCompletions.clear();
+            }
+            // do not reset endpoint/fromRouteId as it would be the same 
consumer/endpoint again
+            this.externalRedelivered = null;
+            this.historyNodeId = null;
+            this.historyNodeLabel = null;
+            this.transacted = false;
+            this.routeStop = false;
+            this.rollbackOnly = false;
+            this.rollbackOnlyLast = false;
+            this.notifyEvent = false;
+            this.interrupted = false;
+            this.interruptable = true;
+            this.redeliveryExhausted = false;
+            this.errorHandlerHandled = null;
+
+            if (onDone != null) {
+                onDone.apply(this);
+            }
+        }
+    }
+
+    @Override
+    public void reset(long created) {
+        this.created = created;
+    }
+
+    @Override
+    public Message getIn() {
+        if (in == null) {
+            in = new DefaultMessage(getContext());
+            originalInClassType = in.getClass();
+            configureMessage(in);
+        }
+        return in;
+    }
+
+    @Override
+    public void setIn(Message in) {
+        this.in = in;
+        configureMessage(in);
+        if (in != null) {
+            this.originalInClassType = in.getClass();
+        }
+    }
+
+    @Override
+    public Message getOut() {
+        // lazy create
+        if (out == null) {
+            if (originalOut != null) {
+                out = originalOut;
+            } else {
+                // we can only optimize OUT when its using a default message 
instance
+                out = new DefaultMessage(this);
+                configureMessage(out);
+                originalOut = out;
+            }
+        }
+        return out;
+    }
+
+    @Override
+    public void setOut(Message out) {
+        this.out = out;
+        if (out != null) {
+            configureMessage(out);
+            this.originalOut = null; // we use custom out
+        }
+    }
+
+}

Reply via email to