[CAMEL-6364] Improve processor wrapping when using ProcessorFactory
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d824bce2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d824bce2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d824bce2 Branch: refs/heads/master Commit: d824bce2094412054b1b2a97e610aec8cd476c17 Parents: d2b9cfa Author: Guillaume Nodet <gno...@gmail.com> Authored: Thu May 16 17:02:31 2013 +0200 Committer: Guillaume Nodet <gno...@gmail.com> Committed: Fri May 17 13:33:02 2013 +0200 ---------------------------------------------------------------------- .../java/org/apache/camel/model/AOPDefinition.java | 2 +- .../org/apache/camel/model/ChoiceDefinition.java | 8 +- .../org/apache/camel/model/ExpressionNode.java | 2 +- .../apache/camel/model/LoadBalanceDefinition.java | 4 +- .../apache/camel/model/ProcessorDefinition.java | 23 ++- .../java/org/apache/camel/model/TryDefinition.java | 2 +- .../org/apache/camel/model/WireTapDefinition.java | 2 +- .../apache/camel/processor/ChoiceProcessor.java | 124 ++++++++++---- .../org/apache/camel/builder/RouteBuilderTest.java | 12 +- .../apache/camel/processor/ChoiceAsyncTest.java | 95 +++++++++++ 10 files changed, 213 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java index 8a75452..e13d145 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java @@ -108,7 +108,7 @@ public class AOPDefinition extends OutputDefinition<AOPDefinition> { if (afterUri != null) { pipe.add(new ToDefinition(afterUri)); } else if (afterFinallyUri != null) { - finallyProcessor = new ToDefinition(afterFinallyUri).createProcessor(routeContext); + finallyProcessor = createProcessor(routeContext, new ToDefinition(afterFinallyUri)); } Processor tryProcessor = createOutputsProcessor(routeContext, pipe); http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java index 2527aa7..e8e31e4 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java @@ -19,7 +19,6 @@ package org.apache.camel.model; import java.util.AbstractList; import java.util.ArrayList; import java.util.List; - import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlElement; @@ -30,7 +29,6 @@ import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.builder.ExpressionClause; import org.apache.camel.processor.ChoiceProcessor; -import org.apache.camel.processor.FilterProcessor; import org.apache.camel.spi.RouteContext; import org.apache.camel.util.CollectionStringBuffer; import org.apache.camel.util.ObjectHelper; @@ -132,13 +130,13 @@ public class ChoiceDefinition extends ProcessorDefinition<ChoiceDefinition> { @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - List<FilterProcessor> filters = new ArrayList<FilterProcessor>(); + List<Processor> filters = new ArrayList<Processor>(); for (WhenDefinition whenClause : whenClauses) { - filters.add(whenClause.createProcessor(routeContext)); + filters.add(createProcessor(routeContext, whenClause)); } Processor otherwiseProcessor = null; if (otherwise != null) { - otherwiseProcessor = otherwise.createProcessor(routeContext); + otherwiseProcessor = createProcessor(routeContext, otherwise); } return new ChoiceProcessor(filters, otherwiseProcessor); } http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java index 254241d..41f97ce 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java +++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java @@ -106,7 +106,7 @@ public class ExpressionNode extends ProcessorDefinition<ExpressionNode> { * @throws Exception is thrown if error creating the processor */ protected FilterProcessor createFilterProcessor(RouteContext routeContext) throws Exception { - Processor childProcessor = this.createChildProcessor(routeContext, false); + Processor childProcessor = createOutputsProcessor(routeContext); return new FilterProcessor(createPredicate(routeContext), childProcessor); } http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java index c718623..973e9e5 100644 --- a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java @@ -122,7 +122,7 @@ public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefini LoadBalancer loadBalancer = LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref); for (ProcessorDefinition<?> processorType : outputs) { - Processor processor = processorType.createProcessor(routeContext); + Processor processor = createProcessor(routeContext, processorType); loadBalancer.addProcessor(processor); } return loadBalancer; @@ -138,7 +138,7 @@ public class LoadBalanceDefinition extends ProcessorDefinition<LoadBalanceDefini if (LoadBalanceDefinition.class.isInstance(processorType)) { throw new IllegalArgumentException("Loadbalancer already configured to: " + loadBalancerType + ". Cannot set it to: " + processorType); } - Processor processor = processorType.createProcessor(routeContext); + Processor processor = createProcessor(routeContext, processorType); processor = wrapChannel(routeContext, processor, processorType); loadBalancer.addProcessor(processor); } http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index b1e7fe3..ff16150 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -410,15 +410,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } } - Processor processor = null; - // at first use custom factory - if (routeContext.getCamelContext().getProcessorFactory() != null) { - processor = routeContext.getCamelContext().getProcessorFactory().createProcessor(routeContext, output); - } - // fallback to default implementation if factory did not create the processor - if (processor == null) { - processor = output.createProcessor(routeContext); - } + Processor processor = createProcessor(routeContext, output); if (output instanceof Channel && processor == null) { continue; @@ -441,6 +433,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> return processor; } + protected Processor createProcessor(RouteContext routeContext, ProcessorDefinition<?> output) throws Exception { + Processor processor = null; + // at first use custom factory + if (routeContext.getCamelContext().getProcessorFactory() != null) { + processor = routeContext.getCamelContext().getProcessorFactory().createProcessor(routeContext, output); + } + // fallback to default implementation if factory did not create the processor + if (processor == null) { + processor = output.createProcessor(routeContext); + } + return processor; + } + /** * Creates the processor and wraps it in any necessary interceptors and error handlers */ http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java index b140efb..d64a7e6 100644 --- a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java @@ -78,7 +78,7 @@ public class TryDefinition extends OutputDefinition<TryDefinition> { Processor finallyProcessor = null; if (finallyClause != null) { - finallyProcessor = finallyClause.createProcessor(routeContext); + finallyProcessor = createProcessor(routeContext, finallyClause); } List<CatchProcessor> catchProcessors = new ArrayList<CatchProcessor>(); http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java index 4c8359c..cb35864 100644 --- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java @@ -107,7 +107,7 @@ public class WireTapDefinition<Type extends ProcessorDefinition<Type>> extends N } if (headers != null && !headers.isEmpty()) { for (SetHeaderDefinition header : headers) { - Processor processor = header.createProcessor(routeContext); + Processor processor = createProcessor(routeContext, header); answer.addNewExchangeProcessor(processor); } } http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java index 7d87ef5..30dd3dd 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java @@ -17,13 +17,13 @@ package org.apache.camel.processor; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; import org.apache.camel.Exchange; import org.apache.camel.Navigate; -import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.support.ServiceSupport; @@ -33,6 +33,8 @@ import org.apache.camel.util.ServiceHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.camel.processor.PipelineHelper.continueProcessing; + /** * Implements a Choice structure where one or more predicates are used which if * they are true their processors are used, with a default otherwise clause used @@ -42,12 +44,12 @@ import org.slf4j.LoggerFactory; */ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { private static final transient Logger LOG = LoggerFactory.getLogger(ChoiceProcessor.class); - private final List<FilterProcessor> filters; - private final AsyncProcessor otherwise; + private final List<Processor> filters; + private final Processor otherwise; - public ChoiceProcessor(List<FilterProcessor> filters, Processor otherwise) { + public ChoiceProcessor(List<Processor> filters, Processor otherwise) { this.filters = filters; - this.otherwise = otherwise != null ? AsyncProcessorConverterHelper.convert(otherwise) : null; + this.otherwise = otherwise; } public void process(Exchange exchange) throws Exception { @@ -55,54 +57,104 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N } public boolean process(Exchange exchange, AsyncCallback callback) { - for (int i = 0; i < filters.size(); i++) { - FilterProcessor filter = filters.get(i); - Predicate predicate = filter.getPredicate(); - - boolean matches = false; - try { - // ensure we handle exceptions thrown when matching predicate - if (predicate != null) { - matches = predicate.matches(exchange); - } - } catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; + Iterator<Processor> processors = next().iterator(); + + exchange.setProperty(Exchange.FILTER_MATCHED, false); + while (continueRouting(processors, exchange)) { + // get the next processor + Processor processor = processors.next(); + + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); + boolean sync = process(exchange, callback, processors, async); + + // continue as long its being processed synchronously + if (!sync) { + LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + // the remainder of the CBR will be completed async + // so we break out now, then the callback will be invoked which then continue routing from where we left here + return false; } - if (LOG.isDebugEnabled()) { - LOG.debug("#{} - {} matches: {} for: {}", new Object[]{i, predicate, matches, exchange}); - } + LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); - if (matches) { - // process next will also take care (has not null test) if next was a stop(). - // stop() has no processor to execute, and thus we will end in a NPE - return filter.processNext(exchange, callback); + // check for error if so we should break out + if (!continueProcessing(exchange, "so breaking out of content based router", LOG)) { + break; } } - if (otherwise != null) { - return AsyncProcessorHelper.process(otherwise, exchange, callback); - } else { - callback.done(true); - return true; + + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + + callback.done(true); + return true; + } + + protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) { + boolean answer = it.hasNext(); + if (answer) { + Object matched = exchange.getProperty(Exchange.FILTER_MATCHED); + if (matched != null) { + boolean hasMatched = exchange.getContext().getTypeConverter().convertTo(Boolean.class, matched); + if (hasMatched) { + LOG.debug("ExchangeId: {} has been matched: {}", exchange.getExchangeId(), exchange); + answer = false; + } + } } + LOG.trace("ExchangeId: {} should continue matching: {}", exchange.getExchangeId(), answer); + return answer; + } + + private boolean process(final Exchange exchange, final AsyncCallback callback, + final Iterator<Processor> processors, final AsyncProcessor asyncProcessor) { + // this does the actual processing so log at trace level + LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + + // implement asynchronous routing logic in callback so we can have the callback being + // triggered and then continue routing where we left + boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, new AsyncCallback() { + public void done(boolean doneSync) { + // we only have to handle async completion of the pipeline + if (doneSync) { + return; + } + + // continue processing the pipeline asynchronously + while (continueRouting(processors, exchange)) { + AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next()); + + // check for error if so we should break out + if (!continueProcessing(exchange, "so breaking out of pipeline", LOG)) { + break; + } + + doneSync = process(exchange, callback, processors, processor); + if (!doneSync) { + LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); + return; + } + } + + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + callback.done(false); + } + }); + + return sync; } @Override public String toString() { StringBuilder builder = new StringBuilder("choice{"); boolean first = true; - for (FilterProcessor processor : filters) { + for (Processor processor : filters) { if (first) { first = false; } else { builder.append(", "); } builder.append("when "); - builder.append(processor.getPredicate().toString()); - builder.append(": "); - builder.append(processor.getProcessor()); + builder.append(processor); } if (otherwise != null) { builder.append(", otherwise: "); @@ -116,7 +168,7 @@ public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, N return "choice"; } - public List<FilterProcessor> getFilters() { + public List<Processor> getFilters() { return filters; } http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java b/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java index 11dcd3d..9eaed57 100644 --- a/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java +++ b/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java @@ -157,14 +157,16 @@ public class RouteBuilderTest extends TestSupport { Channel channel = unwrapChannel(consumer.getProcessor()); ChoiceProcessor choiceProcessor = assertIsInstanceOf(ChoiceProcessor.class, channel.getNextProcessor()); - List<FilterProcessor> filters = choiceProcessor.getFilters(); + List<Processor> filters = choiceProcessor.getFilters(); assertEquals("Should be two when clauses", 2, filters.size()); - FilterProcessor filter1 = filters.get(0); - assertSendTo(unwrapChannel(filter1.getProcessor()).getNextProcessor(), "direct://b"); + Processor filter1 = filters.get(0); + assertTrue(filter1 instanceof FilterProcessor); + assertSendTo(unwrapChannel(((FilterProcessor) filter1).getProcessor()).getNextProcessor(), "direct://b"); - FilterProcessor filter2 = filters.get(1); - assertSendTo(unwrapChannel(filter2.getProcessor()).getNextProcessor(), "direct://c"); + Processor filter2 = filters.get(1); + assertTrue(filter2 instanceof FilterProcessor); + assertSendTo(unwrapChannel(((FilterProcessor) filter2).getProcessor()).getNextProcessor(), "direct://c"); assertSendTo(unwrapChannel(choiceProcessor.getOtherwise()).getNextProcessor(), "direct://d"); } http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java b/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java new file mode 100644 index 0000000..a02ca2c --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount; + +/** + * @version + */ +public class ChoiceAsyncTest extends ContextTestSupport { + protected MockEndpoint x; + protected MockEndpoint y; + protected MockEndpoint z; + protected MockEndpoint end; + + public void testSendToFirstWhen() throws Exception { + String body = "<one/>"; + x.expectedBodiesReceived(body); + end.expectedBodiesReceived(body); + // The SpringChoiceTest.java can't setup the header by Spring configure file + // x.expectedHeaderReceived("name", "a"); + expectsMessageCount(0, y, z); + + sendMessage("bar", body); + + assertMockEndpointsSatisfied(); + } + + public void testSendToSecondWhen() throws Exception { + String body = "<two/>"; + y.expectedBodiesReceived(body); + end.expectedBodiesReceived(body); + expectsMessageCount(0, x, z); + + sendMessage("cheese", body); + + assertMockEndpointsSatisfied(); + } + + public void testSendToOtherwiseClause() throws Exception { + String body = "<three/>"; + z.expectedBodiesReceived(body); + end.expectedBodiesReceived(body); + expectsMessageCount(0, x, y); + + sendMessage("somethingUndefined", body); + + assertMockEndpointsSatisfied(); + } + + protected void sendMessage(final Object headerValue, final Object body) throws Exception { + template.sendBodyAndHeader("direct:start", body, "foo", headerValue); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + x = getMockEndpoint("mock:x"); + y = getMockEndpoint("mock:y"); + z = getMockEndpoint("mock:z"); + end = getMockEndpoint("mock:end"); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:start").choice() + .when().xpath("$foo = 'bar'").delay(10).asyncDelayed().to("mock:x").endChoice() + .when().xpath("$foo = 'cheese'").delay(10).asyncDelayed().to("mock:y").endChoice() + .otherwise().delay(10).asyncDelayed().to("mock:z").endChoice() + .end().to("mock:end"); + } + }; + } + +}