[CAMEL-6364] Improve processor wrapping when using ProcessorFactory

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d824bce2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d824bce2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d824bce2

Branch: refs/heads/master
Commit: d824bce2094412054b1b2a97e610aec8cd476c17
Parents: d2b9cfa
Author: Guillaume Nodet <gno...@gmail.com>
Authored: Thu May 16 17:02:31 2013 +0200
Committer: Guillaume Nodet <gno...@gmail.com>
Committed: Fri May 17 13:33:02 2013 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/model/AOPDefinition.java |    2 +-
 .../org/apache/camel/model/ChoiceDefinition.java   |    8 +-
 .../org/apache/camel/model/ExpressionNode.java     |    2 +-
 .../apache/camel/model/LoadBalanceDefinition.java  |    4 +-
 .../apache/camel/model/ProcessorDefinition.java    |   23 ++-
 .../java/org/apache/camel/model/TryDefinition.java |    2 +-
 .../org/apache/camel/model/WireTapDefinition.java  |    2 +-
 .../apache/camel/processor/ChoiceProcessor.java    |  124 ++++++++++----
 .../org/apache/camel/builder/RouteBuilderTest.java |   12 +-
 .../apache/camel/processor/ChoiceAsyncTest.java    |   95 +++++++++++
 10 files changed, 213 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java
index 8a75452..e13d145 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AOPDefinition.java
@@ -108,7 +108,7 @@ public class AOPDefinition extends 
OutputDefinition<AOPDefinition> {
         if (afterUri != null) {
             pipe.add(new ToDefinition(afterUri));
         } else if (afterFinallyUri != null) {
-            finallyProcessor = new 
ToDefinition(afterFinallyUri).createProcessor(routeContext);
+            finallyProcessor = createProcessor(routeContext, new 
ToDefinition(afterFinallyUri));
         }
 
         Processor tryProcessor = createOutputsProcessor(routeContext, pipe);

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java
index 2527aa7..e8e31e4 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ChoiceDefinition.java
@@ -19,7 +19,6 @@ package org.apache.camel.model;
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.List;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlElement;
@@ -30,7 +29,6 @@ import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.processor.ChoiceProcessor;
-import org.apache.camel.processor.FilterProcessor;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.CollectionStringBuffer;
 import org.apache.camel.util.ObjectHelper;
@@ -132,13 +130,13 @@ public class ChoiceDefinition extends 
ProcessorDefinition<ChoiceDefinition> {
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
-        List<FilterProcessor> filters = new ArrayList<FilterProcessor>();
+        List<Processor> filters = new ArrayList<Processor>();
         for (WhenDefinition whenClause : whenClauses) {
-            filters.add(whenClause.createProcessor(routeContext));
+            filters.add(createProcessor(routeContext, whenClause));
         }
         Processor otherwiseProcessor = null;
         if (otherwise != null) {
-            otherwiseProcessor = otherwise.createProcessor(routeContext);
+            otherwiseProcessor = createProcessor(routeContext, otherwise);
         }
         return new ChoiceProcessor(filters, otherwiseProcessor);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java 
b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
index 254241d..41f97ce 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -106,7 +106,7 @@ public class ExpressionNode extends 
ProcessorDefinition<ExpressionNode> {
      * @throws Exception is thrown if error creating the processor
      */
     protected FilterProcessor createFilterProcessor(RouteContext routeContext) 
throws Exception {
-        Processor childProcessor = this.createChildProcessor(routeContext, 
false);
+        Processor childProcessor = createOutputsProcessor(routeContext);
         return new FilterProcessor(createPredicate(routeContext), 
childProcessor);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
index c718623..973e9e5 100644
--- a/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
@@ -122,7 +122,7 @@ public class LoadBalanceDefinition extends 
ProcessorDefinition<LoadBalanceDefini
 
         LoadBalancer loadBalancer = 
LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref);
         for (ProcessorDefinition<?> processorType : outputs) {
-            Processor processor = processorType.createProcessor(routeContext);
+            Processor processor = createProcessor(routeContext, processorType);
             loadBalancer.addProcessor(processor);
         }
         return loadBalancer;
@@ -138,7 +138,7 @@ public class LoadBalanceDefinition extends 
ProcessorDefinition<LoadBalanceDefini
             if (LoadBalanceDefinition.class.isInstance(processorType)) {
                 throw new IllegalArgumentException("Loadbalancer already 
configured to: " + loadBalancerType + ". Cannot set it to: " + processorType);
             }
-            Processor processor = processorType.createProcessor(routeContext);
+            Processor processor = createProcessor(routeContext, processorType);
             processor = wrapChannel(routeContext, processor, processorType);
             loadBalancer.addProcessor(processor);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index b1e7fe3..ff16150 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -410,15 +410,7 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
                 }
             }
 
-            Processor processor = null;
-            // at first use custom factory
-            if (routeContext.getCamelContext().getProcessorFactory() != null) {
-                processor = 
routeContext.getCamelContext().getProcessorFactory().createProcessor(routeContext,
 output);
-            }
-            // fallback to default implementation if factory did not create 
the processor
-            if (processor == null) {
-                processor = output.createProcessor(routeContext);
-            }
+            Processor processor = createProcessor(routeContext, output);
 
             if (output instanceof Channel && processor == null) {
                 continue;
@@ -441,6 +433,19 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
         return processor;
     }
 
+    protected Processor createProcessor(RouteContext routeContext, 
ProcessorDefinition<?> output) throws Exception {
+        Processor processor = null;
+        // at first use custom factory
+        if (routeContext.getCamelContext().getProcessorFactory() != null) {
+            processor = 
routeContext.getCamelContext().getProcessorFactory().createProcessor(routeContext,
 output);
+        }
+        // fallback to default implementation if factory did not create the 
processor
+        if (processor == null) {
+            processor = output.createProcessor(routeContext);
+        }
+        return processor;
+    }
+
     /**
      * Creates the processor and wraps it in any necessary interceptors and 
error handlers
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
index b140efb..d64a7e6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java
@@ -78,7 +78,7 @@ public class TryDefinition extends 
OutputDefinition<TryDefinition> {
 
         Processor finallyProcessor = null;
         if (finallyClause != null) {
-            finallyProcessor = finallyClause.createProcessor(routeContext);
+            finallyProcessor = createProcessor(routeContext, finallyClause);
         }
 
         List<CatchProcessor> catchProcessors = new ArrayList<CatchProcessor>();

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
index 4c8359c..cb35864 100644
--- a/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/WireTapDefinition.java
@@ -107,7 +107,7 @@ public class WireTapDefinition<Type extends 
ProcessorDefinition<Type>> extends N
         }
         if (headers != null && !headers.isEmpty()) {
             for (SetHeaderDefinition header : headers) {
-                Processor processor = header.createProcessor(routeContext);
+                Processor processor = createProcessor(routeContext, header);
                 answer.addNewExchangeProcessor(processor);
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
index 7d87ef5..30dd3dd 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
@@ -17,13 +17,13 @@
 package org.apache.camel.processor;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
-import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.support.ServiceSupport;
@@ -33,6 +33,8 @@ import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.processor.PipelineHelper.continueProcessing;
+
 /**
  * Implements a Choice structure where one or more predicates are used which if
  * they are true their processors are used, with a default otherwise clause 
used
@@ -42,12 +44,12 @@ import org.slf4j.LoggerFactory;
  */
 public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, 
Navigate<Processor>, Traceable {
     private static final transient Logger LOG = 
LoggerFactory.getLogger(ChoiceProcessor.class);
-    private final List<FilterProcessor> filters;
-    private final AsyncProcessor otherwise;
+    private final List<Processor> filters;
+    private final Processor otherwise;
 
-    public ChoiceProcessor(List<FilterProcessor> filters, Processor otherwise) 
{
+    public ChoiceProcessor(List<Processor> filters, Processor otherwise) {
         this.filters = filters;
-        this.otherwise = otherwise != null ? 
AsyncProcessorConverterHelper.convert(otherwise) : null;
+        this.otherwise = otherwise;
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -55,54 +57,104 @@ public class ChoiceProcessor extends ServiceSupport 
implements AsyncProcessor, N
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        for (int i = 0; i < filters.size(); i++) {
-            FilterProcessor filter = filters.get(i);
-            Predicate predicate = filter.getPredicate();
-
-            boolean matches = false;
-            try {
-                // ensure we handle exceptions thrown when matching predicate
-                if (predicate != null) {
-                    matches = predicate.matches(exchange);
-                }
-            } catch (Throwable e) {
-                exchange.setException(e);
-                callback.done(true);
-                return true;
+        Iterator<Processor> processors = next().iterator();
+
+        exchange.setProperty(Exchange.FILTER_MATCHED, false);
+        while (continueRouting(processors, exchange)) {
+            // get the next processor
+            Processor processor = processors.next();
+
+            AsyncProcessor async = 
AsyncProcessorConverterHelper.convert(processor);
+            boolean sync = process(exchange, callback, processors, async);
+
+            // continue as long as it is being processed synchronously
+            if (!sync) {
+                LOG.trace("Processing exchangeId: {} is continued being 
processed asynchronously", exchange.getExchangeId());
+                // the remainder of the CBR will be completed async
+                // so we break out now; the callback will then be invoked 
and continue routing from where we left off
+                return false;
             }
 
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("#{} - {} matches: {} for: {}", new Object[]{i, 
predicate, matches, exchange});
-            }
+            LOG.trace("Processing exchangeId: {} is continued being processed 
synchronously", exchange.getExchangeId());
 
-            if (matches) {
-                // process next will also take care (has not null test) if 
next was a stop().
-                // stop() has no processor to execute, and thus we will end in 
a NPE
-                return filter.processNext(exchange, callback);
+            // check for error if so we should break out
+            if (!continueProcessing(exchange, "so breaking out of content 
based router", LOG)) {
+                break;
             }
         }
-        if (otherwise != null) {
-            return AsyncProcessorHelper.process(otherwise, exchange, callback);
-        } else {
-            callback.done(true);
-            return true;
+
+        LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+
+        callback.done(true);
+        return true;
+    }
+
+    protected boolean continueRouting(Iterator<Processor> it, Exchange 
exchange) {
+        boolean answer = it.hasNext();
+        if (answer) {
+            Object matched = exchange.getProperty(Exchange.FILTER_MATCHED);
+            if (matched != null) {
+                boolean hasMatched = 
exchange.getContext().getTypeConverter().convertTo(Boolean.class, matched);
+                if (hasMatched) {
+                    LOG.debug("ExchangeId: {} has been matched: {}", 
exchange.getExchangeId(), exchange);
+                    answer = false;
+                }
+            }
         }
+        LOG.trace("ExchangeId: {} should continue matching: {}", 
exchange.getExchangeId(), answer);
+        return answer;
+    }
+
+    private boolean process(final Exchange exchange, final AsyncCallback 
callback,
+                            final Iterator<Processor> processors, final 
AsyncProcessor asyncProcessor) {
+        // this does the actual processing so log at trace level
+        LOG.trace("Processing exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+
+        // implement asynchronous routing logic in the callback so that, once the 
callback is
+        // triggered, routing continues from where we left off
+        boolean sync = AsyncProcessorHelper.process(asyncProcessor, exchange, 
new AsyncCallback() {
+            public void done(boolean doneSync) {
+                // we only have to handle async completion of the choice
+                if (doneSync) {
+                    return;
+                }
+
+                // continue processing the remaining processors asynchronously
+                while (continueRouting(processors, exchange)) {
+                    AsyncProcessor processor = 
AsyncProcessorConverterHelper.convert(processors.next());
+
+                    // check for error if so we should break out
+                    if (!continueProcessing(exchange, "so breaking out of 
pipeline", LOG)) {
+                        break;
+                    }
+
+                    doneSync = process(exchange, callback, processors, 
processor);
+                    if (!doneSync) {
+                        LOG.trace("Processing exchangeId: {} is continued 
being processed asynchronously", exchange.getExchangeId());
+                        return;
+                    }
+                }
+
+                LOG.trace("Processing complete for exchangeId: {} >>> {}", 
exchange.getExchangeId(), exchange);
+                callback.done(false);
+            }
+        });
+
+        return sync;
     }
 
     @Override
     public String toString() {
         StringBuilder builder = new StringBuilder("choice{");
         boolean first = true;
-        for (FilterProcessor processor : filters) {
+        for (Processor processor : filters) {
             if (first) {
                 first = false;
             } else {
                 builder.append(", ");
             }
             builder.append("when ");
-            builder.append(processor.getPredicate().toString());
-            builder.append(": ");
-            builder.append(processor.getProcessor());
+            builder.append(processor);
         }
         if (otherwise != null) {
             builder.append(", otherwise: ");
@@ -116,7 +168,7 @@ public class ChoiceProcessor extends ServiceSupport 
implements AsyncProcessor, N
         return "choice";
     }
 
-    public List<FilterProcessor> getFilters() {
+    public List<Processor> getFilters() {
         return filters;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java 
b/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
index 11dcd3d..9eaed57 100644
--- a/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
+++ b/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
@@ -157,14 +157,16 @@ public class RouteBuilderTest extends TestSupport {
             Channel channel = unwrapChannel(consumer.getProcessor());
 
             ChoiceProcessor choiceProcessor = 
assertIsInstanceOf(ChoiceProcessor.class, channel.getNextProcessor());
-            List<FilterProcessor> filters = choiceProcessor.getFilters();
+            List<Processor> filters = choiceProcessor.getFilters();
             assertEquals("Should be two when clauses", 2, filters.size());
 
-            FilterProcessor filter1 = filters.get(0);
-            
assertSendTo(unwrapChannel(filter1.getProcessor()).getNextProcessor(), 
"direct://b");
+            Processor filter1 = filters.get(0);
+            assertTrue(filter1 instanceof FilterProcessor);
+            assertSendTo(unwrapChannel(((FilterProcessor) 
filter1).getProcessor()).getNextProcessor(), "direct://b");
 
-            FilterProcessor filter2 = filters.get(1);
-            
assertSendTo(unwrapChannel(filter2.getProcessor()).getNextProcessor(), 
"direct://c");
+            Processor filter2 = filters.get(1);
+            assertTrue(filter2 instanceof FilterProcessor);
+            assertSendTo(unwrapChannel(((FilterProcessor) 
filter2).getProcessor()).getNextProcessor(), "direct://c");
 
             
assertSendTo(unwrapChannel(choiceProcessor.getOtherwise()).getNextProcessor(), 
"direct://d");
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/d824bce2/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java
new file mode 100644
index 0000000..a02ca2c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ChoiceAsyncTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+import static org.apache.camel.component.mock.MockEndpoint.expectsMessageCount;
+
+/**
+ * @version 
+ */
+public class ChoiceAsyncTest extends ContextTestSupport {
+    protected MockEndpoint x;
+    protected MockEndpoint y;
+    protected MockEndpoint z;
+    protected MockEndpoint end;
+
+    public void testSendToFirstWhen() throws Exception {
+        String body = "<one/>";
+        x.expectedBodiesReceived(body);
+        end.expectedBodiesReceived(body);
+        // SpringChoiceTest.java can't set up the header via the Spring 
configuration file
+        // x.expectedHeaderReceived("name", "a");
+        expectsMessageCount(0, y, z);
+
+        sendMessage("bar", body);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSendToSecondWhen() throws Exception {
+        String body = "<two/>";
+        y.expectedBodiesReceived(body);
+        end.expectedBodiesReceived(body);
+        expectsMessageCount(0, x, z);
+
+        sendMessage("cheese", body);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSendToOtherwiseClause() throws Exception {
+        String body = "<three/>";
+        z.expectedBodiesReceived(body);
+        end.expectedBodiesReceived(body);
+        expectsMessageCount(0, x, y);
+
+        sendMessage("somethingUndefined", body);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected void sendMessage(final Object headerValue, final Object body) 
throws Exception {
+        template.sendBodyAndHeader("direct:start", body, "foo", headerValue);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        x = getMockEndpoint("mock:x");
+        y = getMockEndpoint("mock:y");
+        z = getMockEndpoint("mock:z");
+        end = getMockEndpoint("mock:end");
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start").choice()
+                  .when().xpath("$foo = 
'bar'").delay(10).asyncDelayed().to("mock:x").endChoice()
+                  .when().xpath("$foo = 
'cheese'").delay(10).asyncDelayed().to("mock:y").endChoice()
+                  
.otherwise().delay(10).asyncDelayed().to("mock:z").endChoice()
+                  .end().to("mock:end");
+            }
+        };
+    }
+
+}

Reply via email to