This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch fluent in repository https://gitbox.apache.org/repos/asf/camel.git
commit 61881f2eb207d575628c4f51d5ef8c0523b39ae4 Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jan 2 14:54:26 2024 +0100 CAMEL-20289: camel-core - FluentProducerTemplate - Add withVariable and withProperty --- .../org/apache/camel/FluentProducerTemplate.java | 42 ++++++++ .../impl/engine/DefaultFluentProducerTemplate.java | 119 ++++++++++++++++++--- .../org/apache/camel/support/ExchangeHelper.java | 27 +++++ 3 files changed, 172 insertions(+), 16 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java index e26f177f9a2..adf0a3b3f38 100644 --- a/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java +++ b/core/camel-api/src/main/java/org/apache/camel/FluentProducerTemplate.java @@ -191,6 +191,48 @@ public interface FluentProducerTemplate extends Service { */ FluentProducerTemplate withHeader(String key, Object value); + /** + * Set the exchange properties + * + * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of + * withBody/withHeaders to construct the message to be sent. + * + * @param properties the exchange properties + */ + FluentProducerTemplate withExchangeProperties(Map<String, Object> properties); + + /** + * Set the exchange property + * + * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of + * withBody/withHeaders to construct the message to be sent. + * + * @param key the key of the exchange property + * @param value the value of the exchange property + */ + FluentProducerTemplate withExchangeProperty(String key, Object value); + + /** + * Set the variables + * + * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of + * withBody/withHeaders to construct the message to be sent. + * + * @param variables the variables + */ + FluentProducerTemplate withVariables(Map<String, Object> variables); + + /** + * Set the exchange property + * + * <b>Important:</b> You can either only use either withExchange, or withProcessor or a combination of + * withBody/withHeaders to construct the message to be sent. + * + * @param key the key of the variable + * @param value the value of the variable + */ + FluentProducerTemplate withVariable(String key, Object value); + /** * Set the message body * diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java index e82381e50b5..851bf2ce845 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/DefaultFluentProducerTemplate.java @@ -51,8 +51,10 @@ import org.apache.camel.util.ObjectHelper; */ public class DefaultFluentProducerTemplate extends ServiceSupport implements FluentProducerTemplate { - // transient state of endpoint, headers and body which needs to be thread local scoped to be thread-safe + // transient state of endpoint, headers, exchange properties, variables, and body which needs to be thread local scoped to be thread-safe private Map<String, Object> headers; + private Map<String, Object> exchangeProperties; + private Map<String, Object> variables; private Object body; private Supplier<Exchange> exchangeSupplier; private Supplier<Processor> processorSupplier; @@ -207,6 +209,74 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu return clone; } + @Override + public FluentProducerTemplate withExchangeProperties(Map<String, Object> properties) { + DefaultFluentProducerTemplate clone = checkCloned(); + + if (clone.processorSupplier != null) { + throw new IllegalArgumentException("Cannot use both withExchangeProperties and withProcessor with FluentProducerTemplate"); + } + + Map<String, Object> map = clone.exchangeProperties; + if (map == null) { + map = new LinkedHashMap<>(); + clone.exchangeProperties = map; + } + map.putAll(properties); + return clone; + } + + @Override + public FluentProducerTemplate withExchangeProperty(String key, Object value) { + DefaultFluentProducerTemplate clone = checkCloned(); + + if (clone.processorSupplier != null) { + throw new IllegalArgumentException("Cannot use both withExchangeProperty and withProcessor with FluentProducerTemplate"); + } + + Map<String, Object> map = clone.exchangeProperties; + if (map == null) { + map = new LinkedHashMap<>(); + clone.exchangeProperties = map; + } + map.put(key, value); + return clone; + } + + @Override + public FluentProducerTemplate withVariables(Map<String, Object> variables) { + DefaultFluentProducerTemplate clone = checkCloned(); + + if (clone.processorSupplier != null) { + throw new IllegalArgumentException("Cannot use both withVariables and withProcessor with FluentProducerTemplate"); + } + + Map<String, Object> map = clone.variables; + if (map == null) { + map = new LinkedHashMap<>(); + clone.variables = map; + } + map.putAll(variables); + return clone; + } + + @Override + public FluentProducerTemplate withVariable(String key, Object value) { + DefaultFluentProducerTemplate clone = checkCloned(); + + if (clone.processorSupplier != null) { + throw new IllegalArgumentException("Cannot use both withVariable and withProcessor with FluentProducerTemplate"); + } + + Map<String, Object> map = clone.variables; + if (map == null) { + map = new LinkedHashMap<>(); + clone.variables = map; + } + map.put(key, value); + return clone; + } + @Override public FluentProducerTemplate withBody(Object body) { DefaultFluentProducerTemplate clone = checkCloned(); @@ -387,21 +457,24 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu // Determine the target endpoint final Endpoint target = clone.target(); - Future<T> result; - if (ObjectHelper.isNotEmpty(clone.headers)) { - // Make a copy of the headers and body so that async processing won't - // be invalidated by subsequent reuse of the template - final Map<String, Object> headersCopy = new HashMap<>(clone.headers); - final Object bodyCopy = clone.body; - - result = clone.template().asyncRequestBodyAndHeaders(target, bodyCopy, headersCopy, type); - } else { - // Make a copy of the and body so that async processing won't be - // invalidated by subsequent reuse of the template - final Object bodyCopy = clone.body; - - result = clone.template().asyncRequestBody(target, bodyCopy, type); - } + Future<T> result = + clone.template().asyncSend(target, exchange -> { + // Make a copy of the headers and body so that async processing won't + // be invalidated by subsequent reuse of the template + Object bodyCopy = clone.body; + + exchange.setPattern(ExchangePattern.InOut); + exchange.getMessage().setBody(bodyCopy); + if (clone.headers != null) { + exchange.getMessage().setHeaders(new HashMap<>(clone.headers)); + } + if (clone.exchangeProperties != null) { + exchange.getProperties().putAll(clone.exchangeProperties); + } + if (clone.variables != null) { + clone.variables.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v)); + } + }).thenApply(answer -> answer.getContext().getTypeConverter().convertTo(type, answer)); // reset cloned flag so when we use it again it has to set values again cloned = false; @@ -537,22 +610,36 @@ public class DefaultFluentProducerTemplate extends ServiceSupport implements Flu if (headers != null) { exchange.getIn().getHeaders().putAll(headers); } + if (exchangeProperties != null) { + exchange.getProperties().putAll(exchangeProperties); + } if (body != null) { exchange.getIn().setBody(body); } + if (variables != null) { + variables.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v)); + } }; } private Processor defaultAsyncProcessor() { final Map<String, Object> headersCopy = ObjectHelper.isNotEmpty(this.headers) ? new HashMap<>(this.headers) : null; + final Map<String, Object> propertiesCopy = ObjectHelper.isNotEmpty(this.exchangeProperties) ? new HashMap<>(this.exchangeProperties) : null; + final Map<String, Object> variablesCopy = ObjectHelper.isNotEmpty(this.variables) ? new HashMap<>(this.variables) : null; final Object bodyCopy = this.body; return exchange -> { if (headersCopy != null) { exchange.getIn().getHeaders().putAll(headersCopy); } + if (propertiesCopy != null) { + exchange.getProperties().putAll(propertiesCopy); + } if (bodyCopy != null) { exchange.getIn().setBody(bodyCopy); } + if (variablesCopy != null) { + variablesCopy.forEach((k, v) -> ExchangeHelper.setVariable(exchange, k, v)); + } }; } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java index 9f45cd44431..004d0dacb99 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java @@ -50,6 +50,8 @@ import org.apache.camel.TypeConversionException; import org.apache.camel.WrappedFile; import org.apache.camel.spi.NormalizedEndpointUri; import org.apache.camel.spi.UnitOfWork; +import org.apache.camel.spi.VariableRepository; +import org.apache.camel.spi.VariableRepositoryFactory; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.Scanner; @@ -1070,4 +1072,29 @@ public final class ExchangeHelper { exchange.getOut().setBody(body); } } + + /** + * Sets the variable + * + * @param exchange the exchange + * @param name the variable name. Can be prefixed with repo-id:name to lookup the variable from a specific + * repository. If no repo-id is provided, then the variable is set on the exchange + * @param value the value of the variable + */ + public static void setVariable(Exchange exchange, String name, Object value) { + String id = StringHelper.before(name, ":"); + if (id != null) { + VariableRepositoryFactory factory = exchange.getContext().getCamelContextExtension().getContextPlugin(VariableRepositoryFactory.class); + VariableRepository repo = factory.getVariableRepository(id); + if (repo != null) { + name = StringHelper.after(name, ":"); + repo.setVariable(name, value); + } else { + exchange.setException( + new IllegalArgumentException("VariableRepository with id: " + id + " does not exists")); + } + } else { + exchange.setVariable(name, value); + } + } }
