This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-3.7.x by this push:
new 8c5c40b CAMEL-16629: camel-core - InterceptSendToEndpoint - AfterUri
should only trigger if when was true
8c5c40b is described below
commit 8c5c40b0f40aa9d7dcfe6cd161517b8af8026f4e
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed May 19 07:33:24 2021 +0200
CAMEL-16629: camel-core - InterceptSendToEndpoint - AfterUri should only
trigger if when was true
---
.../InterceptSendToEndpointProcessor.java | 78 +++++++++++++++-------
.../InterceptSendToEndpointAfterTest.java | 23 +++++++
2 files changed, 77 insertions(+), 24 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
index 6a8187b..834a713 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/InterceptSendToEndpointProcessor.java
@@ -21,8 +21,10 @@ import java.util.Arrays;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncProducer;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Predicate;
import org.apache.camel.spi.InterceptSendToEndpoint;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
@@ -46,6 +48,7 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
private final Endpoint delegate;
private final AsyncProducer producer;
private final boolean skip;
+ private AsyncProcessor pipeline;
public InterceptSendToEndpointProcessor(InterceptSendToEndpoint endpoint,
Endpoint delegate, AsyncProducer producer,
boolean skip) {
@@ -71,24 +74,9 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
// add header with the real endpoint uri
exchange.getIn().setHeader(Exchange.INTERCEPTED_ENDPOINT,
delegate.getEndpointUri());
- if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
- // detour the exchange using synchronous processing
- AsyncProcessor before = null;
- if (endpoint.getBefore() != null) {
- before =
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
- }
- AsyncProcessor ascb = new AsyncProcessorSupport() {
- @Override
- public boolean process(Exchange exchange, AsyncCallback
callback) {
- return callback(exchange, callback, true);
- }
- };
- AsyncProcessor after = null;
- if (endpoint.getAfter() != null) {
- after =
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
- }
-
- return new Pipeline(exchange.getContext(), Arrays.asList(before,
ascb, after)).process(exchange, callback);
+ if (pipeline != null) {
+ // detour the exchange with the pipeline that has before and after
included
+ return pipeline.process(exchange, callback);
}
return callback(exchange, callback, true);
@@ -106,7 +94,14 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
boolean shouldSkip = skip;
// if then interceptor had a when predicate, then we should only skip
if it matched
- Boolean whenMatches = (Boolean)
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+ Boolean whenMatches;
+ if (endpoint.getAfter() != null) {
+ // only get the property as after also needs to check this property
+ whenMatches = (Boolean)
exchange.getProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+ } else {
+ // remove property as its not longer needed
+ whenMatches = (Boolean)
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+ }
if (whenMatches != null) {
shouldSkip = skip && whenMatches;
}
@@ -139,17 +134,52 @@ public class InterceptSendToEndpointProcessor extends
DefaultAsyncProducer {
}
@Override
- public void start() {
- ServiceHelper.startService(endpoint.getBefore(), endpoint.getAfter());
- // here we also need to start the producer
- ServiceHelper.startService(producer);
+ protected void doInit() throws Exception {
+ CamelContextAware.trySetCamelContext(producer,
endpoint.getCamelContext());
+ // build pipeline with before/after processors
+ if (endpoint.getBefore() != null || endpoint.getAfter() != null) {
+ // detour the exchange using synchronous processing
+ AsyncProcessor before = null;
+ if (endpoint.getBefore() != null) {
+ before =
AsyncProcessorConverterHelper.convert(endpoint.getBefore());
+ }
+ AsyncProcessor ascb = new AsyncProcessorSupport() {
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback
callback) {
+ return callback(exchange, callback, true);
+ }
+ };
+ // only execute the after if the intercept when predicate matches
+ Predicate predicate = exchange -> {
+ Boolean whenMatches
+ = (Boolean)
exchange.removeProperty(Exchange.INTERCEPT_SEND_TO_ENDPOINT_WHEN_MATCHED);
+ return whenMatches == null || whenMatches;
+ };
+ AsyncProcessor after = null;
+ if (endpoint.getAfter() != null) {
+ after =
AsyncProcessorConverterHelper.convert(endpoint.getAfter());
+ }
+ FilterProcessor filter = new
FilterProcessor(getEndpoint().getCamelContext(), predicate, after);
+ pipeline = new Pipeline(getEndpoint().getCamelContext(),
Arrays.asList(before, ascb, filter));
+ }
+
+ ServiceHelper.initService(producer, pipeline);
}
@Override
- public void stop() {
+ protected void doStart() throws Exception {
+ ServiceHelper.startService(producer, pipeline);
+ }
+
+ @Override
+ public void doStop() {
// do not stop before/after as it should only be stopped when the
interceptor stops
// we should stop the producer here
ServiceHelper.stopService(producer);
}
+ @Override
+ protected void doShutdown() throws Exception {
+ ServiceHelper.stopAndShutdownServices(producer, pipeline);
+ }
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
index 32630a8..4a5852c 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/intercept/InterceptSendToEndpointAfterTest.java
@@ -100,4 +100,27 @@ public class InterceptSendToEndpointAfterTest extends
ContextTestSupport {
assertMockEndpointsSatisfied();
}
+ @Test
+ public void testInterceptEndpointWhen() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ interceptSendToEndpoint("direct:start").when(simple("${body}
contains 'World'")).to("mock:detour")
+ .afterUrl("mock:after");
+
+ from("direct:start").to("mock:foo").transform().constant("Bye
World");
+ }
+ });
+ context.start();
+
+ getMockEndpoint("mock:detour").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World", "Hi
Camel");
+ getMockEndpoint("mock:after").expectedBodiesReceived("Bye World");
+
+ template.sendBody("direct:start", "Hello World");
+ template.sendBody("direct:start", "Hi Camel");
+
+ assertMockEndpointsSatisfied();
+ }
+
}