This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 973da413465 CAMEL-21580: camel-core: Fix rare potential
ConcurrentModificationException when using producer template result processor
during header copying. Use specialized processor as the message body is the
only desired output so we avoid the header copying entirely. (#16690)
973da413465 is described below
commit 973da4134654c9cceae3e59d709e46419cb1fbdd
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Jan 3 08:33:26 2025 +0100
CAMEL-21580: camel-core: Fix rare potential ConcurrentModificationException
when using producer template result processor during header copying. Use
specialized processor as the message body is the only desired output so we
avoid the header copying entirely. (#16690)
---
.../impl/engine/DefaultFluentProducerTemplate.java | 3 +-
.../camel/impl/engine/DefaultProducerTemplate.java | 19 ++++----
.../engine/ProducerTemplateResultProcessor.java | 51 ++++++++++++++++++++++
3 files changed, 63 insertions(+), 10 deletions(-)
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
index b1063041a6e..b7e85d5da83 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java
@@ -34,7 +34,6 @@ import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.processor.ConvertBodyProcessor;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -77,7 +76,7 @@ public class DefaultFluentProducerTemplate extends
ServiceSupport implements Flu
this.resultProcessors = new ClassValue<>() {
@Override
protected Processor computeValue(Class<?> type) {
- return new ConvertBodyProcessor(type);
+ return new ProducerTemplateResultProcessor(type);
}
};
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
index 522969f5ce0..91f4f57f84b 100644
---
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultProducerTemplate.java
@@ -38,7 +38,6 @@ import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.cache.DefaultProducerCache;
-import org.apache.camel.support.processor.ConvertBodyProcessor;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
@@ -401,7 +400,8 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
@Override
public <T> T requestBody(Endpoint endpoint, Object body, Class<T> type) {
Exchange exchange
- = send(endpoint, ExchangePattern.InOut,
createSetBodyProcessor(body), new ConvertBodyProcessor(type));
+ = send(endpoint, ExchangePattern.InOut,
createSetBodyProcessor(body),
+ new ProducerTemplateResultProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@@ -410,7 +410,8 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
public <T> T requestBody(String endpointUri, Object body, Class<T> type) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Exchange exchange
- = send(endpoint, ExchangePattern.InOut,
createSetBodyProcessor(body), new ConvertBodyProcessor(type));
+ = send(endpoint, ExchangePattern.InOut,
createSetBodyProcessor(body),
+ new ProducerTemplateResultProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@@ -418,7 +419,7 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
@Override
public <T> T requestBodyAndHeader(Endpoint endpoint, Object body, String
header, Object headerValue, Class<T> type) {
Exchange exchange = send(endpoint, ExchangePattern.InOut,
createBodyAndHeaderProcessor(body, header, headerValue),
- new ConvertBodyProcessor(type));
+ new ProducerTemplateResultProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@@ -427,7 +428,7 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
public <T> T requestBodyAndHeader(String endpointUri, Object body, String
header, Object headerValue, Class<T> type) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Exchange exchange = send(endpoint, ExchangePattern.InOut,
createBodyAndHeaderProcessor(body, header, headerValue),
- new ConvertBodyProcessor(type));
+ new ProducerTemplateResultProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@@ -436,7 +437,8 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
public <T> T requestBodyAndHeaders(String endpointUri, Object body,
Map<String, Object> headers, Class<T> type) {
Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
Exchange exchange
- = send(endpoint, ExchangePattern.InOut,
createBodyAndHeaders(body, headers), new ConvertBodyProcessor(type));
+ = send(endpoint, ExchangePattern.InOut,
createBodyAndHeaders(body, headers),
+ new ProducerTemplateResultProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@@ -444,7 +446,8 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
@Override
public <T> T requestBodyAndHeaders(Endpoint endpoint, Object body,
Map<String, Object> headers, Class<T> type) {
Exchange exchange
- = send(endpoint, ExchangePattern.InOut,
createBodyAndHeaders(body, headers), new ConvertBodyProcessor(type));
+ = send(endpoint, ExchangePattern.InOut,
createBodyAndHeaders(body, headers),
+ new ProducerTemplateResultProcessor(type));
Object answer = extractResultBody(exchange);
return camelContext.getTypeConverter().convertTo(type, answer);
}
@@ -673,7 +676,7 @@ public class DefaultProducerTemplate extends ServiceSupport
implements ProducerT
}
protected <T> CompletableFuture<T> asyncRequestBody(final Endpoint
endpoint, Processor processor, final Class<T> type) {
- return asyncRequestBody(endpoint, processor, new
ConvertBodyProcessor(type))
+ return asyncRequestBody(endpoint, processor, new
ProducerTemplateResultProcessor(type))
.thenApply(answer ->
camelContext.getTypeConverter().convertTo(type, answer));
}
diff --git
a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/ProducerTemplateResultProcessor.java
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/ProducerTemplateResultProcessor.java
new file mode 100644
index 00000000000..512f89bed6b
--- /dev/null
+++
b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/ProducerTemplateResultProcessor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * To be used by producer template to prepare the message body for a given
expected result type.
+ */
+class ProducerTemplateResultProcessor implements Processor {
+
+ private final Class<?> type;
+
+ public ProducerTemplateResultProcessor(Class<?> type) {
+ this.type = type;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ if (exchange.getMessage().getBody() == null) {
+ // only convert if there is a body
+ return;
+ }
+
+ if (exchange.getException() != null) {
+ // do not convert if an exception has been thrown as if we attempt
to convert and it also fails with a new
+ // exception then it will override the existing exception
+ return;
+ }
+
+ Object newBody = exchange.getMessage().getBody(type);
+ if (newBody != null) {
+ exchange.getMessage().setBody(newBody);
+ }
+ }
+}