This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4313bcad7c18f24994ed2daf1c1c12523dc2f84c
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Jul 30 22:21:16 2019 +0200

    CAMEL-6901: Intercept send to endpoint now supports an after url; the
    result is sent to this url, like an AOP before|after.
---
 .../apache/camel/spi/InterceptSendToEndpoint.java  |  15 ++-
 .../engine/DefaultInterceptSendToEndpoint.java     |  36 +++++--
 .../engine/InterceptSendToEndpointProcessor.java   |  20 ++--
 .../InterceptSendToMockEndpointStrategy.java       |   2 +-
 .../model/InterceptSendToEndpointDefinition.java   |  25 +++++
 .../reifier/InterceptSendToEndpointReifier.java    |  28 ++++-
 .../InterceptSendToEndpointAfterTest.java          | 114 +++++++++++++++++++++
 7 files changed, 221 insertions(+), 19 deletions(-)

diff --git 
a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java
 
b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java
index 55e09aa..d812a2b 100644
--- 
a/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java
+++ 
b/core/camel-api/src/main/java/org/apache/camel/spi/InterceptSendToEndpoint.java
@@ -30,11 +30,24 @@ public interface InterceptSendToEndpoint extends Endpoint {
     Endpoint getOriginalEndpoint();
 
     /**
-     * The processor for routing in a detour
+     * The processor for routing in a detour before sending to the original endpoint.
+     *
+     * @deprecated use {@link #getBefore()}
      */
+    @Deprecated
     Processor getDetour();
 
     /**
+     * The processor for routing in a detour before sending to the original endpoint.
+     */
+    Processor getBefore();
+
+    /**
+     * The processor for routing after sending to the original endpoint.
+     */
+    Processor getAfter();
+
+    /**
      * Whether to skip sending after the detour to the original endpoint.
      */
     boolean isSkip();
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java
index 434bc6a..8542a2a 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInterceptSendToEndpoint.java
@@ -37,7 +37,8 @@ import org.apache.camel.support.service.ServiceHelper;
 public class DefaultInterceptSendToEndpoint implements 
InterceptSendToEndpoint, ShutdownableService {
 
     private final Endpoint delegate;
-    private Processor detour;
+    private Processor before;
+    private Processor after;
     private boolean skip;
 
     /**
@@ -51,13 +52,36 @@ public class DefaultInterceptSendToEndpoint implements 
InterceptSendToEndpoint,
         this.skip = skip;
     }
 
+    @Deprecated
     public void setDetour(Processor detour) {
-        this.detour = detour;
+        setBefore(detour);
+    }
+
+    public void setBefore(Processor before) {
+        this.before = before;
+    }
+
+    public void setAfter(Processor after) {
+        this.after = after;
+    }
+
+    public void setSkip(boolean skip) {
+        this.skip = skip;
     }
 
     @Override
     public Processor getDetour() {
-        return detour;
+        return getBefore();
+    }
+
+    @Override
+    public Processor getBefore() {
+        return before;
+    }
+
+    @Override
+    public Processor getAfter() {
+        return after;
     }
 
     @Override
@@ -125,16 +149,16 @@ public class DefaultInterceptSendToEndpoint implements 
InterceptSendToEndpoint,
     }
 
     public void start() {
-        ServiceHelper.startService(detour, delegate);
+        ServiceHelper.startService(before, delegate);
     }
 
     public void stop() {
-        ServiceHelper.stopService(delegate, detour);
+        ServiceHelper.stopService(delegate, before);
     }
 
     @Override
     public void shutdown() {
-        ServiceHelper.stopAndShutdownServices(delegate, detour);
+        ServiceHelper.stopAndShutdownServices(delegate, before);
     }
 
     @Override
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java
index f14874c..fc91f76 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToEndpointProcessor.java
@@ -58,21 +58,29 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     public boolean process(Exchange exchange, AsyncCallback callback) {
         // process the detour so we do the detour routing
         if (log.isDebugEnabled()) {
-            log.debug("Sending to endpoint: {} is intercepted and detoured to: 
{} for exchange: {}", getEndpoint(), endpoint.getDetour(), exchange);
+            log.debug("Sending to endpoint: {} is intercepted and detoured to: 
{} for exchange: {}", getEndpoint(), endpoint.getBefore(), exchange);
         }
         // add header with the real endpoint uri
         exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT, 
delegate.getEndpointUri());
 
-        if (endpoint.getDetour() != null) {
+        if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
             // detour the exchange using synchronous processing
-            AsyncProcessor detour = 
AsyncProcessorConverterHelper.convert(endpoint.getDetour());
+            AsyncProcessor before = null;
+            if (endpoint.getBefore() != null) {
+                before = 
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
+            }
             AsyncProcessor ascb = new AsyncProcessorSupport() {
                 @Override
                 public boolean process(Exchange exchange, AsyncCallback 
callback) {
                     return callback(exchange, callback, true);
                 }
             };
-            return new Pipeline(exchange.getContext(), Arrays.asList(detour, 
ascb)).process(exchange, callback);
+            AsyncProcessor after = null;
+            if (endpoint.getAfter() != null) {
+                after = 
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
+            }
+
+            return new Pipeline(exchange.getContext(), Arrays.asList(before, 
ascb, after)).process(exchange, callback);
         }
 
         return callback(exchange, callback, true);
@@ -121,13 +129,13 @@ public class InterceptSendToEndpointProcessor extends 
DefaultAsyncProducer {
     }
 
     public void start() {
-        ServiceHelper.startService(endpoint.getDetour());
+        ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter());
         // here we also need to start the producer
         ServiceHelper.startService(producer);
     }
 
     public void stop() {
-        // do not stop detour as it should only be stopped when the 
interceptor stops
+        // do not stop before/after as they should only be stopped when the interceptor stops
         // we should stop the producer here
         ServiceHelper.stopService(producer);
     }
diff --git 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java
 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java
index c88dec7..d6ebcc3 100644
--- 
a/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java
+++ 
b/core/camel-base/src/main/java/org/apache/camel/impl/engine/InterceptSendToMockEndpointStrategy.java
@@ -100,7 +100,7 @@ public class InterceptSendToMockEndpointStrategy implements 
EndpointStrategy {
 
             // allow custom logic
             producer = onInterceptEndpoint(uri, endpoint, mock, producer);
-            proxy.setDetour(producer);
+            proxy.setBefore(producer);
 
             return proxy;
         } else {
diff --git 
a/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java
 
b/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java
index ee3f54a..45ea74d 100644
--- 
a/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java
+++ 
b/core/camel-core/src/main/java/org/apache/camel/model/InterceptSendToEndpointDefinition.java
@@ -20,6 +20,7 @@ import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.Predicate;
 import org.apache.camel.spi.AsPredicate;
@@ -37,6 +38,8 @@ public class InterceptSendToEndpointDefinition extends 
OutputDefinition<Intercep
     private String uri;
     @XmlAttribute
     private Boolean skipSendToOriginalEndpoint;
+    @XmlAttribute
+    private String afterUri;
 
     public InterceptSendToEndpointDefinition() {
     }
@@ -93,6 +96,16 @@ public class InterceptSendToEndpointDefinition extends 
OutputDefinition<Intercep
     }
 
     /**
+     * After sending to the endpoint, send the message to this url, which allows processing its result.
+     *
+     * @return the builder
+     */
+    public InterceptSendToEndpointDefinition afterUrl(String url) {
+        setAfterUri(url);
+        return this;
+    }
+
+    /**
      * This method is <b>only</b> for handling some post configuration
      * that is needed since this is an interceptor, and we have to do
      * a bit of magic logic to fixup to handle predicates
@@ -155,4 +168,16 @@ public class InterceptSendToEndpointDefinition extends 
OutputDefinition<Intercep
     public void setUri(String uri) {
         this.uri = uri;
     }
+
+    public String getAfterUri() {
+        return afterUri;
+    }
+
+    /**
+     * After sending to the endpoint, send the message to this uri, which allows processing its result.
+     */
+    public void setAfterUri(String afterProcessor) {
+        this.afterUri = afterProcessor;
+    }
+
 }
diff --git 
a/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java
 
b/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java
index 6b241b5..23954db 100644
--- 
a/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java
+++ 
b/core/camel-core/src/main/java/org/apache/camel/reifier/InterceptSendToEndpointReifier.java
@@ -26,7 +26,10 @@ import 
org.apache.camel.impl.engine.DefaultInterceptSendToEndpoint;
 import org.apache.camel.model.InterceptSendToEndpointDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.model.ToDefinition;
 import org.apache.camel.processor.InterceptEndpointProcessor;
+import org.apache.camel.processor.SendDynamicProcessor;
+import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.spi.EndpointStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.support.EndpointHelper;
@@ -40,8 +43,23 @@ public class InterceptSendToEndpointReifier extends 
ProcessorReifier<InterceptSe
 
     @Override
     public Processor createProcessor(final RouteContext routeContext) throws 
Exception {
-        // create the detour
-        final Processor detour = this.createChildProcessor(routeContext, true);
+        // create the before
+        final Processor before = this.createChildProcessor(routeContext, true);
+        // create the after
+        Processor afterProcessor = null;
+        if (definition.getAfterUri() != null) {
+            ToDefinition to = new ToDefinition(definition.getAfterUri());
+            // at first use custom factory
+            if 
(routeContext.getCamelContext().adapt(ExtendedCamelContext.class).getProcessorFactory()
 != null) {
+                afterProcessor = 
routeContext.getCamelContext().adapt(ExtendedCamelContext.class)
+                        .getProcessorFactory().createProcessor(routeContext, 
to);
+            }
+            // fall back to the default implementation if the factory did not create the processor
+            if (afterProcessor == null) {
+                afterProcessor = reifier(to).createProcessor(routeContext);
+            }
+        }
+        final Processor after = afterProcessor;
         final String matchURI = definition.getUri();
 
         // register endpoint callback so we can proxy the endpoint
@@ -55,7 +73,8 @@ public class InterceptSendToEndpointReifier extends 
ProcessorReifier<InterceptSe
                     // should be false by default
                     boolean skip = definition.getSkipSendToOriginalEndpoint() 
!= null && definition.getSkipSendToOriginalEndpoint();
                     DefaultInterceptSendToEndpoint proxy = new 
DefaultInterceptSendToEndpoint(endpoint, skip);
-                    proxy.setDetour(detour);
+                    proxy.setBefore(before);
+                    proxy.setAfter(after);
                     return proxy;
                 } else {
                     // no proxy so return regular endpoint
@@ -64,7 +83,6 @@ public class InterceptSendToEndpointReifier extends 
ProcessorReifier<InterceptSe
             }
         });
 
-
         // remove the original intercepted route from the outputs as we do not 
intercept as the regular interceptor
         // instead we use the proxy endpoints producer do the triggering. That 
is we trigger when someone sends
         // an exchange to the endpoint, see InterceptSendToEndpoint for 
details.
@@ -72,7 +90,7 @@ public class InterceptSendToEndpointReifier extends 
ProcessorReifier<InterceptSe
         List<ProcessorDefinition<?>> outputs = route.getOutputs();
         outputs.remove(this);
 
-        return new InterceptEndpointProcessor(matchURI, detour);
+        return new InterceptEndpointProcessor(matchURI, before);
     }
 
     /**
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
new file mode 100644
index 0000000..7b57240
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.intercept;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.Test;
+
+/**
+ * Unit test for intercepting sending to endpoint with after url
+ */
+public class InterceptSendToEndpointAfterTest extends ContextTestSupport {
+
+    @Override
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    @Test
+    public void testInterceptEndpoint() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                context.setTracing(true);
+
+                interceptSendToEndpoint("mock:foo")
+                    .to("mock:detour").transform(constant("Bye 
World")).afterUrl("direct:after");
+
+                from("direct:first")
+                    .to("mock:bar")
+                    .to("mock:foo")
+                    .to("mock:result");
+                
+                from("direct:after")
+                        .to("mock:after");
+
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:first", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testInterceptEndpointDirectly() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start")
+                    .to("mock:detour").transform(constant("Bye World"));
+
+                from("direct:start")
+                    .to("mock:foo")
+                    .to("mock:result");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Bye World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testInterceptEndpointWithStop() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                interceptSendToEndpoint("direct:start")
+                    .to("mock:detour").afterUrl("mock:after");
+
+                from("direct:start")
+                    .to("mock:foo")
+                    .transform().constant("Bye World");
+            }
+        });
+        context.start();
+
+        getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+}

Reply via email to