This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch var in repository https://gitbox.apache.org/repos/asf/camel.git
commit ab41934d2e43b6fde323611a268449c7702f03df Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Mon Jan 22 20:56:22 2024 +0100 CAMEL-19749: EIPs should make it easy to use together with variables. --- .../processor/SpringWireTapVariableTest.java | 29 ++++++++ .../camel/spring/processor/WireTapVariableTest.xml | 49 ++++++++++++++ .../apache/camel/model/ProcessorDefinition.java | 34 ++++++++++ .../org/apache/camel/model/WireTapDefinition.java | 27 +++++++- .../org/apache/camel/reifier/WireTapReifier.java | 6 ++ .../camel/processor/WireTapVariableTest.java | 56 ++++++++++++++++ .../camel/dsl/yaml/WireTapVariableTest.groovy | 77 ++++++++++++++++++++++ 7 files changed, 276 insertions(+), 2 deletions(-) diff --git a/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringWireTapVariableTest.java b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringWireTapVariableTest.java new file mode 100644 index 00000000000..45f9b00ad36 --- /dev/null +++ b/components/camel-spring-xml/src/test/java/org/apache/camel/spring/processor/SpringWireTapVariableTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.WireTapVariableTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +public class SpringWireTapVariableTest extends WireTapVariableTest { + @Override + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/WireTapVariableTest.xml"); + } +} diff --git a/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/WireTapVariableTest.xml b/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/WireTapVariableTest.xml new file mode 100644 index 00000000000..9dac908cf36 --- /dev/null +++ b/components/camel-spring-xml/src/test/resources/org/apache/camel/spring/processor/WireTapVariableTest.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <!-- START SNIPPET: example --> + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <jmxAgent id="jmx" disabled="true"/> + <route> + <from uri="direct:send"/> + <setVariable name="hello"> + <simple>Camel</simple> + </setVariable> + <to uri="mock:before"/> + <wireTap uri="direct:foo" variableSend="hello"/> + <to uri="mock:result"/> + </route> + <route> + <from uri="direct:foo"/> + <transform> + <simple>Bye ${body}</simple> + </transform> + <to uri="mock:tap"/> + </route> + </camelContext> + <!-- END SNIPPET: example --> + +</beans> diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 63a1cd5e5b8..f6a584ce6c9 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -2074,6 +2074,23 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> return answer; } + /** + * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> Sends messages to all its child outputs; so that + * each processor and destination gets a copy of the original message to avoid the processors interfering with each + * other using {@link ExchangePattern#InOnly}. + * + * @param endpoint the endpoint to wiretap to + * @param variableSend to use a variable as the source for the message body to send. 
+ * @return the builder + */ + public WireTapDefinition<Type> wireTap(@AsEndpointUri EndpointProducerBuilder endpoint, String variableSend) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setEndpointProducerBuilder(endpoint); + answer.setVariableSend(variableSend); + addOutput(answer); + return answer; + } + /** * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> Sends messages to all its child outputs; so that * each processor and destination gets a copy of the original message to avoid the processors interfering with each @@ -2089,6 +2106,23 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> return answer; } + /** + * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a> Sends messages to all its child outputs; so that + * each processor and destination gets a copy of the original message to avoid the processors interfering with each + * other using {@link ExchangePattern#InOnly}. + * + * @param uri the dynamic endpoint to wiretap to (resolved using simple language by default) + * @param variableSend to use a variable as the source for the message body to send. 
+ * @return the builder + */ + public WireTapDefinition<Type> wireTap(@AsEndpointUri String uri, String variableSend) { + WireTapDefinition answer = new WireTapDefinition(); + answer.setUri(uri); + answer.setVariableSend(variableSend); + addOutput(answer); + return answer; + } + /** * Pushes the given block on the stack as current block * diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/WireTapDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/WireTapDefinition.java index 5ea132c8068..adbe410f64b 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/WireTapDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/WireTapDefinition.java @@ -25,6 +25,7 @@ import jakarta.xml.bind.annotation.XmlRootElement; import jakarta.xml.bind.annotation.XmlTransient; import org.apache.camel.ExchangePattern; +import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.spi.Metadata; @@ -38,8 +39,6 @@ import org.apache.camel.spi.Metadata; public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends ToDynamicDefinition implements ExecutorServiceAwareDefinition<WireTapDefinition<Type>> { - // TODO: variables due to extending ToDynamic - @XmlTransient private ExecutorService executorServiceBean; @XmlTransient @@ -262,6 +261,30 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends T return this; } + /** + * To use a variable to store the received message body (only body, not headers). This is handy for easy access to + * the received message body via variables. + * + * Important: WireTap does not support a receive variable, so calling this method always throws an + * {@link IllegalArgumentException}.
+ */ + public WireTapDefinition<Type> variableReceive(String variableReceive) { + throw new IllegalArgumentException("WireTap does not support variableReceive"); + } + + /** + * To use a variable as the source for the message body to send. This makes it handy to use variables for user data + * and to easily control what data to use for sending and receiving. + * + * Important: When using send variable then the message body is taken from this variable instead of the current + * {@link Message}, however the headers from the {@link Message} will still be used as well. In other words, the + * variable is used instead of the message body, but everything else is as usual. + */ + public WireTapDefinition<Type> variableSend(String variableSend) { + setVariableSend(variableSend); + return this; + } + // Properties // ------------------------------------------------------------------------- diff --git a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java index d8562141286..08942ac00d5 100644 --- a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java +++ b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/WireTapReifier.java @@ -46,6 +46,10 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { @Override public Processor createProcessor() throws Exception { + if (definition.getVariableReceive() != null) { + throw new IllegalArgumentException("WireTap does not support variableReceive"); + } + // must use InOnly for WireTap definition.setPattern(ExchangePattern.InOnly.name()); @@ -80,6 +84,8 @@ public class WireTapReifier extends ToDynamicReifier<WireTapDefinition<?>> { Endpoint endpoint = CamelContextHelper.resolveEndpoint(camelContext, uri, null); LineNumberAware.trySetLineNumberAware(endpoint, definition); sendProcessor = new SendProcessor(endpoint); + sendProcessor.setVariableSend(definition.getVariableSend()); + sendProcessor.setVariableReceive(definition.getVariableReceive()); } // create error handler we need to use for processing the wire tapped diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/WireTapVariableTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapVariableTest.java new file mode 100644 index 00000000000..af0857056ca --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/WireTapVariableTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.jupiter.api.Test; + +public class WireTapVariableTest extends ContextTestSupport { + + @Test + public void testSend() throws Exception { + getMockEndpoint("mock:before").expectedBodiesReceived("World"); + getMockEndpoint("mock:before").expectedVariableReceived("hello", "Camel"); + getMockEndpoint("mock:tap").expectedBodiesReceived("Bye Camel"); + getMockEndpoint("mock:tap").expectedVariableReceived("hello", "Camel"); + getMockEndpoint("mock:result").expectedBodiesReceived("World"); + getMockEndpoint("mock:result").expectedVariableReceived("hello", "Camel"); + + template.sendBody("direct:send", "World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:send") + .setVariable("hello", simple("Camel")) + .to("mock:before") + .wireTap("direct:foo", "hello") + .to("mock:result"); + + from("direct:foo") + .transform().simple("Bye ${body}") + .to("mock:tap"); + } + }; + } +} diff --git a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/WireTapVariableTest.groovy b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/WireTapVariableTest.groovy new file mode 100644 index 00000000000..0b226c234b9 --- /dev/null +++ b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/test/groovy/org/apache/camel/dsl/yaml/WireTapVariableTest.groovy @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dsl.yaml + +import org.apache.camel.component.mock.MockEndpoint +import org.apache.camel.dsl.yaml.support.YamlTestSupport + +class WireTapVariableTest extends YamlTestSupport { + + def "wireTapVariable send"() { + setup: + loadRoutes ''' + - route: + from: + uri: direct:send + steps: + - setVariable: + name: hello + simple: + expression: Camel + - to: + uri: mock:before + - wireTap: + uri: direct:foo + variableSend: hello + - to: + uri: mock:result + - route: + from: + uri: direct:foo + steps: + - transform: + simple: + expression: "Bye ${body}" + - to: + uri: mock:tap + ''' + + withMock('mock:before') { + expectedBodiesReceived 'World' + expectedVariableReceived("hello", "Camel") + } + withMock('mock:result') { + expectedBodiesReceived 'World' + expectedVariableReceived("hello", "Camel") + } + withMock('mock:tap') { + expectedBodiesReceived 'Bye Camel' + expectedVariableReceived("hello", "Camel") + } + + when: + context.start() + + withTemplate { + to('direct:send').withBody('World').send() + } + + then: + MockEndpoint.assertIsSatisfied(context) + } + +}