This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch var-headers in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0cba30cf1abdf61a06f8f85e0c730fd09a61ca5b Author: Claus Ibsen <[email protected]> AuthorDate: Mon Jan 29 10:12:58 2024 +0100 CAMEL-19749: variables - Should also copy message headers into variable when using EIP variables --- .../camel/processor/SendDynamicProcessor.java | 50 +++++++++++----------- .../org/apache/camel/processor/SendProcessor.java | 18 ++++---- .../org/apache/camel/support/ExchangeHelper.java | 25 +++++++++++ .../camel/support/processor/MarshalProcessor.java | 1 - .../support/processor/UnmarshalProcessor.java | 1 - 5 files changed, 60 insertions(+), 35 deletions(-) diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java index 13620cc6ce6..19ab67b9421 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendDynamicProcessor.java @@ -44,6 +44,8 @@ import org.apache.camel.util.URISupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + /** * Processor for forwarding exchanges to a dynamic endpoint destination. * @@ -101,8 +103,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa @Override public boolean process(Exchange exchange, final AsyncCallback callback) { - // TODO: variables - if (!isStarted()) { exchange.setException(new IllegalStateException("SendProcessor has not been started: " + this)); callback.done(true); @@ -179,9 +179,11 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa // if we should store the received message body in a variable, // then we need to preserve the original message body Object body = null; + Map<String, Object> headers = null; if (variableReceive != null) { try { body = exchange.getMessage().getBody(); + headers = exchange.getMessage().getHeaders(); } catch (Exception throwable) { exchange.setException(throwable); callback.done(true); @@ -189,6 +191,7 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa } } final Object originalBody = body; + final Map<String, Object> originalHeaders = headers; // send the exchange to the destination using the producer cache final Processor preProcessor = preAwareProcessor; @@ -204,7 +207,6 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa } // replace message body with variable if (variableSend != null) { - // it may be a global variable Object value = ExchangeHelper.getVariable(exchange, variableSend); exchange.getMessage().setBody(value); } @@ -217,30 +219,28 @@ public class SendDynamicProcessor extends AsyncProcessorSupport implements IdAwa } LOG.debug(">>>> {} {}", endpoint, e); - return p.process(target, new AsyncCallback() { - public void done(boolean doneSync) { - // restore previous MEP - target.setPattern(existingPattern); - try { - if (postProcessor != null) { - postProcessor.process(target); - } - } catch (Exception e) { - target.setException(e); - } - // stop endpoint if prototype as it was only used once - if (stopEndpoint) { - ServiceHelper.stopAndShutdownService(endpoint); - } - // result should be stored in variable instead of message body - if (variableReceive != null) { - Object value = exchange.getMessage().getBody(); - ExchangeHelper.setVariable(exchange, variableReceive, value); - exchange.getMessage().setBody(originalBody); + return p.process(target, doneSync -> { + // restore previous MEP + target.setPattern(existingPattern); + try { + if (postProcessor != null) { + postProcessor.process(target); } - // signal we are done - c.done(doneSync); + } catch (Exception e1) { + target.setException(e1); + } + // stop endpoint if prototype as it was only used once + if (stopEndpoint) { + ServiceHelper.stopAndShutdownService(endpoint); + } + // result should be stored in variable instead of message body + if (variableReceive != null) { + ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive); + exchange.getMessage().setBody(originalBody); + exchange.getMessage().setHeaders(originalHeaders); } + // signal we are done + c.done(doneSync); }); }); } diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java index 960450d8d44..907482a70e0 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.AsyncCallback; @@ -131,9 +132,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E // if we should store the received message body in a variable, // then we need to preserve the original message body Object body = null; + Map<String, Object> headers = null; if (variableReceive != null) { try { body = exchange.getMessage().getBody(); + headers = exchange.getMessage().getHeaders(); } catch (Exception throwable) { exchange.setException(throwable); callback.done(true); @@ -141,6 +144,7 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E } } final Object originalBody = body; + final Map<String, Object> originalHeaders = headers; if (extendedStatistics) { counter.incrementAndGet(); @@ -172,11 +176,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E if (newCallback) { ac = doneSync -> { try { - // result should be stored in variable instead of message body + // result should be stored in variable instead of message body/headers if (variableReceive != null) { - Object value = exchange.getMessage().getBody(); - ExchangeHelper.setVariable(exchange, variableReceive, value); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive); exchange.getMessage().setBody(originalBody); + exchange.getMessage().setHeaders(originalHeaders); } // restore previous MEP target.setPattern(existingPattern); @@ -193,7 +197,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E try { // replace message body with variable if (variableSend != null) { - // it may be a global variable Object value = ExchangeHelper.getVariable(exchange, variableSend); exchange.getMessage().setBody(value); } @@ -220,7 +223,6 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E // replace message body with variable if (variableSend != null) { - // it may be a global variable Object value = ExchangeHelper.getVariable(exchange, variableSend); exchange.getMessage().setBody(value); } @@ -232,11 +234,11 @@ public class SendProcessor extends AsyncProcessorSupport implements Traceable, E (producer, ex, cb) -> producer.process(ex, doneSync -> { // restore previous MEP exchange.setPattern(existingPattern); - // result should be stored in variable instead of message body + // result should be stored in variable instead of message body/headers if (variableReceive != null) { - Object value = exchange.getMessage().getBody(); - ExchangeHelper.setVariable(exchange, variableReceive, value); + ExchangeHelper.setVariableFromMessageBodyAndHeaders(exchange, variableReceive); exchange.getMessage().setBody(originalBody); + exchange.getMessage().setHeaders(originalHeaders); } // signal we are done cb.done(doneSync); diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index d55f4e1ce57..a3eb8c98162 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -1102,6 +1102,31 @@ public final class ExchangeHelper { } } + public static void setVariableFromMessageBodyAndHeaders(Exchange exchange, String name) { + VariableRepository repo = null; + String id = StringHelper.before(name, ":"); + if (id != null) { + VariableRepositoryFactory factory + = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class); + repo = factory.getVariableRepository(id); + if (repo == null) { + exchange.setException( + new IllegalArgumentException("VariableRepository with id: " + id + " does not exist")); + } + name = StringHelper.after(name, ":"); + } + Object body = exchange.getMessage().getBody(); + // do a defensive copy of the headers + Map<String, Object> map = exchange.getContext().getCamelContextExtension().getHeadersMapFactory().newMap(exchange.getMessage().getHeaders()); + if (repo != null) { + repo.setVariable(name, body); + repo.setVariable(name + ".headers", map); + } else { + exchange.setVariable(name, body); + exchange.setVariable(name + ".headers", map); + } + } + /** * Gets the variable * diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java index f1a299106ef..107180f753c 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/MarshalProcessor.java @@ -59,7 +59,6 @@ public class MarshalProcessor extends AsyncProcessorSupport implements Traceable final Object originalBody = in.getBody(); Object body = originalBody; if (variableSend != null) { - // it may be a global variable body = ExchangeHelper.getVariable(exchange, variableSend); } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java index dbfb9e7206c..8b93cabd473 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/processor/UnmarshalProcessor.java @@ -68,7 +68,6 @@ public class UnmarshalProcessor extends AsyncProcessorSupport implements Traceab final Object originalBody = in.getBody(); Object body = originalBody; if (variableSend != null) { - // it may be a global variable body = ExchangeHelper.getVariable(exchange, variableSend); } final Message out;
