Repository: camel Updated Branches: refs/heads/master 8e0e3083e -> 8bc8484b1
CAMEL-10724: Improve Java DSL support for Java 8 Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8bc8484b Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8bc8484b Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8bc8484b Branch: refs/heads/master Commit: 8bc8484b1914f5cb29191e7b91fe48e02ca1f636 Parents: 8e0e308 Author: lburgazzoli <lburgazz...@gmail.com> Authored: Wed Jan 18 18:09:08 2017 +0100 Committer: lburgazzoli <lburgazz...@gmail.com> Committed: Thu Feb 16 18:10:22 2017 +0100 ---------------------------------------------------------------------- .../src/main/java/org/apache/camel/Message.java | 16 ++++- .../org/apache/camel/impl/DefaultMessage.java | 31 +++++++++ .../apache/camel/model/AggregateDefinition.java | 20 ++++++ .../model/IdempotentConsumerDefinition.java | 9 +-- .../apache/camel/model/MulticastDefinition.java | 26 +++++++ .../apache/camel/model/ProcessorDefinition.java | 66 +++++++++++++++++- .../org/apache/camel/util/ExchangeHelper.java | 18 +++++ .../apache/camel/util/function/Suppliers.java | 43 ++++++++++++ .../apache/camel/impl/DefaultExchangeTest.java | 5 ++ .../camel/processor/DynamicRouter4Test.java | 58 ++++++++++++++++ .../processor/IdempotentConsumerDslTest.java | 53 ++++++++++++++ .../apache/camel/processor/LoopDoWhileTest.java | 23 ++++++- .../camel/processor/MulticastDslTest.java | 69 +++++++++++++++++++ .../camel/processor/RoutingSlipDslTest.java | 49 +++++++++++++ .../camel/processor/ThrottlerDslTest.java | 72 ++++++++++++++++++++ .../processor/aggregator/AggregateDslTest.java | 41 +++++++++-- 16 files changed, 582 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/Message.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/main/java/org/apache/camel/Message.java b/camel-core/src/main/java/org/apache/camel/Message.java index 8cfeae0..a0e9c4d 100644 --- a/camel-core/src/main/java/org/apache/camel/Message.java +++ b/camel-core/src/main/java/org/apache/camel/Message.java @@ -18,7 +18,7 @@ package org.apache.camel; import java.util.Map; import java.util.Set; - +import java.util.function.Supplier; import javax.activation.DataHandler; /** @@ -88,6 +88,13 @@ public interface Message { Object getHeader(String name, Object defaultValue); /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + */ + Object getHeader(String name, Supplier<Object> defaultValueSupplier); + + /** * Returns a header associated with this message by name and specifying the * type required * @@ -112,6 +119,13 @@ public interface Message { <T> T getHeader(String name, Object defaultValue, Class<T> type); /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. 
+ * + */ + <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type); + + /** * Sets a header on the message * * @param name of the header http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java index 848586a..1e49766 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultMessage.java @@ -20,6 +20,7 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import javax.activation.DataHandler; import org.apache.camel.Attachment; @@ -65,6 +66,11 @@ public class DefaultMessage extends MessageSupport { return answer != null ? answer : defaultValue; } + public Object getHeader(String name, Supplier<Object> defaultValueSupplier) { + Object answer = getHeaders().get(name); + return answer != null ? 
answer : defaultValueSupplier.get(); + } + @SuppressWarnings("unchecked") public <T> T getHeader(String name, Class<T> type) { Object value = getHeader(name); @@ -115,6 +121,31 @@ public class DefaultMessage extends MessageSupport { } } + @SuppressWarnings("unchecked") + public <T> T getHeader(String name, Supplier<Object> defaultValueSupplier, Class<T> type) { + Object value = getHeader(name, defaultValueSupplier); + if (value == null) { + // lets avoid NullPointerException when converting to boolean for null values + if (boolean.class.isAssignableFrom(type)) { + return (T) Boolean.FALSE; + } + return null; + } + + // eager same instance type test to avoid the overhead of invoking the type converter + // if already same type + if (type.isInstance(value)) { + return type.cast(value); + } + + Exchange e = getExchange(); + if (e != null) { + return e.getContext().getTypeConverter().convertTo(type, e, value); + } else { + return type.cast(value); + } + } + public void setHeader(String name, Object value) { if (headers == null) { headers = createHeaders(); http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java index ec7d396..12f0a13 100644 --- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java @@ -814,6 +814,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. 
+ * + * @return the builder + */ + public AggregateDefinition strategy(AggregationStrategy aggregationStrategy) { + return aggregationStrategy(aggregationStrategy); + } + + /** * Sets the aggregate strategy to use * * @param aggregationStrategy the aggregate strategy to use @@ -930,6 +940,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public AggregateDefinition completion(@AsPredicate Predicate predicate) { + return completionPredicate(predicate); + } + + /** * Indicates to complete all current aggregated exchanges when the context is stopped */ public AggregateDefinition forceCompletionOnStop() { http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java index 256394d..9a58704 100644 --- a/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/IdempotentConsumerDefinition.java @@ -221,8 +221,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode { public Processor createProcessor(RouteContext routeContext) throws Exception { Processor childProcessor = this.createChildProcessor(routeContext, true); - IdempotentRepository<String> idempotentRepository = - (IdempotentRepository<String>) resolveMessageIdRepository(routeContext); + IdempotentRepository<String> idempotentRepository = resolveMessageIdRepository(routeContext); ObjectHelper.notNull(idempotentRepository, "idempotentRepository", this); Expression expression = getExpression().createExpression(routeContext); @@ -231,6 
+230,7 @@ public class IdempotentConsumerDefinition extends ExpressionNode { boolean eager = getEager() == null || getEager(); boolean duplicate = getSkipDuplicate() == null || getSkipDuplicate(); boolean remove = getRemoveOnFailure() == null || getRemoveOnFailure(); + // these boolean should be false by default boolean completionEager = getCompletionEager() != null && getCompletionEager(); @@ -243,10 +243,11 @@ public class IdempotentConsumerDefinition extends ExpressionNode { * @param routeContext route context * @return the repository */ - protected IdempotentRepository<?> resolveMessageIdRepository(RouteContext routeContext) { + @SuppressWarnings("unchecked") + protected <T> IdempotentRepository<T> resolveMessageIdRepository(RouteContext routeContext) { if (messageIdRepositoryRef != null) { idempotentRepository = routeContext.mandatoryLookup(messageIdRepositoryRef, IdempotentRepository.class); } - return idempotentRepository; + return (IdempotentRepository<T>)idempotentRepository; } } http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java index 7bff217..37efcc6 100644 --- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java @@ -27,6 +27,8 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.CamelContextAware; import org.apache.camel.Processor; +import org.apache.camel.builder.AggregationStrategyClause; +import org.apache.camel.builder.ProcessClause; import org.apache.camel.processor.MulticastProcessor; import org.apache.camel.processor.aggregate.AggregationStrategy; import 
org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; @@ -106,6 +108,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i // ------------------------------------------------------------------------- /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public AggregationStrategyClause<MulticastDefinition> aggregationStrategy() { + AggregationStrategyClause<MulticastDefinition> clause = new AggregationStrategyClause<>(this); + setAggregationStrategy(clause); + return clause; + } + + /** * Sets the AggregationStrategy to be used to assemble the replies from the multicasts, into a single outgoing message from the Multicast. * By default Camel will use the last reply as the outgoing message. You can also use a POJO as the AggregationStrategy. * If an exception is thrown from the aggregate method in the AggregationStrategy, then by default, that exception @@ -248,6 +262,18 @@ public class MulticastDefinition extends OutputDefinition<MulticastDefinition> i } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public ProcessClause<MulticastDefinition> onPrepare() { + ProcessClause<MulticastDefinition> clause = new ProcessClause<>(this); + setOnPrepare(clause); + return clause; + } + + /** * Uses the {@link Processor} when preparing the {@link org.apache.camel.Exchange} to be send. * This can be used to deep-clone messages that should be send, or any custom logic needed before * the exchange is send. 
http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 21fbe2e..c40a0bd 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAnyAttribute; @@ -73,6 +74,7 @@ import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.LifecycleStrategy; import org.apache.camel.spi.Policy; import org.apache.camel.spi.RouteContext; +import org.apache.camel.support.ExpressionAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1408,6 +1410,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. 
+ * + * @return the builder + */ + public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer() { + IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(); + addOutput(answer); + + return ExpressionClause.createAndSetExpression(answer); + } + + /** * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a> * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer} * to avoid duplicate messages @@ -2096,7 +2111,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) { AggregateDefinition answer = new AggregateDefinition(); - ExpressionClause<AggregateDefinition> clause = new ExpressionClause<AggregateDefinition>(answer); + ExpressionClause<AggregateDefinition> clause = new ExpressionClause<>(answer); answer.setExpression(clause); answer.setAggregationStrategy(aggregationStrategy); addOutput(answer); @@ -2173,6 +2188,19 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. + * + * @return the builder + */ + public ExpressionClause<ThrottleDefinition> throttle() { + ThrottleDefinition answer = new ThrottleDefinition(); + addOutput(answer); + + return ExpressionClause.createAndSetExpression(answer); + } + + /** * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a> * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded, * or that we don't exceed an agreed SLA with some external service. @@ -2246,6 +2274,21 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * TODO: document + * Note: this is experimental and subject to changes in future releases. 
+ * + * @return the builder + */ + public ExpressionClause<LoopDefinition> loopDoWhile() { + LoopDefinition loop = new LoopDefinition(); + loop.setDoWhile(true); + + addOutput(loop); + + return ExpressionClause.createAndSetExpression(loop); + } + + /** * <a href="http://camel.apache.org/loop.html">Loop EIP:</a> * Creates a loop allowing to process the a message a number of times and possibly process them * in a different way. @@ -3094,6 +3137,26 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * Adds a processor which sets the header on the IN message + * + * @param name the header name + * @param supplier the supplier used to set the header + * @return the builder + */ + @SuppressWarnings("unchecked") + public Type setHeader(String name, final Supplier<Object> supplier) { + SetHeaderDefinition answer = new SetHeaderDefinition(name, new ExpressionAdapter() { + @Override + public Object evaluate(Exchange exchange) { + return supplier.get(); + } + }); + + addOutput(answer); + return (Type) this; + } + + /** * Adds a processor which sets the header on the OUT message * * @param name the header name @@ -4021,5 +4084,4 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> public String getLabel() { return ""; } - } http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java index ce3fdca..6e5967e 100644 --- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java +++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java @@ -130,6 +130,24 @@ public final class ExchangeHelper { } /** + * Gets an header or property of the correct type + * + * @param exchange the exchange + * @param name the 
name of the header or the property + * @param type the type + * @return the header or property value + * @throws TypeConversionException is thrown if error during type conversion + * @throws NoSuchHeaderException is thrown if no headers exists + */ + public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException { + T answer = exchange.getIn().getHeader(name, type); + if (answer == null) { + answer = exchange.getProperty(name, type); + } + return answer; + } + + /** * Returns the mandatory inbound message body of the correct type or throws * an exception if it is not present * http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java new file mode 100644 index 0000000..4f8f845 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/util/function/Suppliers.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.util.function; + +import java.util.Objects; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +public final class Suppliers { + private Suppliers() { + } + + public static <T> Supplier<T> memorize(Supplier<T> supplier) { + final AtomicReference<T> valueHolder = new AtomicReference<>(); + return () -> { + T supplied = valueHolder.get(); + if (supplied == null) { + synchronized (valueHolder) { + supplied = valueHolder.get(); + if (supplied == null) { + supplied = Objects.requireNonNull(supplier.get(), "Supplier should not return null"); + valueHolder.lazySet(supplied); + } + } + } + return supplied; + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java index 43bd8ff..ee14ca0 100644 --- a/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java +++ b/camel-core/src/test/java/org/apache/camel/impl/DefaultExchangeTest.java @@ -90,13 +90,18 @@ public class DefaultExchangeTest extends ExchangeTestSupport { assertEquals(new Integer(123), exchange.getIn().getHeader("bar", Integer.class)); assertEquals("123", exchange.getIn().getHeader("bar", String.class)); assertEquals(123, exchange.getIn().getHeader("bar", 234)); + assertEquals(123, exchange.getIn().getHeader("bar", () -> 456)); + assertEquals(456, exchange.getIn().getHeader("baz", () -> 456)); assertEquals(123, exchange.getIn().getHeader("bar", 234)); assertEquals(new Integer(123), exchange.getIn().getHeader("bar", 234, Integer.class)); assertEquals("123", exchange.getIn().getHeader("bar", "234", String.class)); + assertEquals("123", exchange.getIn().getHeader("bar", () -> "456", String.class)); + assertEquals("456", 
exchange.getIn().getHeader("baz", () -> "456", String.class)); assertEquals(234, exchange.getIn().getHeader("cheese", 234)); assertEquals("234", exchange.getIn().getHeader("cheese", 234, String.class)); + assertEquals("456", exchange.getIn().getHeader("cheese", () -> 456, String.class)); } public void testProperty() throws Exception { http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java new file mode 100644 index 0000000..4f68bc0 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/DynamicRouter4Test.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.util.ExchangeHelper; + +public class DynamicRouter4Test extends ContextTestSupport { + public void testDynamicRouter() throws Exception { + getMockEndpoint("mock:a").expectedMessageCount(1); + getMockEndpoint("mock:b").expectedMessageCount(1); + getMockEndpoint("mock:c").expectedMessageCount(1); + + template.sendBody("direct:start-1", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start-1") + .dynamicRouter() + .exchange(DynamicRouter4Test::slip); + } + }; + } + + public static String slip(Exchange exchange) { + String previous = ExchangeHelper.getHeaderOrProperty(exchange, Exchange.SLIP_ENDPOINT, String.class); + if (previous == null) { + return "mock:a,mock:b"; + } else if ("mock://b".equals(previous)) { + return "mock:c"; + } + + // no more so return null + return null; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java new file mode 100644 index 0000000..9afd3f9 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerDslTest.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.idempotent.MemoryIdempotentRepository; + +public class IdempotentConsumerDslTest extends ContextTestSupport { + + public void testDuplicateMessages() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("one", "two", "three"); + + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "two", "messageId", "2"); + template.sendBodyAndHeader("direct:start", "one", "messageId", "1"); + template.sendBodyAndHeader("direct:start", "three", "messageId", "3"); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:start") + .idempotentConsumer() + .message(m -> m.getHeader("messageId")) + .messageIdRepository(MemoryIdempotentRepository.memoryIdempotentRepository(200)) + .to("mock:result"); + } + }; + } +} 
http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java index 2ef927a..54b5f6a 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/LoopDoWhileTest.java @@ -21,11 +21,20 @@ import org.apache.camel.builder.RouteBuilder; public class LoopDoWhileTest extends ContextTestSupport { - public void testLoopDoWhile() throws Exception { + public void testLoopDoWhileSimple() throws Exception { getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA"); getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA"); - template.sendBody("direct:start", "A"); + template.sendBody("direct:simple", "A"); + + assertMockEndpointsSatisfied(); + } + + public void testLoopDoWhileFunctional() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("AAAAAA"); + getMockEndpoint("mock:loop").expectedBodiesReceived("A", "AA", "AAA", "AAAA", "AAAAA"); + + template.sendBody("direct:functional", "A"); assertMockEndpointsSatisfied(); } @@ -35,12 +44,20 @@ public class LoopDoWhileTest extends ContextTestSupport { return new RouteBuilder() { @Override public void configure() throws Exception { - from("direct:start") + from("direct:simple") .loopDoWhile(simple("${body.length} <= 5")) .to("mock:loop") .transform(body().append("A")) .end() .to("mock:result"); + from("direct:functional") + .loopDoWhile() + .body(String.class, b -> b.length() <= 5) + .to("mock:loop") + .transform() + .body(String.class, b -> b += "A") + .end() + .to("mock:result"); } }; } 
http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java new file mode 100644 index 0000000..a800e36 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/MulticastDslTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +public class MulticastDslTest extends ContextTestSupport { + public void testMulticastDsl() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.expectedHeaderReceived("onPrepare", true); + mock.expectedBodiesReceived(5); + + template.sendBody("direct:start", 1); + + mock.assertIsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .multicast() + .onPrepare() + .message(m -> m.setHeader("onPrepare", true)) + .aggregationStrategy() + .body(Integer.class, (o, n) -> o != null ? o + n : n) + .to("direct:increase-by-1") + .to("direct:increase-by-2") + .end() + .to("mock:result"); + + from("direct:increase-by-1") + .bean(new Increase(1)); + from("direct:increase-by-2") + .bean(new Increase(2)); + } + }; + } + + public static class Increase { + private final int amount; + public Increase(int amount) { + this.amount = amount; + } + + public int add(int num) { + return num + amount; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java new file mode 100644 index 0000000..9d296b8 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipDslTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +public class RoutingSlipDslTest extends ContextTestSupport { + + public void testRoutingSlipDsl() throws Exception { + MockEndpoint x = getMockEndpoint("mock:x"); + MockEndpoint y = getMockEndpoint("mock:y"); + MockEndpoint z = getMockEndpoint("mock:z"); + + x.expectedBodiesReceived("foo", "bar"); + y.expectedBodiesReceived("foo", "bar"); + z.expectedBodiesReceived("foo", "bar"); + + template.sendBodyAndHeader("direct:a", "foo", "recipientListHeader", "mock:x,mock:y,mock:z"); + template.sendBodyAndHeader("direct:a", "bar", "recipientListHeader", "mock:x,mock:y,mock:z"); + + assertMockEndpointsSatisfied(); + } + + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + public void configure() { + from("direct:a").routingSlip() + .message(m -> m.getHeader("recipientListHeader", String.class).split(",")) + .end(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java ---------------------------------------------------------------------- diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java new file mode 100644 index 0000000..a971332 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Exercises the lambda-friendly throttle DSL, where the throttle expression
+ * is computed from the message (here: the {@code ThrottleCount} header).
+ */
+public class ThrottlerDslTest extends ContextTestSupport {
+    private static final int INTERVAL = 500;
+    protected int messageCount = 9;
+
+    /**
+     * @return {@code false} on Windows, where this timing-based test is skipped
+     */
+    protected boolean canTest() {
+        // skip test on windows as it does not run well there
+        return !isPlatform("windows");
+    }
+
+    /**
+     * Sends {@link #messageCount} messages concurrently and asserts that all of
+     * them arrive and that delivery took at least
+     * {@code (messageCount - 1) * INTERVAL} milliseconds (minus a small slack).
+     */
+    public void testDsl() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+        try {
+            long start = System.currentTimeMillis();
+            for (int i = 0; i < messageCount; i++) {
+                executor.execute(() -> template.sendBodyAndHeader("direct:start", "payload", "ThrottleCount", 1));
+            }
+
+            // let's wait for the exchanges to arrive
+            resultEndpoint.assertIsSatisfied();
+
+            // now assert that they have actually been throttled
+            long minimumTime = (messageCount - 1) * INTERVAL;
+            long elapsed = System.currentTimeMillis() - start;
+            // add a little slack to the comparison, but report the real elapsed time
+            long slack = 200;
+            assertTrue("Should take at least " + minimumTime + "ms, was: " + elapsed + "ms (plus " + slack + "ms slack)",
+                elapsed + slack >= minimumTime);
+        } finally {
+            // always release the pool threads, also when one of the assertions above fails
+            executor.shutdownNow();
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:start")
+                    .throttle()
+                        .message(m -> m.getHeader("ThrottleCount", Integer.class))
+                    .timePeriodMillis(INTERVAL)
+                    .to("log:result", "mock:result");
+            }
+        };
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8bc8484b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java
----------------------------------------------------------------------
diff --git
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java index 55fd14e..f8d1db4 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDslTest.java @@ -21,20 +21,21 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; public class AggregateDslTest extends ContextTestSupport { public void testAggregate() throws Exception { - MockEndpoint mock = getMockEndpoint("mock:aggregated"); - mock.expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8"); + getMockEndpoint("mock:aggregated").expectedBodiesReceived("0,3", "1,4", "2,5"); + getMockEndpoint("mock:aggregated-supplier").expectedBodiesReceived("0,3,6", "1,4,7", "2,5,8"); for (int i = 0; i < 9; i++) { template.sendBodyAndHeader("direct:start", i, "type", i % 3); + template.sendBodyAndHeader("direct:start-supplier", i, "type", i % 3); } - mock.assertIsSatisfied(); + assertMockEndpointsSatisfied(); } @Override @@ -46,12 +47,38 @@ public class AggregateDslTest extends ContextTestSupport { .aggregate() .message(m -> m.getHeader("type")) .strategy() - .body(String.class, (o, n) -> Stream.of(o, n).filter(Objects::nonNull).collect(Collectors.joining(","))) + .body(String.class, AggregateDslTest::joinString) .completion() - .body(String.class, s -> s.length() == 5) - .to("mock:aggregated"); + .body(String.class, s -> s.split(",").length == 2) + .to("mock:aggregated"); + + from("direct:start-supplier") + .aggregate() + .header("type") + .strategy(AggregateDslTest::joinStringStrategy) + .completion() + .body(String.class, s -> s.split(",").length == 3) + .to("mock:aggregated-supplier"); } }; } + + // 
*************************************************************************
+    // Strategies
+    // *************************************************************************
+
+    /**
+     * Joins the two values with a comma, leaving out whichever of them is
+     * {@code null}; yields an empty string when both are {@code null}.
+     */
+    private static String joinString(String o, String n) {
+        final Stream<String> present = Stream.of(o, n).filter(Objects::nonNull);
+        return present.collect(Collectors.joining(","));
+    }
+
+    /**
+     * Aggregation function: stores the comma-joined String bodies of both
+     * exchanges as the body of {@code newExchange} and returns that exchange.
+     * {@code oldExchange} may be {@code null}, in which case only the new body is used.
+     */
+    private static Exchange joinStringStrategy(Exchange oldExchange, Exchange newExchange) {
+        final String previousBody = oldExchange == null ? null : oldExchange.getIn().getBody(String.class);
+        final String currentBody = newExchange.getIn().getBody(String.class);
+
+        newExchange.getIn().setBody(joinString(previousBody, currentBody));
+        return newExchange;
+    }
 }