This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 7db81b5e9914338b16d6a696161bf39f8e3d3527
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Feb 19 13:59:17 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../java/org/apache/camel/ExtendedExchange.java    |  14 ++-
 .../java/org/apache/camel/spi/ExchangeFactory.java |  10 +-
 .../main/java/org/apache/camel/spi/UnitOfWork.java |   8 +-
 .../camel/impl/engine/CamelInternalProcessor.java  |  10 +-
 .../camel/impl/engine/DefaultUnitOfWork.java       | 104 +++++++++++----------
 .../apache/camel/impl/engine/MDCUnitOfWork.java    |   7 --
 .../camel/impl/engine/PooledExchangeFactory.java   |  49 ++++++----
 .../camel/impl/engine/SimpleCamelContext.java      |   2 +-
 .../org/apache/camel/support/DefaultConsumer.java  |   1 -
 .../org/apache/camel/support/DefaultExchange.java  |  94 +++++++++++--------
 .../org/apache/camel/support/UnitOfWorkHelper.java |  12 +--
 11 files changed, 168 insertions(+), 143 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java 
b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index 9232ceb..36fb8b9 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -18,6 +18,7 @@ package org.apache.camel;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
@@ -29,16 +30,21 @@ import org.apache.camel.spi.UnitOfWork;
 public interface ExtendedExchange extends Exchange {
 
     /**
-     * Clears the exchange from user data so it may be reused.
+     * Registers a task to run when this exchange is done.
+     */
+    void onDone(Function<Exchange, Boolean> task);
+
+    /**
+     * When the exchange is done being used.
      * <p/>
      * <b>Important:</b> This API is NOT intended for Camel end users, but 
used internally by Camel itself.
      */
-    void reset();
+    void done();
 
     /**
-     * Sets the created timestamp
+     * Resets the exchange for reuse with the given created timestamp.
      */
-    void setCreated(long created);
+    void reset(long created);
 
     /**
      * Sets the endpoint which originated this message exchange. This method 
should typically only be called by
diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
index 592f431..34ef0cf 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/ExchangeFactory.java
@@ -67,7 +67,13 @@ public interface ExchangeFactory {
      */
     Exchange create(Endpoint fromEndpoint, boolean autoRelease);
 
-    default void release(Exchange exchange) {
-        // noop
+    /**
+     * Releases the exchange back into the pool
+     *
+     * @param  exchange the exchange
+     * @return          true if released into the pool, or false if something 
went wrong and the exchange was discarded
+     */
+    default boolean release(Exchange exchange) {
+        return true;
     }
 }
diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java 
b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
index e94840a..a6f118e 100644
--- a/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
+++ b/core/camel-api/src/main/java/org/apache/camel/spi/UnitOfWork.java
@@ -23,13 +23,12 @@ import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.Service;
 
 /**
  * An object representing the unit of work processing an {@link Exchange} 
which allows the use of
  * {@link Synchronization} hooks. This object might map one-to-one with a 
transaction in JPA or Spring; or might not.
  */
-public interface UnitOfWork extends Service {
+public interface UnitOfWork {
 
     String MDC_BREADCRUMB_ID = "camel.breadcrumbId";
     String MDC_EXCHANGE_ID = "camel.exchangeId";
@@ -50,9 +49,10 @@ public interface UnitOfWork extends Service {
     /**
      * Prepares this unit of work with the given input {@link Exchange}
      *
-     * @param exchange the exchange
+     * @param  exchange the exchange
+     * @return          true if the unit of work was created and prepared, 
false if already prepared
      */
-    void onExchange(Exchange exchange);
+    boolean onPrepare(Exchange exchange);
 
     /**
      * Adds a synchronization hook
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
index 255b894..9696d1b 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/CamelInternalProcessor.java
@@ -658,11 +658,17 @@ public class CamelInternalProcessor extends 
DelegateAsyncProcessor implements In
                 created = createUnitOfWork(exchange);
                 ExtendedExchange ee = (ExtendedExchange) exchange;
                 ee.setUnitOfWork(created);
-                created.start();
                 uow = created;
             } else {
                 // reuse existing exchange
-                uow.onExchange(exchange);
+                if (uow.onPrepare(exchange)) {
+                    // need to re-attach uow
+                    ExtendedExchange ee = (ExtendedExchange) exchange;
+                    ee.setUnitOfWork(uow);
+                    // we are prepared for reuse and can regard it as-if we 
created the unit of work
+                    // so the after method knows that this is the outer bounds 
and should call done on the unit of work
+                    created = uow;
+                }
             }
 
             // for any exchange we should push/pop route context so we can 
keep track of which route we are routing
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
index 7e51696..d501526 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultUnitOfWork.java
@@ -32,7 +32,6 @@ import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.Service;
 import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationVetoable;
@@ -47,7 +46,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The default implementation of {@link org.apache.camel.spi.UnitOfWork}
  */
-public class DefaultUnitOfWork implements UnitOfWork, Service {
+public class DefaultUnitOfWork implements UnitOfWork {
     private static final Logger LOG = 
LoggerFactory.getLogger(DefaultUnitOfWork.class);
 
     // instances used by MDCUnitOfWork
@@ -81,7 +80,7 @@ public class DefaultUnitOfWork implements UnitOfWork, Service 
{
         this.useBreadcrumb = useBreadcrumb;
         this.context = (ExtendedCamelContext) exchange.getContext();
         this.inflightRepository = inflightRepository;
-        onExchange(exchange);
+        doOnPrepare(exchange);
     }
 
     UnitOfWork newInstance(Exchange exchange) {
@@ -89,50 +88,57 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
     }
 
     @Override
-    public void onExchange(Exchange exchange) {
+    public boolean onPrepare(Exchange exchange) {
         if (this.exchange == null) {
-            // unit of work is reused, so setup for this exchange
-            this.exchange = exchange;
-
-            if (allowUseOriginalMessage) {
-                // special for JmsMessage as it can cause it to loose headers 
later.
-                if 
(exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage"))
 {
-                    this.originalInMessage = new DefaultMessage(context);
-                    this.originalInMessage.setBody(exchange.getIn().getBody());
-                    
this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
-                } else {
-                    this.originalInMessage = exchange.getIn().copy();
-                }
-                // must preserve exchange on the original in message
-                if (this.originalInMessage instanceof MessageSupport) {
-                    ((MessageSupport) 
this.originalInMessage).setExchange(exchange);
-                }
-            }
+            doOnPrepare(exchange);
+            return true;
+        } else {
+            return false;
+        }
+    }
 
-            // inject breadcrumb header if enabled
-            if (useBreadcrumb) {
-                // create or use existing breadcrumb
-                String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
-                if (breadcrumbId == null) {
-                    // no existing breadcrumb, so create a new one based on 
the exchange id
-                    breadcrumbId = exchange.getExchangeId();
-                    exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, 
breadcrumbId);
-                }
+    private void doOnPrepare(Exchange exchange) {
+        // unit of work is reused, so setup for this exchange
+        this.exchange = exchange;
+
+        if (allowUseOriginalMessage) {
+            // special for JmsMessage as it can cause it to lose headers 
later.
+            if 
(exchange.getIn().getClass().getName().equals("org.apache.camel.component.jms.JmsMessage"))
 {
+                this.originalInMessage = new DefaultMessage(context);
+                this.originalInMessage.setBody(exchange.getIn().getBody());
+                
this.originalInMessage.getHeaders().putAll(exchange.getIn().getHeaders());
+            } else {
+                this.originalInMessage = exchange.getIn().copy();
+            }
+            // must preserve exchange on the original in message
+            if (this.originalInMessage instanceof MessageSupport) {
+                ((MessageSupport) 
this.originalInMessage).setExchange(exchange);
             }
+        }
 
-            // fire event
-            if (context.isEventNotificationApplicable()) {
-                try {
-                    EventHelper.notifyExchangeCreated(context, exchange);
-                } catch (Throwable e) {
-                    // must catch exceptions to ensure the exchange is not 
failing due to notification event failed
-                    log.warn("Exception occurred during event notification. 
This exception will be ignored.", e);
-                }
+        // inject breadcrumb header if enabled
+        if (useBreadcrumb) {
+            // create or use existing breadcrumb
+            String breadcrumbId = 
exchange.getIn().getHeader(Exchange.BREADCRUMB_ID, String.class);
+            if (breadcrumbId == null) {
+                // no existing breadcrumb, so create a new one based on the 
exchange id
+                breadcrumbId = exchange.getExchangeId();
+                exchange.getIn().setHeader(Exchange.BREADCRUMB_ID, 
breadcrumbId);
             }
+        }
 
-            // register to inflight registry
-            inflightRepository.add(exchange);
+        // fire event
+        if (context.isEventNotificationApplicable()) {
+            try {
+                EventHelper.notifyExchangeCreated(context, exchange);
+            } catch (Throwable e) {
+                // must catch exceptions to ensure the exchange is not failing 
due to notification event failed
+                log.warn("Exception occurred during event notification. This 
exception will be ignored.", e);
+            }
         }
+
+        // register to inflight registry
+        inflightRepository.add(exchange);
     }
 
     @Override
@@ -161,16 +167,6 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
     }
 
     @Override
-    public void start() {
-        // noop
-    }
-
-    @Override
-    public void stop() {
-        // noop
-    }
-
-    @Override
     public synchronized void addSynchronization(Synchronization 
synchronization) {
         if (synchronizations == null) {
             synchronizations = new ArrayList<>(8);
@@ -250,6 +246,14 @@ public class DefaultUnitOfWork implements UnitOfWork, 
Service {
                 log.warn("Exception occurred during event notification. This 
exception will be ignored.", e);
             }
         }
+
+        // the exchange is now done
+        try {
+            exchange.adapt(ExtendedExchange.class).done();
+        } catch (Throwable e) {
+            // must catch exceptions to ensure synchronizations are also invoked
+            log.warn("Exception occurred during exchange done. This exception 
will be ignored.", e);
+        }
     }
 
     @Override
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
index d111302..64d34f6 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/MDCUnitOfWork.java
@@ -87,13 +87,6 @@ public class MDCUnitOfWork extends DefaultUnitOfWork {
     }
 
     @Override
-    public void stop() {
-        super.stop();
-        // and remove when stopping
-        clear();
-    }
-
-    @Override
     public void pushRoute(Route route) {
         super.pushRoute(route);
         if (route != null) {
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
index e1d45fa..6f44fb1 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -19,7 +19,15 @@ package org.apache.camel.impl.engine;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.camel.*;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.Consumer;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Experimental;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.NonManagedService;
+import org.apache.camel.StaticService;
 import org.apache.camel.spi.ExchangeFactory;
 import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.support.SynchronizationAdapter;
@@ -89,18 +97,19 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            exchange = new DefaultExchange(camelContext);
+            ExtendedExchange answer = new DefaultExchange(camelContext);
+            if (autoRelease) {
+                // the consumer will either always be in auto release mode or 
not, so it's safe to initialize the task only once when the exchange is created
+                answer.onDone(this::release);
+            }
+            return answer;
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
             }
-            // the exchange is reused but update the created to now
+            // reset exchange for reuse
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.setCreated(System.currentTimeMillis());
-        }
-        if (autoRelease) {
-            // add on completion which will return the exchange when done
-            
exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+            ee.reset(System.currentTimeMillis());
         }
         return exchange;
     }
@@ -113,41 +122,41 @@ public class PooledExchangeFactory extends ServiceSupport
                 created.incrementAndGet();
             }
             // create a new exchange as there was no free from the pool
-            exchange = new DefaultExchange(fromEndpoint);
+            ExtendedExchange answer = new DefaultExchange(fromEndpoint);
+            if (autoRelease) {
+                // the consumer will either always be in auto release mode or 
not, so it's safe to initialize the task only once when the exchange is created
+                answer.onDone(this::release);
+            }
+            return answer;
         } else {
             if (statisticsEnabled) {
                 acquired.incrementAndGet();
             }
-            // the exchange is reused but update the created to now
+            // reset exchange for reuse
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.setCreated(System.currentTimeMillis());
-            // need to mark this exchange from the given endpoint
-            ee.setFromEndpoint(fromEndpoint);
-        }
-        if (autoRelease) {
-            // add on completion which will return the exchange when done
-            
exchange.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
+            ee.reset(System.currentTimeMillis());
         }
         return exchange;
     }
 
     @Override
-    public void release(Exchange exchange) {
+    public boolean release(Exchange exchange) {
         // reset exchange before returning to pool
         try {
             ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
-            ee.reset();
+            ee.done();
 
             // only release back in pool if reset was success
             if (statisticsEnabled) {
                 released.incrementAndGet();
             }
-            pool.offer(exchange);
+            return pool.offer(exchange);
         } catch (Exception e) {
             if (statisticsEnabled) {
                 discarded.incrementAndGet();
             }
             LOG.debug("Error resetting exchange: {}. This exchange is 
discarded.", exchange);
+            return false;
         }
     }
 
diff --git 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
index e373e43..a9d1e87 100644
--- 
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
+++ 
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java
@@ -552,7 +552,7 @@ public class SimpleCamelContext extends 
AbstractCamelContext {
                 ExchangeFactory.class);
 
         // TODO: experiment
-        //        return result.orElseGet(DefaultExchangeFactory::new);
+        //                return result.orElseGet(DefaultExchangeFactory::new);
         return result.orElseGet(PooledExchangeFactory::new);
     }
 
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
index ee12829..83c568c 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultConsumer.java
@@ -109,7 +109,6 @@ public class DefaultConsumer extends ServiceSupport 
implements Consumer, RouteAw
         UnitOfWork uow = 
endpoint.getCamelContext().adapt(ExtendedCamelContext.class).getUnitOfWorkFactory()
                 .createUnitOfWork(exchange);
         exchange.adapt(ExtendedExchange.class).setUnitOfWork(uow);
-        uow.start();
         return uow;
     }
 
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 808bfb6..1ba720a 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Function;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
@@ -45,6 +46,7 @@ import org.apache.camel.util.ObjectHelper;
 public final class DefaultExchange implements ExtendedExchange {
 
     private final CamelContext context;
+    private Function<Exchange, Boolean> onDone;
     private long created;
     // optimize to create properties always and with a reasonable small size
     private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
@@ -121,49 +123,62 @@ public final class DefaultExchange implements 
ExtendedExchange {
         }
     }
 
-    public void reset() {
-        this.properties.clear();
-        this.exchangeId = null;
-        this.created = 0;
-        // TODO: optimize in/out to keep as default message (if original 
message is this kind)
-        this.in = null;
-        this.out = null;
-        this.exception = null;
-        // reset uow
-        if (this.unitOfWork != null) {
-            this.unitOfWork.reset();
-        }
-        // reset pattern to original
-        this.pattern = originalPattern;
-        if (this.onCompletions != null) {
-            this.onCompletions.clear();
-        }
-        // do not reset endpoint/fromRouteId as it would be the same 
consumer/endpoint again
-        this.externalRedelivered = null;
-        this.historyNodeId = null;
-        this.historyNodeLabel = null;
-        this.transacted = false;
-        this.routeStop = false;
-        this.rollbackOnly = false;
-        this.rollbackOnlyLast = false;
-        this.notifyEvent = false;
-        this.interrupted = false;
-        this.interruptable = true;
-        this.redeliveryExhausted = false;
-        this.errorHandlerHandled = null;
+    @Override
+    public void onDone(Function<Exchange, Boolean> task) {
+        this.onDone = task;
     }
 
-    @Override
-    public long getCreated() {
-        return created;
+    public void done() {
+        // only need to do this if there is an onDone task
+        // and use created flag to avoid doing done more than once
+        if (created > 0) {
+            this.created = 0; // by setting to 0 we also flag that this 
exchange is done and needs to be reset to use again
+            this.properties.clear();
+            this.exchangeId = null;
+            // TODO: optimize in/out to keep as default message (if original 
message is this kind)
+            this.in = null;
+            this.out = null;
+            this.exception = null;
+            // reset uow
+            if (this.unitOfWork != null) {
+                this.unitOfWork.reset();
+            }
+            // reset pattern to original
+            this.pattern = originalPattern;
+            if (this.onCompletions != null) {
+                this.onCompletions.clear();
+            }
+            // do not reset endpoint/fromRouteId as it would be the same 
consumer/endpoint again
+            this.externalRedelivered = null;
+            this.historyNodeId = null;
+            this.historyNodeLabel = null;
+            this.transacted = false;
+            this.routeStop = false;
+            this.rollbackOnly = false;
+            this.rollbackOnlyLast = false;
+            this.notifyEvent = false;
+            this.interrupted = false;
+            this.interruptable = true;
+            this.redeliveryExhausted = false;
+            this.errorHandlerHandled = null;
+
+            if (onDone != null) {
+                onDone.apply(this);
+            }
+        }
     }
 
     @Override
-    public void setCreated(long created) {
+    public void reset(long created) {
         this.created = created;
     }
 
     @Override
+    public long getCreated() {
+        return created;
+    }
+
+    @Override
     public Exchange copy() {
         DefaultExchange exchange = new DefaultExchange(this);
 
@@ -603,7 +618,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
     @Override
     public void setUnitOfWork(UnitOfWork unitOfWork) {
         this.unitOfWork = unitOfWork;
-        if (unitOfWork != null && onCompletions != null) {
+        if (unitOfWork != null && onCompletions != null && 
!onCompletions.isEmpty()) {
             // now an unit of work has been assigned so add the on completions
             // we might have registered already
             for (Synchronization onCompletion : onCompletions) {
@@ -612,7 +627,6 @@ public final class DefaultExchange implements 
ExtendedExchange {
             // cleanup the temporary on completion list as they now have been 
registered
             // on the unit of work
             onCompletions.clear();
-            onCompletions = null;
         }
     }
 
@@ -626,7 +640,7 @@ public final class DefaultExchange implements 
ExtendedExchange {
             }
             onCompletions.add(onCompletion);
         } else {
-            getUnitOfWork().addSynchronization(onCompletion);
+            unitOfWork.addSynchronization(onCompletion);
         }
     }
 
@@ -643,13 +657,12 @@ public final class DefaultExchange implements 
ExtendedExchange {
 
     @Override
     public void handoverCompletions(Exchange target) {
-        if (onCompletions != null) {
+        if (onCompletions != null && !onCompletions.isEmpty()) {
             for (Synchronization onCompletion : onCompletions) {
                 
target.adapt(ExtendedExchange.class).addOnCompletion(onCompletion);
             }
             // cleanup the temporary on completion list as they have been 
handed over
             onCompletions.clear();
-            onCompletions = null;
         } else if (unitOfWork != null) {
             // let unit of work handover
             unitOfWork.handoverSynchronization(target);
@@ -659,10 +672,9 @@ public final class DefaultExchange implements 
ExtendedExchange {
     @Override
     public List<Synchronization> handoverCompletions() {
         List<Synchronization> answer = null;
-        if (onCompletions != null) {
+        if (onCompletions != null && !onCompletions.isEmpty()) {
             answer = new ArrayList<>(onCompletions);
             onCompletions.clear();
-            onCompletions = null;
         }
         return answer;
     }
diff --git 
a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
 
b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
index fd2194e..196c6ae 100644
--- 
a/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
+++ 
b/core/camel-support/src/main/java/org/apache/camel/support/UnitOfWorkHelper.java
@@ -21,7 +21,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.camel.Exchange;
-import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Route;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.SynchronizationRouteAware;
@@ -56,20 +55,11 @@ public final class UnitOfWorkHelper {
             LOG.warn("Exception occurred during done UnitOfWork for Exchange: 
{}. This exception will be ignored.",
                     exchange, e);
         }
-        // stop
-        try {
-            uow.stop();
-        } catch (Throwable e) {
-            LOG.warn("Exception occurred during stopping UnitOfWork for 
Exchange: {}. This exception will be ignored.",
-                    exchange, e);
-        }
-        // MUST clear and set uow to null on exchange after done
-        ExtendedExchange ee = (ExtendedExchange) exchange;
-        ee.setUnitOfWork(null);
     }
 
     public static void doneSynchronizations(Exchange exchange, 
List<Synchronization> synchronizations, Logger log) {
         if (synchronizations != null && !synchronizations.isEmpty()) {
+            // TODO: only copy/sort if there is > 1 (if 1 then use directly, 
no for loop)
             // work on a copy of the list to avoid any modification which may 
cause ConcurrentModificationException
             List<Synchronization> copy = new ArrayList<>(synchronizations);
 

Reply via email to