This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4313bcad7c18f24994ed2daf1c1c12523dc2f84c Author: Claus Ibsen <[email protected]> AuthorDate: Tue Jul 30 22:21:16 2019 +0200 CAMEL-6901: Intercept send to endpoint now supports after url to send result to this url like an AOP before|after. --- .../apache/camel/spi/InterceptSendToEndpoint.java | 15 ++- .../engine/DefaultInterceptSendToEndpoint.java | 36 +++++-- .../engine/InterceptSendToEndpointProcessor.java | 20 ++-- .../InterceptSendToMockEndpointStrategy.java | 2 +- .../model/InterceptSendToEndpointDefinition.java | 25 +++++ .../reifier/InterceptSendToEndpointReifier.java | 28 ++++- .../InterceptSendToEndpointAfterTest.java | 114 +++++++++++++++++++++ 7 files changed, 221 insertions(+), 19 deletions(-) diff --git a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java index 55e09aa..d812a2b 100644 --- a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java +++ b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java @@ -30,11 +30,24 @@ public interface InterceptSendToEndpoint extends Endpoint { Endpoint getOriginalEndpoint(); /** - * The processor for routing in a detour + * The processor for routing in a detour before sending to the original endpoint. + * + * @deprecated use {@link #getBefore()} */ + @Deprecated Processor getDetour(); /** + * The processor for routing in a detour before sending to the original endpoint. + */ + Processor getBefore(); + + /** + * The processor for routing after sending to the original endpoint. + */ + Processor getAfter(); + + /** * Whether to skip sending after the detour to the original endpoint. */ boolean isSkip(); diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java index 434bc6a..8542a2a 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java @@ -37,7 +37,8 @@ import org.apache.camel.support.service.ServiceHelper; public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, ShutdownableService { private final Endpoint delegate; - private Processor detour; + private Processor before; + private Processor after; private boolean skip; /** @@ -51,13 +52,36 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, this.skip = skip; } + @Deprecated public void setDetour(Processor detour) { - this.detour = detour; + setBefore(detour); + } + + public void setBefore(Processor before) { + this.before = before; + } + + public void setAfter(Processor after) { + this.after = after; + } + + public void setSkip(boolean skip) { + this.skip = skip; } @Override public Processor getDetour() { - return detour; + return getBefore(); + } + + @Override + public Processor getBefore() { + return before; + } + + @Override + public Processor getAfter() { + return after; } @Override @@ -125,16 +149,16 @@ public class DefaultInterceptSendToEndpoint implements InterceptSendToEndpoint, } public void start() { - ServiceHelper.startService(detour, delegate); + ServiceHelper.startService(before, delegate); } public void stop() { - ServiceHelper.stopService(delegate, detour); + ServiceHelper.stopService(delegate, before); } @Override public void shutdown() { - ServiceHelper.stopAndShutdownServices(delegate, detour); + ServiceHelper.stopAndShutdownServices(delegate, before); } @Override diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java index f14874c..fc91f76 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java @@ -58,21 +58,29 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { public boolean process(Exchange exchange, AsyncCallback callback) { // process the detour so we do the detour routing if (log.isDebugEnabled()) { - log.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", getEndpoint(), endpoint.getDetour(), exchange); + log.debug("Sending to endpoint: {} is intercepted and detoured to: {} for exchange: {}", getEndpoint(), endpoint.getBefore(), exchange); } // add header with the real endpoint uri exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, delegate.getEndpointUri()); - if (endpoint.getDetour() != null) { + if (endpoint.getBefore() != null || endpoint.getAfter() != null) { // detour the exchange using synchronous processing - AsyncProcessor detour = AsyncProcessorConverterHelper.convert(endpoint.getDetour()); + AsyncProcessor before = null; + if (endpoint.getBefore() != null) { + before = AsyncProcessorConverterHelper.convert(endpoint.getBefore()); + } AsyncProcessor ascb = new AsyncProcessorSupport() { @Override public boolean process(Exchange exchange, AsyncCallback callback) { return callback(exchange, callback, true); } }; - return new Pipeline(exchange.getContext(), Arrays.asList(detour, ascb)).process(exchange, callback); + AsyncProcessor after = null; + if (endpoint.getAfter() != null) { + after = AsyncProcessorConverterHelper.convert(endpoint.getAfter()); + } + + return new Pipeline(exchange.getContext(), Arrays.asList(before, ascb, after)).process(exchange, callback); } return callback(exchange, callback, true); @@ -121,13 +129,13 @@ public class InterceptSendToEndpointProcessor extends DefaultAsyncProducer { } public void start() { - ServiceHelper.startService(endpoint.getDetour()); + ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter()); // here we also need to start the producer ServiceHelper.startService(producer); } public void stop() { - // do not stop detour as it should only be stopped when the interceptor stops + // do not stop before/after as it should only be stopped when the interceptor stops // we should stop the producer here ServiceHelper.stopService(producer); } diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java index c88dec7..d6ebcc3 100644 --- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java +++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java @@ -100,7 +100,7 @@ public class InterceptSendToMockEndpointStrategy implements EndpointStrategy { // allow custom logic producer = onInterceptEndpoint(uri, endpoint, mock, producer); - proxy.setDetour(producer); + proxy.setBefore(producer); return proxy; } else { diff --git a/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java b/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java index ee3f54a..45ea74d 100644 --- a/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java +++ b/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java @@ -20,6 +20,7 @@ import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.Predicate; import org.apache.camel.spi.AsPredicate; @@ -37,6 +38,8 @@ public class InterceptSendToEndpointDefinition extends OutputDefinition<Intercep private String uri; @XmlAttribute private Boolean skipSendToOriginalEndpoint; + @XmlAttribute + private String afterUri; public InterceptSendToEndpointDefinition() { } @@ -93,6 +96,16 @@ public class InterceptSendToEndpointDefinition extends OutputDefinition<Intercep } /** + * After sending to the endpoint then send the message to this url which allows to process its result. + * + * @return the builder + */ + public InterceptSendToEndpointDefinition afterUrl(String url) { + setAfterUri(url); + return this; + } + + /** * This method is <b>only</b> for handling some post configuration * that is needed since this is an interceptor, and we have to do * a bit of magic logic to fixup to handle predicates @@ -155,4 +168,16 @@ public class InterceptSendToEndpointDefinition extends OutputDefinition<Intercep public void setUri(String uri) { this.uri = uri; } + + public String getAfterUri() { + return afterUri; + } + + /** + * After sending to the endpoint then send the message to this uri which allows to process its result. + */ + public void setAfterUri(String afterProcessor) { + this.afterUri = afterProcessor; + } + } diff --git a/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java b/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java index 6b241b5..23954db 100644 --- a/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java +++ b/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java @@ -26,7 +26,10 @@ import org.apache.camel.impl.engine.DefaultInterceptSendToEndpoint; import org.apache.camel.model.InterceptSendToEndpointDefinition; import org.apache.camel.model.ProcessorDefinition; import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.ToDefinition; import org.apache.camel.processor.InterceptEndpointProcessor; +import org.apache.camel.processor.SendDynamicProcessor; +import org.apache.camel.processor.SendProcessor; import org.apache.camel.spi.EndpointStrategy; import org.apache.camel.spi.RouteContext; import org.apache.camel.support.EndpointHelper; @@ -40,8 +43,23 @@ public class InterceptSendToEndpointReifier extends ProcessorReifier<InterceptSe @Override public Processor createProcessor(final RouteContext routeContext) throws Exception { - // create the detour - final Processor detour = this.createChildProcessor(routeContext, true); + // create the before + final Processor before = this.createChildProcessor(routeContext, true); + // create the after + Processor afterProcessor = null; + if (definition.getAfterUri() != null) { + ToDefinition to = new ToDefinition(definition.getAfterUri()); + // at first use custom factory + if (routeContext.getCamelContext().adapt(ExtendedCamelContext.class).getProcessorFactory() != null) { + afterProcessor = routeContext.getCamelContext().adapt(ExtendedCamelContext.class) + .getProcessorFactory().createProcessor(routeContext, to); + } + // fallback to default implementation if factory did not create the processor + if (afterProcessor == null) { + afterProcessor = reifier(to).createProcessor(routeContext); + } + } + final Processor after = afterProcessor; final String matchURI = definition.getUri(); // register endpoint callback so we can proxy the endpoint @@ -55,7 +73,8 @@ public class InterceptSendToEndpointReifier extends ProcessorReifier<InterceptSe // should be false by default boolean skip = definition.getSkipSendToOriginalEndpoint() != null && definition.getSkipSendToOriginalEndpoint(); DefaultInterceptSendToEndpoint proxy = new DefaultInterceptSendToEndpoint(endpoint, skip); - proxy.setDetour(detour); + proxy.setBefore(before); + proxy.setAfter(after); return proxy; } else { // no proxy so return regular endpoint @@ -64,7 +83,6 @@ public class InterceptSendToEndpointReifier extends ProcessorReifier<InterceptSe } }); - // remove the original intercepted route from the outputs as we do not intercept as the regular interceptor // instead we use the proxy endpoints producer do the triggering. That is we trigger when someone sends // an exchange to the endpoint, see InterceptSendToEndpoint for details. @@ -72,7 +90,7 @@ public class InterceptSendToEndpointReifier extends ProcessorReifier<InterceptSe List<ProcessorDefinition<?>> outputs = route.getOutputs(); outputs.remove(this); - return new InterceptEndpointProcessor(matchURI, detour); + return new InterceptEndpointProcessor(matchURI, before); } /** diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java new file mode 100644 index 0000000..7b57240 --- /dev/null +++ b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.intercept; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.junit.Test; + +/** + * Unit test for intercepting sending to endpoint with after url + */ +public class InterceptSendToEndpointAfterTest extends ContextTestSupport { + + @Override + public boolean isUseRouteBuilder() { + return false; + } + + @Test + public void testInterceptEndpoint() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setTracing(true); + + interceptSendToEndpoint("mock:foo") + .to("mock:detour").transform(constant("Bye World")).afterUrl("direct:after"); + + from("direct:first") + .to("mock:bar") + .to("mock:foo") + .to("mock:result"); + + from("direct:after") + .to("mock:after"); + + } + }); + context.start(); + + getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:foo").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:after").expectedBodiesReceived("Bye World"); + + template.sendBody("direct:first", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInterceptEndpointDirectly() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("direct:start") + .to("mock:detour").transform(constant("Bye World")); + + from("direct:start") + .to("mock:foo") + .to("mock:result"); + } + }); + context.start(); + + getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:foo").expectedBodiesReceived("Bye World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInterceptEndpointWithStop() throws Exception { + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + interceptSendToEndpoint("direct:start") + .to("mock:detour").afterUrl("mock:after"); + + from("direct:start") + .to("mock:foo") + .transform().constant("Bye World"); + } + }); + context.start(); + + getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:after").expectedBodiesReceived("Bye World"); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + } + +}
