This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 38ad670536f7 CAMEL-23681: Optimize Exchange memory pressure with
copy-on-write headers (#23738)
38ad670536f7 is described below
commit 38ad670536f749955d4d3cf8d942f88f83d709fb
Author: Guillaume Nodet <[email protected]>
AuthorDate: Thu Jun 4 20:41:44 2026 +0200
CAMEL-23681: Optimize Exchange memory pressure with copy-on-write headers
(#23738)
- Implement CopyOnWriteHeadersMap: a self-contained lazy COW wrapper that
defers header map cloning until the first mutation. Read operations
delegate directly; writes trigger a one-time copy via HeadersMapFactory.
- MessageSupport.copyFrom() shares headers via
DefaultMessage.copyHeadersFrom()
instead of deep-copying the map on every Exchange copy.
- DefaultMessage mutation methods (setHeader, removeHeader, removeHeaders)
delegate through getHeaders() which returns the COW wrapper transparently.
- DefaultMessage.reset() nulls COW headers instead of clearing (avoids
triggering a copy just to discard).
- DefaultPooledExchange.done() reuses cleared properties maps (discards
only when oversized > 50 entries).
- COW wrapper satisfies the full Map contract (equals, hashCode, toString,
forEach) and provides COW-aware keySet/values/entrySet views with lazy
iterators that only trigger copy on Iterator.remove().
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../camel/impl/DefaultMessageHeaderTest.java | 540 ++++++++++++++++++
.../camel/support/CopyOnWriteHeadersMap.java | 611 +++++++++++++++++++++
.../org/apache/camel/support/DefaultMessage.java | 67 ++-
.../camel/support/DefaultPooledExchange.java | 11 +-
.../org/apache/camel/support/MessageSupport.java | 10 +-
5 files changed, 1209 insertions(+), 30 deletions(-)
diff --git
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultMessageHeaderTest.java
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultMessageHeaderTest.java
index d5a4b25eb1fe..c12174e63754 100644
---
a/core/camel-core/src/test/java/org/apache/camel/impl/DefaultMessageHeaderTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/impl/DefaultMessageHeaderTest.java
@@ -16,6 +16,11 @@
*/
package org.apache.camel.impl;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.camel.CamelContext;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultExchange;
@@ -310,4 +315,539 @@ public class DefaultMessageHeaderTest {
assertEquals(Integer.valueOf(123), msg.getHeader("beer", "123",
Integer.class));
}
+ @Test
+ public void testCopyOnWriteHeaders() {
+ // Test that headers are shared until modified
+ DefaultMessage msg1 = new DefaultMessage(camelContext);
+ msg1.setHeader("foo", "cheese");
+ msg1.setHeader("bar", "beer");
+
+ DefaultMessage msg2 = new DefaultMessage(camelContext);
+ msg2.copyFrom(msg1);
+
+ // Both should have the same headers
+ assertEquals("cheese", msg2.getHeader("foo"));
+ assertEquals("beer", msg2.getHeader("bar"));
+
+ // Modify msg2 - should trigger copy-on-write
+ msg2.setHeader("foo", "wine");
+
+ // msg2 should have the new value
+ assertEquals("wine", msg2.getHeader("foo"));
+ assertEquals("beer", msg2.getHeader("bar"));
+
+ // msg1 should still have the original value (not affected by msg2's
modification)
+ assertEquals("cheese", msg1.getHeader("foo"));
+ assertEquals("beer", msg1.getHeader("bar"));
+ }
+
+ @Test
+ public void testCopyOnWriteHeadersRemove() {
+ // Test that removeHeader triggers copy-on-write
+ DefaultMessage msg1 = new DefaultMessage(camelContext);
+ msg1.setHeader("foo", "cheese");
+ msg1.setHeader("bar", "beer");
+
+ DefaultMessage msg2 = new DefaultMessage(camelContext);
+ msg2.copyFrom(msg1);
+
+ // Remove header from msg2 - should trigger copy-on-write
+ msg2.removeHeader("bar");
+
+ // msg2 should not have bar
+ assertNull(msg2.getHeader("bar"));
+ assertEquals("cheese", msg2.getHeader("foo"));
+
+ // msg1 should still have both headers
+ assertEquals("cheese", msg1.getHeader("foo"));
+ assertEquals("beer", msg1.getHeader("bar"));
+ }
+
+ @Test
+ public void testCopyOnWriteHeadersRemoveMultiple() {
+ // Test that removeHeaders triggers copy-on-write
+ DefaultMessage msg1 = new DefaultMessage(camelContext);
+ msg1.setHeader("foo", "cheese");
+ msg1.setHeader("bar", "beer");
+ msg1.setHeader("baz", "wine");
+
+ DefaultMessage msg2 = new DefaultMessage(camelContext);
+ msg2.copyFrom(msg1);
+
+ // Remove headers from msg2 - should trigger copy-on-write
+ msg2.removeHeaders("ba*");
+
+ // msg2 should not have bar or baz
+ assertNull(msg2.getHeader("bar"));
+ assertNull(msg2.getHeader("baz"));
+ assertEquals("cheese", msg2.getHeader("foo"));
+
+ // msg1 should still have all headers
+ assertEquals("cheese", msg1.getHeader("foo"));
+ assertEquals("beer", msg1.getHeader("bar"));
+ assertEquals("wine", msg1.getHeader("baz"));
+ }
+
+ @Test
+ public void testLazyHeadersInitialization() {
+ DefaultMessage msg = new DefaultMessage(camelContext);
+
+ assertNull(msg.getHeader("foo"));
+ assertNull(msg.getHeader("bar", Object.class));
+ assertEquals("default", msg.getHeader("baz", "default"));
+
+ assertFalse(msg.hasHeaders());
+ assertTrue(msg.getHeaders().isEmpty());
+ }
+
+ @Test
+ public void testCopyOnWriteGetHeadersPut() {
+ // getHeaders().put() on a copy must NOT affect the original
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ // Mutate via getHeaders().put()
+ copy.getHeaders().put("new-key", "new-value");
+
+ // Original must be unaffected
+ assertNull(original.getHeader("new-key"));
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals("world", original.getHeader("hello"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy must have all three
+ assertEquals("new-value", copy.getHeader("new-key"));
+ assertEquals("bar", copy.getHeader("foo"));
+ assertEquals(3, copy.getHeaders().size());
+ }
+
+ @Test
+ public void testCopyOnWriteGetHeadersClear() {
+ // getHeaders().clear() on a copy must NOT affect the original
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ // Clear copy's headers via map reference
+ copy.getHeaders().clear();
+
+ // Original must still have its headers
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals("world", original.getHeader("hello"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy must be empty
+ assertTrue(copy.getHeaders().isEmpty());
+ }
+
+ @Test
+ public void testCopyOnWriteGetHeadersPutAll() {
+ // getHeaders().putAll() on a copy must NOT affect the original
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ // putAll on copy
+ copy.getHeaders().putAll(Map.of("x", "1", "y", "2"));
+
+ // Original must only have foo
+ assertEquals(1, original.getHeaders().size());
+ assertEquals("bar", original.getHeader("foo"));
+ assertNull(original.getHeader("x"));
+
+ // Copy must have all three
+ assertEquals(3, copy.getHeaders().size());
+ assertEquals("bar", copy.getHeader("foo"));
+ assertEquals("1", copy.getHeader("x"));
+ assertEquals("2", copy.getHeader("y"));
+ }
+
+ @Test
+ public void testCopyOnWriteGetHeadersRemove() {
+ // getHeaders().remove() on a copy must NOT affect the original
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ // Remove via map reference
+ copy.getHeaders().remove("foo");
+
+ // Original must still have foo
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy must not have foo
+ assertNull(copy.getHeader("foo"));
+ assertEquals(1, copy.getHeaders().size());
+ }
+
+ @Test
+ public void testCopyOnWriteSetHeadersResetsShared() {
+ // setHeaders() must break shared state
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ // Replace copy's headers entirely
+ copy.setHeaders(new HashMap<>(Map.of("new", "headers")));
+
+ // Mutating original should not affect copy
+ original.setHeader("extra", "value");
+
+ // Copy must only have {new=headers}
+ assertEquals(1, copy.getHeaders().size());
+ assertEquals("headers", copy.getHeader("new"));
+ assertNull(copy.getHeader("foo"));
+ assertNull(copy.getHeader("extra"));
+ }
+
+ @Test
+ public void testCopyOnWriteMultipleCopiesIsolation() {
+ // Multiple copies from the same original must be fully isolated
+ DefaultMessage original = new DefaultMessage(camelContext);
+ for (int i = 0; i < 100; i++) {
+ original.setHeader("key-" + i, "value-" + i);
+ }
+
+ DefaultMessage copy1 = new DefaultMessage(camelContext);
+ copy1.copyFrom(original);
+
+ DefaultMessage copy2 = new DefaultMessage(camelContext);
+ copy2.copyFrom(original);
+
+ // Mutate each copy differently
+ for (int i = 0; i < 10; i++) {
+ copy1.getHeaders().put("copy1-" + i, "v1");
+ copy2.getHeaders().put("copy2-" + i, "v2");
+ }
+ copy1.removeHeader("key-0");
+ copy2.removeHeader("key-1");
+
+ // Original must be unmodified (still exactly 100 headers)
+ assertEquals(100, original.getHeaders().size());
+ for (int i = 0; i < 100; i++) {
+ assertEquals("value-" + i, original.getHeader("key-" + i));
+ }
+ assertNull(original.getHeader("copy1-0"));
+ assertNull(original.getHeader("copy2-0"));
+
+ // copy1: 99 original + 10 copy1 keys, no copy2 keys
+ assertEquals(109, copy1.getHeaders().size());
+ assertNull(copy1.getHeader("key-0"));
+ assertEquals("value-1", copy1.getHeader("key-1"));
+ assertEquals("v1", copy1.getHeader("copy1-0"));
+ assertNull(copy1.getHeader("copy2-0"));
+
+ // copy2: 99 original + 10 copy2 keys, no copy1 keys
+ assertEquals(109, copy2.getHeaders().size());
+ assertEquals("value-0", copy2.getHeader("key-0"));
+ assertNull(copy2.getHeader("key-1"));
+ assertNull(copy2.getHeader("copy1-0"));
+ assertEquals("v2", copy2.getHeader("copy2-0"));
+ }
+
+ @Test
+ public void testLazyCopyOnWriteReadOperations() {
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ Map<String, Object> copyHeaders = copy.getHeaders();
+
+ // Read operations should work correctly without triggering COW
+ assertEquals(2, copyHeaders.size());
+ assertTrue(copyHeaders.containsKey("foo"));
+ assertTrue(copyHeaders.containsKey("hello"));
+ assertEquals("bar", copyHeaders.get("foo"));
+ assertEquals("world", copyHeaders.get("hello"));
+ assertFalse(copyHeaders.isEmpty());
+
+ // Original must be unaffected by reads on the copy
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals("world", original.getHeader("hello"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Iterate over entries (read operation)
+ int count = 0;
+ for (Map.Entry<String, Object> entry : copyHeaders.entrySet()) {
+ assertNotNull(entry.getKey());
+ assertNotNull(entry.getValue());
+ count++;
+ }
+ assertEquals(2, count);
+
+ // Original still intact after iteration
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals("world", original.getHeader("hello"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Now mutate - this should trigger the copy
+ copyHeaders.put("new-key", "new-value");
+
+ // Original must be unaffected by the mutation
+ assertNull(original.getHeader("new-key"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy must have the new key
+ assertEquals("new-value", copy.getHeader("new-key"));
+ assertEquals(3, copy.getHeaders().size());
+ }
+
+ @Test
+ public void testLazyCopyOnWriteRemove() {
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ Map<String, Object> copyHeaders = copy.getHeaders();
+
+ // Read first - original must be unaffected
+ assertEquals("bar", copyHeaders.get("foo"));
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Now remove from copy
+ copyHeaders.remove("foo");
+
+ // Original unaffected
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy affected
+ assertNull(copy.getHeader("foo"));
+ assertEquals(1, copy.getHeaders().size());
+ }
+
+ @Test
+ public void testLazyCopyOnWriteClear() {
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ Map<String, Object> copyHeaders = copy.getHeaders();
+
+ // Read operations - original must be unaffected
+ assertFalse(copyHeaders.isEmpty());
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Clear copy
+ copyHeaders.clear();
+
+ // Original unaffected
+ assertEquals(2, original.getHeaders().size());
+ assertEquals("bar", original.getHeader("foo"));
+
+ // Copy cleared
+ assertTrue(copy.getHeaders().isEmpty());
+ }
+
+ @Test
+ public void testLazyCopyOnWriteKeySetIteration() {
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ Map<String, Object> copyHeaders = copy.getHeaders();
+
+ // Iterate over keySet (read-only)
+ int count = 0;
+ for (String key : copyHeaders.keySet()) {
+ assertNotNull(key);
+ count++;
+ }
+ assertEquals(2, count);
+
+ // Original must be unaffected by read-only iteration
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals("world", original.getHeader("hello"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Now use iterator.remove() - triggers COW
+ var iterator = copyHeaders.keySet().iterator();
+ assertTrue(iterator.hasNext());
+ iterator.next();
+ iterator.remove();
+
+ // Original unaffected
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy affected
+ assertEquals(1, copy.getHeaders().size());
+ }
+
+ @Test
+ public void testLazyCopyOnWriteValues() {
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ Map<String, Object> copyHeaders = copy.getHeaders();
+
+ // Read from values()
+ Collection<Object> values = copyHeaders.values();
+ assertEquals(2, values.size());
+ assertTrue(values.contains("bar"));
+ assertTrue(values.contains("world"));
+
+ // Original must be unaffected by values() read
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Remove from values() - triggers COW
+ values.remove("bar");
+
+ // Original unaffected
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy affected
+ assertNull(copy.getHeader("foo"));
+ assertEquals(1, copy.getHeaders().size());
+ }
+
+ @Test
+ public void testLazyCopyOnWriteEntrySet() {
+ DefaultMessage original = new DefaultMessage(camelContext);
+ original.setHeader("foo", "bar");
+ original.setHeader("hello", "world");
+
+ DefaultMessage copy = new DefaultMessage(camelContext);
+ copy.copyFrom(original);
+
+ Map<String, Object> copyHeaders = copy.getHeaders();
+
+ // Read from entrySet()
+ Set<Map.Entry<String, Object>> entries = copyHeaders.entrySet();
+ assertEquals(2, entries.size());
+
+ // Iterate (read-only)
+ for (Map.Entry<String, Object> entry : entries) {
+ assertNotNull(entry.getKey());
+ assertNotNull(entry.getValue());
+ }
+
+ // Original must be unaffected by read-only entrySet iteration
+ assertEquals("bar", original.getHeader("foo"));
+ assertEquals("world", original.getHeader("hello"));
+ assertEquals(2, original.getHeaders().size());
+
+ // Remove from entrySet iterator - triggers COW
+ var iterator = entries.iterator();
+ assertTrue(iterator.hasNext());
+ iterator.next();
+ iterator.remove();
+
+ // Original unaffected
+ assertEquals(2, original.getHeaders().size());
+
+ // Copy affected
+ assertEquals(1, copy.getHeaders().size());
+ }
+
+ // ========== Lazy populated headers tests ==========
+
+ private static class LazyPopulatedMessage extends DefaultMessage {
+ LazyPopulatedMessage(CamelContext camelContext) {
+ super(camelContext);
+ }
+
+ @Override
+ protected void populateInitialHeaders(Map<String, Object> map) {
+ map.put("transport-header", "transport-value");
+ map.put("content-type", "text/plain");
+ map.put("message-id", "12345");
+ }
+
+ @Override
+ protected boolean isPopulateHeadersSupported() {
+ return true;
+ }
+ }
+
+ @Test
+ public void testLazyPopulatedHeadersGetHeader() {
+ LazyPopulatedMessage msg = new LazyPopulatedMessage(camelContext);
+
+ assertEquals("transport-value", msg.getHeader("transport-header"));
+ assertEquals("text/plain", msg.getHeader("content-type"));
+ assertEquals("12345", msg.getHeader("message-id"));
+ assertNull(msg.getHeader("nonexistent"));
+ }
+
+ @Test
+ public void testLazyPopulatedHeadersGetHeaderWithDefault() {
+ LazyPopulatedMessage msg = new LazyPopulatedMessage(camelContext);
+
+ assertEquals("transport-value", msg.getHeader("transport-header",
"fallback"));
+ assertEquals("fallback", msg.getHeader("nonexistent", "fallback"));
+ }
+
+ @Test
+ public void testLazyPopulatedHeadersGetHeaderWithType() {
+ LazyPopulatedMessage msg = new LazyPopulatedMessage(camelContext);
+ msg.setExchange(new DefaultExchange(camelContext));
+
+ assertEquals("12345", msg.getHeader("message-id", String.class));
+ assertEquals(Integer.valueOf(12345), msg.getHeader("message-id",
Integer.class));
+ assertNull(msg.getHeader("nonexistent", String.class));
+ }
+
+ @Test
+ public void testLazyPopulatedHeadersRemoveHeader() {
+ LazyPopulatedMessage msg = new LazyPopulatedMessage(camelContext);
+
+ assertEquals("transport-value", msg.removeHeader("transport-header"));
+ assertNull(msg.getHeader("transport-header"));
+ assertEquals("text/plain", msg.getHeader("content-type"));
+ assertEquals(2, msg.getHeaders().size());
+ }
+
+ @Test
+ public void testLazyPopulatedHeadersGetHeaders() {
+ LazyPopulatedMessage msg = new LazyPopulatedMessage(camelContext);
+
+ Map<String, Object> headers = msg.getHeaders();
+ assertEquals(3, headers.size());
+ assertEquals("transport-value", headers.get("transport-header"));
+ assertEquals("text/plain", headers.get("content-type"));
+ assertEquals("12345", headers.get("message-id"));
+ }
+
+ @Test
+ public void testLazyPopulatedHeadersHasHeaders() {
+ LazyPopulatedMessage msg = new LazyPopulatedMessage(camelContext);
+
+ assertTrue(msg.hasHeaders());
+ }
+
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/CopyOnWriteHeadersMap.java
b/core/camel-support/src/main/java/org/apache/camel/support/CopyOnWriteHeadersMap.java
new file mode 100644
index 000000000000..7175658cbb95
--- /dev/null
+++
b/core/camel-support/src/main/java/org/apache/camel/support/CopyOnWriteHeadersMap.java
@@ -0,0 +1,611 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.support;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import org.apache.camel.spi.HeadersMapFactory;
+
+/**
+ * A lazy copy-on-write wrapper for message headers that defers cloning until
the first mutation.
+ * <p>
+ * This wrapper delegates all read operations (get, containsKey, size, etc.)
directly to the shared headers map without
+ * copying. Write operations (put, remove, clear, etc.) trigger a one-time
clone of the shared map into a private copy,
+ * then perform the mutation on the private copy.
+ * <p>
+ * After the copy-on-write is triggered, all subsequent operations (reads and
writes) operate on the private copy.
+ * <p>
+ * This class is not thread-safe. It follows the same threading model as
{@link DefaultMessage} — an Exchange is
+ * typically processed by a single thread at a time.
+ */
+final class CopyOnWriteHeadersMap implements Map<String, Object> {
+
+ private final HeadersMapFactory factory;
+ private Map<String, Object> delegate;
+ private boolean shared;
+
+ CopyOnWriteHeadersMap(Map<String, Object> sharedHeaders, HeadersMapFactory
factory) {
+ this.delegate = sharedHeaders;
+ this.factory = factory;
+ this.shared = true;
+ }
+
+ /**
+ * Ensures the headers map is writable by triggering copy-on-write if
necessary.
+ */
+ private void ensureWritable() {
+ if (shared) {
+ delegate = factory.newMap(delegate);
+ shared = false;
+ }
+ }
+
+ Map<String, Object> getUnderlying() {
+ return delegate;
+ }
+
+ // ========== Read-only operations (no COW trigger) ==========
+
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(Object key) {
+ return delegate.containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(Object value) {
+ return delegate.containsValue(value);
+ }
+
+ @Override
+ public Object get(Object key) {
+ return delegate.get(key);
+ }
+
+ @Override
+ public Object getOrDefault(Object key, Object defaultValue) {
+ return delegate.getOrDefault(key, defaultValue);
+ }
+
+ @Override
+ public void forEach(BiConsumer<? super String, ? super Object> action) {
+ delegate.forEach(action);
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return new CopyOnWriteKeySet();
+ }
+
+ @Override
+ public Collection<Object> values() {
+ return new CopyOnWriteValues();
+ }
+
+ @Override
+ public Set<Entry<String, Object>> entrySet() {
+ return new CopyOnWriteEntrySet();
+ }
+
+ // ========== Write operations (trigger COW first) ==========
+
+ @Override
+ public Object put(String key, Object value) {
+ ensureWritable();
+ return delegate.put(key, value);
+ }
+
+ @Override
+ public Object remove(Object key) {
+ ensureWritable();
+ return delegate.remove(key);
+ }
+
+ @Override
+ public void putAll(Map<? extends String, ?> m) {
+ if (m.isEmpty()) {
+ return;
+ }
+ ensureWritable();
+ delegate.putAll(m);
+ }
+
+ @Override
+ public void clear() {
+ if (delegate.isEmpty()) {
+ return;
+ }
+ ensureWritable();
+ delegate.clear();
+ }
+
+ @Override
+ public Object putIfAbsent(String key, Object value) {
+ ensureWritable();
+ return delegate.putIfAbsent(key, value);
+ }
+
+ @Override
+ public boolean remove(Object key, Object value) {
+ ensureWritable();
+ return delegate.remove(key, value);
+ }
+
+ @Override
+ public boolean replace(String key, Object oldValue, Object newValue) {
+ ensureWritable();
+ return delegate.replace(key, oldValue, newValue);
+ }
+
+ @Override
+ public Object replace(String key, Object value) {
+ ensureWritable();
+ return delegate.replace(key, value);
+ }
+
+ @Override
+ public Object computeIfAbsent(String key, Function<? super String, ?>
mappingFunction) {
+ ensureWritable();
+ return delegate.computeIfAbsent(key, mappingFunction);
+ }
+
+ @Override
+ public Object computeIfPresent(
+ String key, BiFunction<? super String, ? super Object, ?>
remappingFunction) {
+ ensureWritable();
+ return delegate.computeIfPresent(key, remappingFunction);
+ }
+
+ @Override
+ public Object compute(
+ String key, BiFunction<? super String, ? super Object, ?>
remappingFunction) {
+ ensureWritable();
+ return delegate.compute(key, remappingFunction);
+ }
+
+ @Override
+ public Object merge(
+ String key, Object value, BiFunction<? super Object, ? super
Object, ?> remappingFunction) {
+ ensureWritable();
+ return delegate.merge(key, value, remappingFunction);
+ }
+
+ @Override
+ public void replaceAll(BiFunction<? super String, ? super Object, ?>
function) {
+ ensureWritable();
+ delegate.replaceAll(function);
+ }
+
+ // ========== Object methods ==========
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ return delegate.equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+
+ // ========== COW-aware collection view wrappers ==========
+
+ /**
+ * A COW-aware Set wrapper for keySet() that triggers copy-on-write for
mutating operations. The wrapper does NOT
+ * cache the view at construction time; instead, it accesses the current
delegate's keySet() on each operation. This
+ * ensures that after ensureWritable() clones the map, subsequent
operations work on the new private copy, not the
+ * old shared map.
+ */
+ private class CopyOnWriteKeySet implements Set<String> {
+
+ // Read operations - no COW trigger
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return delegate.containsKey(o);
+ }
+
+ @Override
+ public Object[] toArray() {
+ return delegate.keySet().toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ return delegate.keySet().toArray(a);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ return delegate.keySet().containsAll(c);
+ }
+
+ // Write operations - trigger COW first
+ @Override
+ public boolean add(String e) {
+ ensureWritable();
+ return delegate.keySet().add(e);
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ ensureWritable();
+ return delegate.keySet().remove(o);
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends String> c) {
+ if (c.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.keySet().addAll(c);
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ if (delegate.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.keySet().retainAll(c);
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ if (c.isEmpty() || delegate.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.keySet().removeAll(c);
+ }
+
+ @Override
+ public void clear() {
+ if (delegate.isEmpty()) {
+ return;
+ }
+ ensureWritable();
+ delegate.clear();
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ final Iterator<String> iter = delegate.keySet().iterator();
+ return new Iterator<String>() {
+ private String lastReturned;
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public String next() {
+ lastReturned = iter.next();
+ return lastReturned;
+ }
+
+ @Override
+ public void remove() {
+ ensureWritable();
+ delegate.remove(lastReturned);
+ }
+ };
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return delegate.keySet().equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.keySet().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.keySet().toString();
+ }
+ }
+
+ /**
+ * A COW-aware Collection wrapper for values() that triggers copy-on-write
for mutating operations.
+ */
+ private class CopyOnWriteValues implements Collection<Object> {
+
+ // Read operations - no COW trigger
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return delegate.containsValue(o);
+ }
+
+ @Override
+ public Object[] toArray() {
+ return delegate.values().toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ return delegate.values().toArray(a);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ return delegate.values().containsAll(c);
+ }
+
+ // Write operations - trigger COW first
+ @Override
+ public boolean add(Object e) {
+ ensureWritable();
+ return delegate.values().add(e);
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ ensureWritable();
+ return delegate.values().remove(o);
+ }
+
+ @Override
+ public boolean addAll(Collection<?> c) {
+ if (c.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.values().addAll(c);
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ if (delegate.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.values().retainAll(c);
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ if (c.isEmpty() || delegate.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.values().removeAll(c);
+ }
+
+ @Override
+ public void clear() {
+ if (delegate.isEmpty()) {
+ return;
+ }
+ ensureWritable();
+ delegate.clear();
+ }
+
+ @Override
+ public Iterator<Object> iterator() {
+ final Iterator<Object> iter = delegate.values().iterator();
+ return new Iterator<Object>() {
+ private Object lastReturned;
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Object next() {
+ lastReturned = iter.next();
+ return lastReturned;
+ }
+
+ @Override
+ public void remove() {
+ ensureWritable();
+ delegate.values().remove(lastReturned);
+ }
+ };
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return delegate.values().equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.values().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.values().toString();
+ }
+ }
+
+ /**
+ * A COW-aware Set wrapper for entrySet() that triggers copy-on-write for
mutating operations.
+ */
+ private class CopyOnWriteEntrySet implements Set<Entry<String, Object>> {
+
+ // Read operations - no COW trigger
+ @Override
+ public int size() {
+ return delegate.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return delegate.entrySet().contains(o);
+ }
+
+ @Override
+ public Object[] toArray() {
+ return delegate.entrySet().toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ return delegate.entrySet().toArray(a);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ return delegate.entrySet().containsAll(c);
+ }
+
+ // Write operations - trigger COW first
+ @Override
+ public boolean add(Entry<String, Object> e) {
+ ensureWritable();
+ return delegate.entrySet().add(e);
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ ensureWritable();
+ return delegate.entrySet().remove(o);
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends Entry<String, Object>> c) {
+ if (c.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.entrySet().addAll(c);
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ if (delegate.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.entrySet().retainAll(c);
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ if (c.isEmpty() || delegate.isEmpty()) {
+ return false;
+ }
+ ensureWritable();
+ return delegate.entrySet().removeAll(c);
+ }
+
+ @Override
+ public void clear() {
+ if (delegate.isEmpty()) {
+ return;
+ }
+ ensureWritable();
+ delegate.clear();
+ }
+
+ @Override
+ public Iterator<Entry<String, Object>> iterator() {
+ final Iterator<Entry<String, Object>> iter =
delegate.entrySet().iterator();
+ return new Iterator<Entry<String, Object>>() {
+ private Entry<String, Object> lastReturned;
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public Entry<String, Object> next() {
+ lastReturned = iter.next();
+ return lastReturned;
+ }
+
+ @Override
+ public void remove() {
+ ensureWritable();
+ delegate.remove(lastReturned.getKey());
+ }
+ };
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return delegate.entrySet().equals(o);
+ }
+
+ @Override
+ public int hashCode() {
+ return delegate.entrySet().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return delegate.entrySet().toString();
+ }
+ }
+
+}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
index 5f79a1520785..b7396c7964d3 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultMessage.java
@@ -53,7 +53,9 @@ public class DefaultMessage extends MessageSupport {
@Override
public void reset() {
super.reset();
- if (headers != null) {
+ if (headers instanceof CopyOnWriteHeadersMap) {
+ headers = null;
+ } else if (headers != null) {
headers.clear();
}
removeTrait(MessageTrait.ATTACHMENTS);
@@ -62,7 +64,7 @@ public class DefaultMessage extends MessageSupport {
@Override
public Object getHeader(String name) {
if (headers == null) {
- // force creating headers
+ // force creating headers (supports lazy population in subclasses
like JmsMessage)
headers = createHeaders();
}
@@ -204,22 +206,16 @@ public class DefaultMessage extends MessageSupport {
@Override
public void setHeader(String name, Object value) {
- if (headers == null) {
- headers = createHeaders();
- }
- headers.put(name, value);
+ getHeaders().put(name, value);
}
@Override
public Object removeHeader(String name) {
- if (headers == null) {
- // force creating headers
- headers = createHeaders();
- }
- if (headers.isEmpty()) {
+ Map<String, Object> h = getHeaders();
+ if (h.isEmpty()) {
return null;
}
- return headers.remove(name);
+ return h.remove(name);
}
@Override
@@ -229,29 +225,25 @@ public class DefaultMessage extends MessageSupport {
@Override
public boolean removeHeaders(String pattern, String... excludePatterns) {
- if (headers == null) {
- // force creating headers
- headers = createHeaders();
- }
- if (headers.isEmpty()) {
+ Map<String, Object> h = getHeaders();
+ if (h.isEmpty()) {
return false;
}
// special optimized
if (excludePatterns == null && "*".equals(pattern)) {
- headers.clear();
+ h.clear();
return true;
}
- final Set<String> toBeRemoved = PatternHelper.matchingSet(headers,
pattern, excludePatterns);
+ final Set<String> toBeRemoved = PatternHelper.matchingSet(h, pattern,
excludePatterns);
if (toBeRemoved != null) {
- if (toBeRemoved.size() == headers.size()) {
- // special optimization when all should be removed
- headers.clear();
+ if (toBeRemoved.size() == h.size()) {
+ h.clear();
} else {
for (String key : toBeRemoved) {
- headers.remove(key);
+ h.remove(key);
}
}
@@ -341,4 +333,33 @@ public class DefaultMessage extends MessageSupport {
protected boolean hasPopulatedHeaders() {
return headers != null;
}
+
+ /**
+ * Shares headers with the source message using copy-on-write. Both
messages will share the same underlying map
+ * until one of them mutates it.
+ */
+ void copyHeadersFrom(DefaultMessage source) {
+ if (source.headers == null && source.isPopulateHeadersSupported()) {
+ // force creating headers
+ source.headers = source.createHeaders();
+ }
+ if (source.headers != null) {
+ HeadersMapFactory factory =
camelContext.getCamelContextExtension().getHeadersMapFactory();
+ if (factory != null) {
+ Map<String, Object> raw;
+ if (source.headers instanceof CopyOnWriteHeadersMap cow) {
+ raw = cow.getUnderlying();
+ } else {
+ raw = source.headers;
+ }
+ source.headers = new CopyOnWriteHeadersMap(raw, factory);
+ this.headers = new CopyOnWriteHeadersMap(raw, factory);
+ } else {
+ // no factory available, fall back to deep copy
+ this.headers = new HashMap<>(source.headers);
+ }
+ } else {
+ this.headers = null;
+ }
+ }
}
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
index 9d965c16e8d8..528075c8e1b6 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/DefaultPooledExchange.java
@@ -90,7 +90,16 @@ public final class DefaultPooledExchange extends
AbstractExchange implements Poo
// by unsetting (setting to 0) we also flag that this exchange is
done and needs to be reset to use again
clock.unset();
- this.properties.clear();
+ // Reuse properties map if it's not too large
+ if (this.properties != null) {
+ if (this.properties.size() > 50) {
+ // Too big, discard and recreate smaller map next time
+ this.properties = null;
+ } else {
+ // Small enough, just clear and reuse
+ this.properties.clear();
+ }
+ }
internalProperties.clear();
if (this.safeCopyProperties != null) {
this.safeCopyProperties.clear();
diff --git
a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
index cb241291c1d0..ee8966272ead 100644
---
a/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
+++
b/core/camel-support/src/main/java/org/apache/camel/support/MessageSupport.java
@@ -221,12 +221,10 @@ public abstract class MessageSupport implements Message,
CamelContextAware, Data
// the headers may be the same instance if the end user has made some
mistake
// and set the OUT message with the same header instance of the IN
message etc
if (!sameHeaders(that)) {
- if (hasHeaders()) {
- // okay its safe to clear the headers
- getHeaders().clear();
- }
- if (that.hasHeaders()) {
- getHeaders().putAll(that.getHeaders());
+ if (that instanceof DefaultMessage thatDefault && this instanceof
DefaultMessage thisDefault) {
+ thisDefault.copyHeadersFrom(thatDefault);
+ } else {
+ setHeaders(that.getHeaders());
}
}