This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new bbf062a4251 CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
bbf062a4251 is described below

commit bbf062a425141287d14617c282a982bca9100c00
Author: Otavio Rodolfo Piske <[email protected]>
AuthorDate: Tue Sep 5 18:14:40 2023 +0200

    CAMEL-19801: pre-work for cleaning up copying exchanges (#11304)
---
 .../apache/camel/support/AbstractExchangeTest.java |  19 +++-
 .../org/apache/camel/support/AbstractExchange.java | 115 +++++++++------------
 .../org/apache/camel/support/DefaultExchange.java  |   9 ++
 .../camel/support/DefaultPooledExchange.java       |   6 ++
 .../camel/support/ExtendedExchangeExtension.java   |  12 +--
 5 files changed, 83 insertions(+), 78 deletions(-)

diff --git a/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java b/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
index 8b26b593e8f..2710b25b52f 100644
--- a/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
+++ b/core/camel-core/src/test/java/org/apache/camel/support/AbstractExchangeTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.support;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.spi.DataType;
@@ -30,9 +31,25 @@ import static org.junit.jupiter.api.Assertions.assertSame;
  */
 public class AbstractExchangeTest {
 
+    static class CustomAbstractExchange extends AbstractExchange {
+
+        CustomAbstractExchange(CustomAbstractExchange abstractExchange) {
+            super(abstractExchange);
+        }
+
+        public CustomAbstractExchange(CamelContext context) {
+            super(context);
+        }
+
+        @Override
+        AbstractExchange newCopy() {
+            return new CustomAbstractExchange(this);
+        }
+    }
+
     @Test
     void shouldPreserveDataTypeOnCopy() {
-        AbstractExchange e1 = new AbstractExchange(new DefaultCamelContext());
+        AbstractExchange e1 = new CustomAbstractExchange(new DefaultCamelContext());
         Object body1 = new Object();
         DataType type1 = new DataType("foo1");
         DefaultMessage in = new DefaultMessage((Exchange) null);
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index 709497805b9..ac2824eda19 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -34,14 +34,11 @@ import org.apache.camel.ExchangePropertyKey;
 import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.SafeCopyProperty;
-import org.apache.camel.spi.HeadersMapFactory;
 import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.trait.message.MessageTrait;
 import org.apache.camel.trait.message.RedeliveryTraitPayload;
 import org.apache.camel.util.ObjectHelper;
 
-import static org.apache.camel.support.MessageHelper.copyBody;
-
 /**
  * Base class for the two official and only implementations of {@link Exchange}, the {@link DefaultExchange} and
  * {@link DefaultPooledExchange}.
@@ -51,7 +48,7 @@ import static org.apache.camel.support.MessageHelper.copyBody;
  *
  * @see DefaultExchange
  */
-class AbstractExchange implements Exchange {
+abstract class AbstractExchange implements Exchange {
     protected final EnumMap<ExchangePropertyKey, Object> internalProperties;
 
     protected final CamelContext context;
@@ -74,7 +71,7 @@ class AbstractExchange implements Exchange {
         this.context = context;
         this.internalProperties = new EnumMap<>(internalProperties);
         this.privateExtension = new ExtendedExchangeExtension(this);
-        this.properties = properties;
+        this.properties = safeCopyProperties(properties);
     }
 
     public AbstractExchange(CamelContext context) {
@@ -103,6 +100,43 @@ class AbstractExchange implements Exchange {
         privateExtension.setUnitOfWork(parent.getUnitOfWork());
     }
 
+    AbstractExchange(AbstractExchange parent) {
+        this.context = parent.getContext();
+        this.pattern = parent.getPattern();
+        this.created = parent.getCreated();
+
+        this.internalProperties = new EnumMap<>(parent.internalProperties);
+
+        privateExtension = new ExtendedExchangeExtension(this);
+        privateExtension.setFromEndpoint(parent.getFromEndpoint());
+        privateExtension.setFromRouteId(parent.getFromRouteId());
+        privateExtension.setUnitOfWork(parent.getUnitOfWork());
+
+        setIn(parent.getIn().copy());
+
+        if (parent.hasOut()) {
+            setOut(parent.getOut().copy());
+        }
+
+        setException(parent.exception);
+        setRouteStop(parent.routeStop);
+        setRollbackOnly(parent.rollbackOnly);
+        setRollbackOnlyLast(parent.rollbackOnlyLast);
+
+        privateExtension.setNotifyEvent(parent.getExchangeExtension().isNotifyEvent());
+        privateExtension.setRedeliveryExhausted(parent.getExchangeExtension().isRedeliveryExhausted());
+        privateExtension.setErrorHandlerHandled(parent.getExchangeExtension().getErrorHandlerHandled());
+        privateExtension.setStreamCacheDisabled(parent.getExchangeExtension().isStreamCacheDisabled());
+
+        if (parent.hasProperties()) {
+            this.properties = safeCopyProperties(parent.properties);
+        }
+
+        if (parent.hasSafeCopyProperties()) {
+            this.safeCopyProperties = parent.getSafeCopyProperties();
+        }
+    }
+
     public AbstractExchange(Endpoint fromEndpoint) {
         this.context = fromEndpoint.getCamelContext();
         this.pattern = fromEndpoint.getExchangePattern();
@@ -128,43 +162,11 @@ class AbstractExchange implements Exchange {
         return created;
     }
 
+    abstract AbstractExchange newCopy();
+
     @Override
     public Exchange copy() {
-        DefaultExchange exchange = new DefaultExchange(this);
-
-        exchange.setIn(getIn().copy());
-        copyBody(getIn(), exchange.getIn());
-        if (getIn().hasHeaders()) {
-            exchange.getIn().setHeaders(safeCopyHeaders(getIn().getHeaders()));
-        }
-        if (hasOut()) {
-            exchange.setOut(getOut().copy());
-            copyBody(getOut(), exchange.getOut());
-            if (getOut().hasHeaders()) {
-                exchange.getOut().setHeaders(safeCopyHeaders(getOut().getHeaders()));
-            }
-        }
-
-        exchange.setException(exception);
-        exchange.setRouteStop(routeStop);
-        exchange.setRollbackOnly(rollbackOnly);
-        exchange.setRollbackOnlyLast(rollbackOnlyLast);
-        final ExtendedExchangeExtension newExchangeExtension = exchange.getExchangeExtension();
-        newExchangeExtension.setNotifyEvent(getExchangeExtension().isNotifyEvent());
-        newExchangeExtension.setRedeliveryExhausted(getExchangeExtension().isRedeliveryExhausted());
-        newExchangeExtension.setErrorHandlerHandled(getExchangeExtension().getErrorHandlerHandled());
-        newExchangeExtension.setStreamCacheDisabled(getExchangeExtension().isStreamCacheDisabled());
-
-        // copy properties after body as body may trigger lazy init
-        if (hasProperties()) {
-            copyProperties(getProperties(), exchange.getProperties());
-        }
-
-        if (hasSafeCopyProperties()) {
-            safeCopyProperties(this.safeCopyProperties, exchange.getSafeCopyProperties());
-        }
-        // copy over internal properties
-        exchange.internalProperties.putAll(internalProperties);
+        AbstractExchange exchange = newCopy();
 
         if (getContext().isMessageHistory()) {
             exchange.internalProperties.computeIfPresent(ExchangePropertyKey.MESSAGE_HISTORY,
@@ -174,32 +176,6 @@ class AbstractExchange implements Exchange {
         return exchange;
     }
 
-    private Map<String, Object> safeCopyHeaders(Map<String, Object> headers) {
-        if (headers == null) {
-            return null;
-        }
-
-        if (context != null) {
-            HeadersMapFactory factory = context.getCamelContextExtension().getHeadersMapFactory();
-            if (factory != null) {
-                return factory.newMap(headers);
-            }
-        }
-        // should not really happen but some tests dont start camel context
-        return new HashMap<>(headers);
-    }
-
-    private void copyProperties(Map<String, Object> source, Map<String, Object> target) {
-        target.putAll(source);
-    }
-
-    private void safeCopyProperties(
-            Map<String, SafeCopyProperty> source, Map<String, SafeCopyProperty> target) {
-        source.entrySet().stream().forEach(entry -> {
-            target.put(entry.getKey(), entry.getValue().safeCopy());
-        });
-    }
-
     @Override
     public CamelContext getContext() {
         return context;
@@ -719,4 +695,11 @@ class AbstractExchange implements Exchange {
     public ExtendedExchangeExtension getExchangeExtension() {
         return privateExtension;
     }
+
+    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
+        if (properties == null) {
+            return null;
+        }
+        return new ConcurrentHashMap<>(properties);
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index a3a58e9c1c6..91cb6c882b0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -47,6 +47,10 @@ public final class DefaultExchange extends AbstractExchange {
         super(parent);
     }
 
+    DefaultExchange(AbstractExchange parent) {
+        super(parent);
+    }
+
     public DefaultExchange(Endpoint fromEndpoint) {
         super(fromEndpoint);
     }
@@ -54,4 +58,9 @@ public final class DefaultExchange extends AbstractExchange {
     public DefaultExchange(Endpoint fromEndpoint, ExchangePattern pattern) {
         super(fromEndpoint, pattern);
     }
+
+    @Override
+    AbstractExchange newCopy() {
+        return new DefaultExchange(this);
+    }
 }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 75bcfca549a..19c757c1309 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -66,6 +66,12 @@ public final class DefaultPooledExchange extends AbstractExchange implements Poo
         this.properties = new ConcurrentHashMap<>(8);
     }
 
+    @Override
+    AbstractExchange newCopy() {
+        // NOTE: this is the same behavior as done previously from AbstractExchange when returning a copy.
+        return new DefaultExchange(this);
+    }
+
     public boolean isAutoRelease() {
         return autoRelease;
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
index 3f60ac6ca0d..42cb778526b 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExtendedExchangeExtension.java
@@ -20,7 +20,6 @@ package org.apache.camel.support;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -331,18 +330,9 @@ public class ExtendedExchangeExtension implements ExchangeExtension {
         setStreamCacheDisabled(false);
     }
 
-    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
-        if (properties == null) {
-            return null;
-        }
-        return new ConcurrentHashMap<>(properties);
-    }
-
     @Override
     public Exchange createCopyWithProperties(CamelContext context) {
-        final Map<String, Object> properties = safeCopyProperties(exchange.properties);
-
-        DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, properties);
+        DefaultExchange answer = new DefaultExchange(context, exchange.internalProperties, exchange.properties);
 
         answer.setPattern(exchange.pattern);
 

Reply via email to