CAMEL-4596: pollEnrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1b4af69c Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1b4af69c Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1b4af69c Branch: refs/heads/master Commit: 1b4af69c1bb2b03057feffc4ddd4e84baa2fba0b Parents: b5be4d6 Author: Claus Ibsen <[email protected]> Authored: Mon Jul 13 10:42:20 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Jul 13 10:42:20 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/impl/DefaultCamelContext.java | 1 + .../apache/camel/impl/EmptyConsumerCache.java | 76 ++++++++++++++++++++ .../camel/model/PollEnrichDefinition.java | 24 +++++++ .../apache/camel/processor/PollEnricher.java | 21 +++++- .../processor/RecipientListNoCacheTest.java | 2 +- .../PollEnrichExpressionNoCacheTest.java | 47 ++++++++++++ .../enricher/PollEnrichExpressionTest.java | 6 +- .../camel/spring/processor/pollEnricherRef.xml | 8 +-- 8 files changed, 177 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 7056932..c24674e 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -2720,6 +2720,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon // special for executorServiceManager as want to stop it manually doAddService(executorServiceManager, false); addService(producerServicePool); + addService(pollingConsumerServicePool); addService(inflightRepository); addService(asyncProcessorAwaitManager); addService(shutdownStrategy); http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java new file mode 100644 index 0000000..219371a --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/impl/EmptyConsumerCache.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl; + +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.FailedToCreateConsumerException; +import org.apache.camel.IsSingleton; +import org.apache.camel.PollingConsumer; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; + +/** + * A {@link ConsumerCache} which is always empty and does not cache any {@link org.apache.camel.Consumer}s. + */ +public class EmptyConsumerCache extends ConsumerCache { + + public EmptyConsumerCache(Object source, CamelContext camelContext) { + super(source, camelContext, 0); + } + + @Override + public PollingConsumer acquirePollingConsumer(Endpoint endpoint) { + // always create a new consumer + PollingConsumer answer; + try { + answer = endpoint.createPollingConsumer(); + boolean singleton = true; + if (answer instanceof IsSingleton) { + singleton = ((IsSingleton) answer).isSingleton(); + } + if (getCamelContext().isStartingRoutes() && singleton) { + // if we are currently starting a route, then add as service and enlist in JMX + // - but do not enlist non-singletons in JMX + // - note addService will also start the service + getCamelContext().addService(answer); + } else { + // must then start service so producer is ready to be used + ServiceHelper.startService(answer); + } + } catch (Exception e) { + throw new FailedToCreateConsumerException(endpoint, e); + } + return answer; + } + + @Override + public void releasePollingConsumer(Endpoint endpoint, PollingConsumer pollingConsumer) { + // stop and shutdown the consumer as its not cache or reused + try { + ServiceHelper.stopAndShutdownService(pollingConsumer); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + @Override + public String toString() { + return "EmptyConsumerCache for source: " + getSource(); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index eb1247b..18801d4 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -52,6 +52,8 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { private Boolean aggregateOnException; @XmlTransient private AggregationStrategy aggregationStrategy; + @XmlAttribute + private Integer cacheSize; public PollEnrichDefinition() { } @@ -89,6 +91,9 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { if (getAggregateOnException() != null) { enricher.setAggregateOnException(getAggregateOnException()); } + if (getCacheSize() != null) { + enricher.setCacheSize(getCacheSize()); + } return enricher; } @@ -186,6 +191,18 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { return this; } + /** + * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used + * to cache and reuse consumers when using this pollEnrich, when uris are reused. + * + * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. + * @return the builder + */ + public PollEnrichDefinition cacheSize(int cacheSize) { + setCacheSize(cacheSize); + return this; + } + // Properties // ------------------------------------------------------------------------- @@ -237,4 +254,11 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { this.aggregateOnException = aggregateOnException; } + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index 9873cbb..de3c7b4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -26,6 +26,7 @@ import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.PollingConsumer; import org.apache.camel.impl.ConsumerCache; +import org.apache.camel.impl.EmptyConsumerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.IdAware; import org.apache.camel.support.ServiceSupport; @@ -59,6 +60,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw private final Expression expression; private long timeout; private boolean aggregateOnException; + private int cacheSize; /** * Creates a new {@link PollEnricher}. @@ -131,6 +133,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw this.aggregationStrategy = defaultAggregationStrategy(); } + public int getCacheSize() { + return cacheSize; + } + + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; + } + public void process(Exchange exchange) throws Exception { AsyncProcessorHelper.process(this, exchange); } @@ -275,7 +285,16 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAw protected void doStart() throws Exception { if (consumerCache == null) { // create consumer cache if we use dynamic expressions for computing the endpoints to poll - consumerCache = new ConsumerCache(this, getCamelContext()); + if (cacheSize < 0) { + consumerCache = new EmptyConsumerCache(this, camelContext); + LOG.debug("PollEnrich {} is not using ConsumerCache", this); + } else if (cacheSize == 0) { + consumerCache = new ConsumerCache(this, camelContext); + LOG.debug("PollEnrich {} using ConsumerCache with default cache size", this); + } else { + consumerCache = new ConsumerCache(this, camelContext, cacheSize); + LOG.debug("PollEnrich {} using ConsumerCache with cacheSize={}", this, cacheSize); + } } ServiceHelper.startServices(consumerCache, aggregationStrategy); } http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java index 122e72b..dd4b699 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/RecipientListNoCacheTest.java @@ -49,7 +49,7 @@ public class RecipientListNoCacheTest extends ContextTestSupport { return new RouteBuilder() { public void configure() { from("direct:a").recipientList( - header("recipientListHeader").tokenize(",")).cacheSize(0); + header("recipientListHeader").tokenize(",")).cacheSize(-1); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java new file mode 100644 index 0000000..5c125d6 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionNoCacheTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class PollEnrichExpressionNoCacheTest extends ContextTestSupport { + + public void testPollEnricExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody("seda:foo", "Hello World"); + template.sendBody("seda:bar", "Bye World"); + + template.sendBodyAndHeader("direct:start", null, "source", "seda:foo"); + template.sendBodyAndHeader("direct:start", null, "source", "seda:bar"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .pollEnrich().header("source").cacheSize(-1) + .to("mock:result"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java index 38a42ab..c14de38 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java @@ -21,14 +21,16 @@ import org.apache.camel.builder.RouteBuilder; public class PollEnrichExpressionTest extends ContextTestSupport { - public void testPollEnricExpression() throws Exception { - getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + public void testPollEnrichExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hi World"); template.sendBody("seda:foo", "Hello World"); template.sendBody("seda:bar", "Bye World"); + template.sendBody("seda:foo", "Hi World"); template.sendBodyAndHeader("direct:start", null, "source", "seda:foo"); template.sendBodyAndHeader("direct:start", null, "source", "seda:bar"); + template.sendBodyAndHeader("direct:start", null, "source", "seda:foo"); assertMockEndpointsSatisfied(); } http://git-wip-us.apache.org/repos/asf/camel/blob/1b4af69c/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml index c0e0c0b..3703be4 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml @@ -33,7 +33,7 @@ <route> <from uri="direct:enricher-test-1"/> <pollEnrich strategyRef="sampleAggregator"> - <ref>foo1</ref> + <simple>ref:foo1</simple> </pollEnrich> <to uri="mock:mock"/> </route> @@ -42,7 +42,7 @@ <route> <from uri="direct:enricher-test-2"/> <pollEnrich timeout="1000" strategyRef="sampleAggregator"> - <ref>foo2</ref> + <simple>ref:foo2</simple> </pollEnrich> <to uri="mock:mock"/> </route> @@ -50,7 +50,7 @@ <route> <from uri="direct:enricher-test-3"/> <pollEnrich timeout="-1" strategyRef="sampleAggregator"> - <ref>foo3</ref> + <simple>ref:foo3</simple> </pollEnrich> <to uri="mock:mock"/> </route> @@ -58,7 +58,7 @@ <route> <from uri="direct:enricher-test-4"/> <pollEnrich strategyRef="sampleAggregator"> - <ref>foo4</ref> + <simple>ref:foo4</simple> </pollEnrich> <to uri="mock:mock"/> </route>
