Repository: camel
Updated Branches:
  refs/heads/master 8e0e3083e -> 8bc8484b1


CAMEL-10724: Improve Java DSL support for Java 8


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b

Branch: refs/heads/master
Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636
Parents: 8e0e308
Author: lburgazzoli <lburgazz...@gmail.com>
Authored: Wed Jan 18 18:09:08 2017 +0100
Committer: lburgazzoli <lburgazz...@gmail.com>
Committed: Thu Feb 16 18:10:22 2017 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/camel/Message.java | 16 ++++-
 .../org/apache/camel/impl/DefaultMessage.java   | 31 +++++++++
 .../apache/camel/model/AggregateDefinition.java | 20 ++++++
 .../model/IdempotentConsumerDefinition.java     |  9 +--
 .../apache/camel/model/MulticastDefinition.java | 26 +++++++
 .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++-
 .../org/apache/camel/util/ExchangeHelper.java   | 18 +++++
 .../apache/camel/util/function/Suppliers.java   | 43 ++++++++++++
 .../apache/camel/impl/DefaultExchangeTest.java  |  5 ++
 .../camel/processor/DynamicRouter4Test.java     | 58 ++++++++++++++++
 .../processor/IdempotentConsumerDslTest.java    | 53 ++++++++++++++
 .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++-
 .../camel/processor/MulticastDslTest.java       | 69 +++++++++++++++++++
 .../camel/processor/RoutingSlipDslTest.java     | 49 +++++++++++++
 .../camel/processor/ThrottlerDslTest.java       | 72 ++++++++++++++++++++
 .../processor/aggregator/AggregateDslTest.java  | 41 +++++++++--
 16 files changed, 582 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Message.java 
b/camel-core/src/main/java/org/apache/camel/Message.java
index 8cfeae0..a0e9c4d 100644
--- a/camel-core/src/main/java/org/apache/camel/Message.java
+++ b/camel-core/src/main/java/org/apache/camel/Message.java
@@ -18,7 +18,7 @@ package org.apache.camel;
 
 import java.util.Map;
 import java.util.Set;
-
+import java.util.function.Supplier;
 import javax.activation.DataHandler;
 
 /**
@@ -88,6 +88,13 @@ public interface Message {
     Object getHeader(String name, Object defaultValue);
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     */
+    Object getHeader(String name, Supplier<Object> defaultValueSupplier);
+
+    /**
      * Returns a header associated with this message by name and specifying the
      * type required
      *
@@ -112,6 +119,13 @@ public interface Message {
     <T> T getHeader(String name, Object defaultValue, Class<T> type);
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     */
+    <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, 
Class<T> type);
+
+    /**
      * Sets a header on the message
      *
      * @param name of the header

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java 
b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
index 848586a..1e49766 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java
@@ -20,6 +20,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 import javax.activation.DataHandler;
 
 import org.apache.camel.Attachment;
@@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport {
         return answer != null ? answer : defaultValue;
     }
 
+    public Object getHeader(String name, Supplier<Object> 
defaultValueSupplier) {
+        Object answer = getHeaders().get(name);
+        return answer != null ? answer : defaultValueSupplier.get();
+    }
+
     @SuppressWarnings("unchecked")
     public <T> T getHeader(String name, Class<T> type) {
         Object value = getHeader(name);
@@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, 
Class<T> type) {
+        Object value = getHeader(name, defaultValueSupplier);
+        if (value == null) {
+            // lets avoid NullPointerException when converting to boolean for 
null values
+            if (boolean.class.isAssignableFrom(type)) {
+                return (T) Boolean.FALSE;
+            }
+            return null;
+        }
+
+        // eager same instance type test to avoid the overhead of invoking the 
type converter
+        // if already same type
+        if (type.isInstance(value)) {
+            return type.cast(value);
+        }
+
+        Exchange e = getExchange();
+        if (e != null) {
+            return e.getContext().getTypeConverter().convertTo(type, e, value);
+        } else {
+            return type.cast(value);
+        }
+    }
+
     public void setHeader(String name, Object value) {
         if (headers == null) {
             headers = createHeaders();

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index ec7d396..12f0a13 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -814,6 +814,16 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregateDefinition strategy(AggregationStrategy 
aggregationStrategy) {
+        return aggregationStrategy(aggregationStrategy);
+    }
+
+    /**
      * Sets the aggregate strategy to use
      *
      * @param aggregationStrategy  the aggregate strategy to use
@@ -930,6 +940,16 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregateDefinition completion(@AsPredicate Predicate predicate) {
+        return completionPredicate(predicate);
+    }
+
+    /**
      * Indicates to complete all current aggregated exchanges when the context 
is stopped
      */
     public AggregateDefinition forceCompletionOnStop() {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
 
b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
index 256394d..9a58704 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java
@@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends 
ExpressionNode {
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, 
true);
 
-        IdempotentRepository<String> idempotentRepository =
-                (IdempotentRepository<String>) 
resolveMessageIdRepository(routeContext);
+        IdempotentRepository<String> idempotentRepository = 
resolveMessageIdRepository(routeContext);
         ObjectHelper.notNull(idempotentRepository, "idempotentRepository", 
this);
 
         Expression expression = getExpression().createExpression(routeContext);
@@ -231,6 +230,7 @@ public class IdempotentConsumerDefinition extends 
ExpressionNode {
         boolean eager = getEager() == null || getEager();
         boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate();
         boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure();
+
         // these boolean should be false by default
         boolean completionEager = getCompletionEager() != null && 
getCompletionEager();
 
@@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends 
ExpressionNode {
      * @param routeContext route context
      * @return the repository
      */
-    protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext 
routeContext) {
+    @SuppressWarnings("unchecked")
+    protected <T> IdempotentRepository<T> 
resolveMessageIdRepository(RouteContext routeContext) {
         if (messageIdRepositoryRef != null) {
             idempotentRepository = 
routeContext.mandatoryLookup(messageIdRepositoryRef, 
IdempotentRepository.class);
         }
-        return idempotentRepository;
+        return (IdempotentRepository<T>)idempotentRepository;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 7bff217..37efcc6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.Processor;
+import org.apache.camel.builder.AggregationStrategyClause;
+import org.apache.camel.builder.ProcessClause;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -106,6 +108,18 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
     // 
-------------------------------------------------------------------------
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public AggregationStrategyClause<MulticastDefinition> 
aggregationStrategy() {
+        AggregationStrategyClause<MulticastDefinition> clause = new 
AggregationStrategyClause<>(this);
+        setAggregationStrategy(clause);
+        return clause;
+    }
+
+    /**
      * Sets the AggregationStrategy to be used to assemble the replies from 
the multicasts, into a single outgoing message from the Multicast.
      * By default Camel will use the last reply as the outgoing message. You 
can also use a POJO as the AggregationStrategy.
      * If an exception is thrown from the aggregate method in the 
AggregationStrategy, then by default, that exception
@@ -248,6 +262,18 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ProcessClause<MulticastDefinition> onPrepare() {
+        ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this);
+        setOnPrepare(clause);
+        return clause;
+    }
+
+    /**
      * Uses the {@link Processor} when preparing the {@link 
org.apache.camel.Exchange} to be send.
      * This can be used to deep-clone messages that should be send, or any 
custom logic needed before
      * the exchange is send.

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 21fbe2e..c40a0bd 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAnyAttribute;
@@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.LifecycleStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
+import org.apache.camel.support.ExpressionAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() 
{
+        IdempotentConsumerDefinition answer = new 
IdempotentConsumerDefinition();
+        addOutput(answer);
+
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
     * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
      * Creates an {@link 
org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
      * to avoid duplicate messages
@@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      */
     public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy 
aggregationStrategy) {
         AggregateDefinition answer = new AggregateDefinition();
-        ExpressionClause<AggregateDefinition> clause = new 
ExpressionClause<AggregateDefinition>(answer);
+        ExpressionClause<AggregateDefinition> clause = new 
ExpressionClause<>(answer);
         answer.setExpression(clause);
         answer.setAggregationStrategy(aggregationStrategy);
         addOutput(answer);
@@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ExpressionClause<ThrottleDefinition> throttle() {
+        ThrottleDefinition answer = new ThrottleDefinition();
+        addOutput(answer);
+
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
      * Creates a throttler allowing you to ensure that a specific endpoint 
does not get overloaded,
      * or that we don't exceed an agreed SLA with some external service.
@@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     }
 
     /**
+     * TODO: document
+     * Note: this is experimental and subject to changes in future releases.
+     *
+     * @return the builder
+     */
+    public ExpressionClause<LoopDefinition> loopDoWhile() {
+        LoopDefinition loop = new LoopDefinition();
+        loop.setDoWhile(true);
+
+        addOutput(loop);
+
+        return ExpressionClause.createAndSetExpression(loop);
+    }
+
+    /**
     * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
      * Creates a loop allowing to process the a message a number of times and 
possibly process them
      * in a different way.
@@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     }
 
     /**
+     * Adds a processor which sets the header on the IN message
+     *
+     * @param name  the header name
+     * @param supplier the supplier used to set the header
+     * @return the builder
+     */
+    @SuppressWarnings("unchecked")
+    public Type setHeader(String name, final Supplier<Object> supplier) {
+        SetHeaderDefinition answer = new SetHeaderDefinition(name, new 
ExpressionAdapter() {
+            @Override
+            public Object evaluate(Exchange exchange) {
+                return supplier.get();
+            }
+        });
+
+        addOutput(answer);
+        return (Type) this;
+    }
+
+    /**
      * Adds a processor which sets the header on the OUT message
      *
      * @param name  the header name
@@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     public String getLabel() {
         return "";
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java 
b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index ce3fdca..6e5967e 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -130,6 +130,24 @@ public final class ExchangeHelper {
     }
 
     /**
+     * Gets an header or property of the correct type
+     *
+     * @param exchange      the exchange
+     * @param name          the name of the header or the property
+     * @param type          the type
+     * @return the header or property value
+     * @throws TypeConversionException is thrown if error during type 
conversion
+     * @throws NoSuchHeaderException is thrown if no headers exists
+     */
+    public static <T> T getHeaderOrProperty(Exchange exchange, String name, 
Class<T> type) throws TypeConversionException {
+        T answer = exchange.getIn().getHeader(name, type);
+        if (answer == null) {
+            answer = exchange.getProperty(name, type);
+        }
+        return answer;
+    }
+
+    /**
      * Returns the mandatory inbound message body of the correct type or throws
      * an exception if it is not present
      *

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java 
b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
new file mode 100644
index 0000000..4f8f845
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.util.function;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public final class Suppliers {
+    private Suppliers() {
+    }
+
+    public static <T> Supplier<T> memorize(Supplier<T> supplier) {
+        final AtomicReference<T> valueHolder = new AtomicReference<>();
+        return () -> {
+            T supplied = valueHolder.get();
+            if (supplied == null) {
+                synchronized (valueHolder) {
+                    supplied = valueHolder.get();
+                    if (supplied == null) {
+                        supplied = Objects.requireNonNull(supplier.get(), 
"Supplier should not return null");
+                        valueHolder.lazySet(supplied);
+                    }
+                }
+            }
+            return supplied;
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java 
b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
index 43bd8ff..ee14ca0 100644
--- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
+++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java
@@ -90,13 +90,18 @@ public class DefaultExchangeTest extends 
ExchangeTestSupport {
         assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 
Integer.class));
         assertEquals("123", exchange.getIn().getHeader("bar", String.class));
         assertEquals(123, exchange.getIn().getHeader("bar", 234));
+        assertEquals(123, exchange.getIn().getHeader("bar", () -> 456));
+        assertEquals(456, exchange.getIn().getHeader("baz", () -> 456));
 
         assertEquals(123, exchange.getIn().getHeader("bar", 234));
         assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, 
Integer.class));
         assertEquals("123", exchange.getIn().getHeader("bar", "234", 
String.class));
+        assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", 
String.class));
+        assertEquals("456", exchange.getIn().getHeader("baz", () -> "456", 
String.class));
 
         assertEquals(234, exchange.getIn().getHeader("cheese", 234));
         assertEquals("234", exchange.getIn().getHeader("cheese", 234, 
String.class));
+        assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, 
String.class));
     }
 
     public void testProperty() throws Exception {

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java 
b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
new file mode 100644
index 0000000..4f68bc0
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ExchangeHelper;
+
+public class DynamicRouter4Test extends ContextTestSupport {
+    public void testDynamicRouter() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(1);
+
+        template.sendBody("direct:start-1", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start-1")
+                    .dynamicRouter()
+                        .exchange(DynamicRouter4Test::slip);
+            }
+        };
+    }
+
+    public static String slip(Exchange exchange) {
+        String previous = ExchangeHelper.getHeaderOrProperty(exchange, 
Exchange.SLIP_ENDPOINT, String.class);
+        if (previous == null) {
+            return "mock:a,mock:b";
+        } else if ("mock://b".equals(previous)) {
+            return "mock:c";
+        }
+
+        // no more so return null
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
new file mode 100644
index 0000000..9afd3f9
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+
+public class IdempotentConsumerDslTest extends ContextTestSupport {
+
+    public void testDuplicateMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("one", "two", "three");
+
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "two", "messageId", "2");
+        template.sendBodyAndHeader("direct:start", "one", "messageId", "1");
+        template.sendBodyAndHeader("direct:start", "three", "messageId", "3");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                    .idempotentConsumer()
+                        .message(m -> m.getHeader("messageId"))
+                        
.messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200))
+                    .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
index 2ef927a..54b5f6a 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java
@@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder;
 
 public class LoopDoWhileTest extends ContextTestSupport {
 
-    public void testLoopDoWhile() throws Exception {
+    public void testLoopDoWhileSimple() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
         getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", 
"AAAA", "AAAAA");
 
-        template.sendBody("direct:start", "A");
+        template.sendBody("direct:simple", "A");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testLoopDoWhileFunctional() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA");
+        getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", 
"AAAA", "AAAAA");
+
+        template.sendBody("direct:functional", "A");
 
         assertMockEndpointsSatisfied();
     }
@@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:start")
+                from("direct:simple")
                     .loopDoWhile(simple("${body.length} <= 5"))
                         .to("mock:loop")
                         .transform(body().append("A"))
                     .end()
                     .to("mock:result");
+                from("direct:functional")
+                    .loopDoWhile()
+                        .body(String.class, b -> b.length() <= 5)
+                        .to("mock:loop")
+                        .transform()
+                            .body(String.class, b -> b += "A")
+                    .end()
+                    .to("mock:result");
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
new file mode 100644
index 0000000..a800e36
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class MulticastDslTest extends ContextTestSupport {
+    public void testMulticastDsl() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.expectedHeaderReceived("onPrepare", true);
+        mock.expectedBodiesReceived(5);
+
+        template.sendBody("direct:start", 1);
+
+        mock.assertIsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast()
+                        .onPrepare()
+                            .message(m -> m.setHeader("onPrepare", true))
+                        .aggregationStrategy()
+                            .body(Integer.class, (o, n) -> o != null ? o + n : n)
+                        .to("direct:increase-by-1")
+                        .to("direct:increase-by-2")
+                        .end()
+                    .to("mock:result");
+
+                from("direct:increase-by-1")
+                    .bean(new Increase(1));
+                from("direct:increase-by-2")
+                    .bean(new Increase(2));
+            }
+        };
+    }
+
+    public static class Increase {
+        private final int amount;
+        public Increase(int amount) {
+            this.amount = amount;
+        }
+
+        public int add(int num) {
+            return num + amount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
new file mode 100644
index 0000000..9d296b8
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class RoutingSlipDslTest extends ContextTestSupport {
+
+    public void testRoutingSlipDsl() throws Exception {
+        MockEndpoint x = getMockEndpoint("mock:x");
+        MockEndpoint y = getMockEndpoint("mock:y");
+        MockEndpoint z = getMockEndpoint("mock:z");
+
+        x.expectedBodiesReceived("foo", "bar");
+        y.expectedBodiesReceived("foo", "bar");
+        z.expectedBodiesReceived("foo", "bar");
+
+        template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z");
+        template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:a").routingSlip()
+                    .message(m -> m.getHeader("recipientListHeader", String.class).split(","))
+                    .end();
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
new file mode 100644
index 0000000..a971332
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class ThrottlerDslTest extends ContextTestSupport {
+    private static final int INTERVAL = 500;
+    protected int messageCount = 9;
+
+    protected boolean canTest() {
+        // skip test on windows as it does not run well there
+        return !isPlatform("windows");
+    }
+
+    public void testDsl() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+
+        long start = System.currentTimeMillis();
+        for (int i = 0; i < messageCount; i++) {
+            executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
+        }
+
+        // let's wait for the exchanges to arrive
+        resultEndpoint.assertIsSatisfied();
+
+        // now assert that they have actually been throttled
+        long minimumTime = (messageCount - 1) * INTERVAL;
+        // add a little slack
+        long delta = System.currentTimeMillis() - start + 200;
+        assertTrue("Should take at least " + minimumTime + "ms, was: " + delta, delta >= minimumTime);
+        executor.shutdownNow();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("direct:start")
+                    .throttle()
+                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
+                        .timePeriodMillis(INTERVAL)
+                    .to("log:result", "mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
index 55fd14e..f8d1db4 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
@@ -21,20 +21,21 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 
 public class AggregateDslTest extends ContextTestSupport {
 
     public void testAggregate() throws Exception {
-        MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5");
+        getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8");
 
         for (int i = 0; i < 9; i++) {
             template.sendBodyAndHeader("direct:start", i, "type", i % 3);
+            template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3);
         }
 
-        mock.assertIsSatisfied();
+        assertMockEndpointsSatisfied();
     }
 
     @Override
@@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport {
                     .aggregate()
                         .message(m -> m.getHeader("type"))
                         .strategy()
-                            .body(String.class, (o, n) ->  Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(",")))
+                            .body(String.class, AggregateDslTest::joinString)
                         .completion()
-                            .body(String.class, s -> s.length() == 5)
-                                    .to("mock:aggregated");
+                            .body(String.class, s -> s.split(",").length == 2)
+                    .to("mock:aggregated");
+
+                from("direct:start-supplier")
+                    .aggregate()
+                        .header("type")
+                        .strategy(AggregateDslTest::joinStringStrategy)
+                        .completion()
+                            .body(String.class, s -> s.split(",").length == 3)
+                    .to("mock:aggregated-supplier");
             }
         };
     }
+
+    // *************************************************************************
+    // Strategies
+    // *************************************************************************
+
+    private static String joinString(String o, String n) {
+        return Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","));
+    }
+
+    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
+        newExchange.getIn().setBody(
+            joinString(
+                oldExchange != null ? oldExchange.getIn().getBody(String.class) : null,
+                newExchange.getIn().getBody(String.class))
+        );
+
+        return newExchange;
+    }
 }
 

Reply via email to