Repository: camel Updated Branches: refs/heads/master 71b43bc83 -> fe5960aef
CAMEL-4596: pollEnrich supports dynamic uris. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/9fd4d549 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/9fd4d549 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/9fd4d549 Branch: refs/heads/master Commit: 9fd4d549056c12754c0fa76e253e00580e8ceb7a Parents: 71b43bc Author: Claus Ibsen <[email protected]> Authored: Sun Jul 12 20:54:52 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Jul 13 08:14:47 2015 +0200 ---------------------------------------------------------------------- .../org/apache/camel/impl/ConsumerCache.java | 1 - .../camel/model/PollEnrichDefinition.java | 62 ++++++++++--- .../apache/camel/model/ProcessorDefinition.java | 31 +++++++ .../apache/camel/processor/PollEnricher.java | 93 +++++++++++++++++--- .../apache/camel/processor/SendProcessor.java | 1 - .../enricher/PollEnrichExpressionTest.java | 47 ++++++++++ 6 files changed, 211 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java index 6f60f46..d957efe 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java +++ b/camel-core/src/main/java/org/apache/camel/impl/ConsumerCache.java @@ -58,7 +58,6 @@ public class ConsumerCache extends ServiceSupport { this(source, camelContext, cache, camelContext.getPollingConsumerServicePool()); } - public ConsumerCache(Object source, CamelContext camelContext, Map<String, PollingConsumer> cache, ServicePool<Endpoint, PollingConsumer> pool) { this.camelContext = camelContext; this.consumers = cache; 
http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index 7b6f9d1..a5d7cb1 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -19,12 +19,16 @@ package org.apache.camel.model; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlElementRef; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.CamelContextAware; import org.apache.camel.Endpoint; +import org.apache.camel.Expression; +import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; +import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.PollEnricher; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; @@ -41,6 +45,8 @@ import org.apache.camel.util.ObjectHelper; @XmlRootElement(name = "pollEnrich") @XmlAccessorType(XmlAccessType.FIELD) public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition { + @XmlElementRef + private ExpressionDefinition expression; @XmlAttribute(name = "uri") private String resourceUri; // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref: @@ -94,24 +100,33 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio @Override public Processor createProcessor(RouteContext 
routeContext) throws Exception { - if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) { - throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint"); + if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) { + throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured"); } // lookup endpoint - Endpoint endpoint; + PollingConsumer consumer = null; if (resourceUri != null) { - endpoint = routeContext.resolveEndpoint(resourceUri); - } else { - endpoint = routeContext.resolveEndpoint(null, resourceRef); + Endpoint endpoint = routeContext.resolveEndpoint(resourceUri); + consumer = endpoint.createPollingConsumer(); + } else if (resourceRef != null) { + Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef); + consumer = endpoint.createPollingConsumer(); } + // if no timeout then we should block, and there use a negative timeout + long time = timeout != null ? 
timeout : -1; + + // create the expression if any was configured + Expression exp = createResourceExpression(routeContext); + PollEnricher enricher; - if (timeout != null) { - enricher = new PollEnricher(null, endpoint.createPollingConsumer(), timeout); + if (exp != null) { + enricher = new PollEnricher(null, exp, time); + } else if (consumer != null) { + enricher = new PollEnricher(null, consumer, time); } else { - // if no timeout then we should block, and there use a negative timeout - enricher = new PollEnricher(null, endpoint.createPollingConsumer(), -1); + throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured"); } AggregationStrategy strategy = createAggregationStrategy(routeContext); @@ -152,6 +167,20 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return strategy; } + /** + * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from. + * + * @param routeContext the route context + * @return the created expression, or <tt>null</tt> if no expression configured + */ + protected Expression createResourceExpression(RouteContext routeContext) { + if (expression != null) { + return expression.createExpression(routeContext); + } else { + return null; + } + } + public String getResourceUri() { return resourceUri; } @@ -257,4 +286,17 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio public void setAggregateOnException(Boolean aggregateOnException) { this.aggregateOnException = aggregateOnException; } + + public ExpressionDefinition getExpression() { + return expression; + } + + /** + * Sets an expression to use for dynamic computing the endpoint to poll from. + * <p/> + * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use. 
+ */ + public void setExpression(ExpressionDefinition expression) { + this.expression = expression; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index c84ea47..b525c0f 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -3453,6 +3453,37 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @param expression to use an expression to dynamically compute the endpoint to poll from + * @param timeout timeout in millis to wait at most for data to be available. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. 
+ * @return the builder + * @see org.apache.camel.processor.PollEnricher + */ + @SuppressWarnings("unchecked") + public Type pollEnrich(Expression expression, long timeout, String aggregationStrategyRef, boolean aggregateOnException) { + PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); + pollEnrich.setExpression(new ExpressionDefinition(expression)); + pollEnrich.setTimeout(timeout); + pollEnrich.setAggregationStrategyRef(aggregationStrategyRef); + pollEnrich.setAggregateOnException(aggregateOnException); + addOutput(pollEnrich); + return (Type) this; + } + + /** * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as * a callback when the {@link org.apache.camel.Exchange} has finished being processed. * The hook invoke callbacks for either onComplete or onFailure. http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index 9cbca74..ab313fb 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -18,11 +18,15 @@ package org.apache.camel.processor; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; +import org.apache.camel.Expression; import org.apache.camel.PollingConsumer; +import org.apache.camel.impl.ConsumerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.IdAware; import 
org.apache.camel.support.ServiceSupport; @@ -46,12 +50,15 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; * * @see Enricher */ -public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware { +public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); + private CamelContext camelContext; + private ConsumerCache consumerCache; private String id; private AggregationStrategy aggregationStrategy; - private PollingConsumer consumer; + private final PollingConsumer consumer; + private final Expression expression; private long timeout; private boolean aggregateOnException; @@ -77,9 +84,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) { this.aggregationStrategy = aggregationStrategy; this.consumer = consumer; + this.expression = null; this.timeout = timeout; } + /** + * Creates a new {@link PollEnricher}. + * + * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. + * @param expression expression to use to compute the endpoint to poll from. 
+ * @param timeout timeout in millis + */ + public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) { + this.aggregationStrategy = aggregationStrategy; + this.expression = expression; + this.consumer = null; + this.timeout = timeout; + } + + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + public String getId() { return id; } @@ -89,7 +119,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp } public Endpoint getEndpoint() { - return consumer.getEndpoint(); + return consumer != null ? consumer.getEndpoint() : null; } public AggregationStrategy getAggregationStrategy() { @@ -162,17 +192,35 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp return true; } + // which consumer to use + PollingConsumer target = consumer; + Endpoint endpoint = null; + + // use dynamic endpoint so calculate the endpoint to use + if (expression != null) { + try { + Object recipient = expression.evaluate(exchange, Object.class); + endpoint = resolveEndpoint(exchange, recipient); + // acquire the consumer from the cache + target = consumerCache.acquirePollingConsumer(endpoint); + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; + } + } + Exchange resourceExchange; try { if (timeout < 0) { - LOG.debug("Consumer receive: {}", consumer); - resourceExchange = consumer.receive(); + LOG.debug("Consumer receive: {}", target); + resourceExchange = target.receive(); } else if (timeout == 0) { - LOG.debug("Consumer receiveNoWait: {}", consumer); - resourceExchange = consumer.receiveNoWait(); + LOG.debug("Consumer receiveNoWait: {}", target); + resourceExchange = target.receiveNoWait(); } else { - LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); - resourceExchange = consumer.receive(timeout); + LOG.debug("Consumer receive with timeout: {} ms. 
{}", timeout, target); + resourceExchange = target.receive(timeout); } if (resourceExchange == null) { @@ -184,6 +232,11 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp exchange.setException(new CamelExchangeException("Error during poll", exchange, e)); callback.done(true); return true; + } finally { + // return the consumer back to the cache + if (expression != null) { + consumerCache.releasePollingConsumer(endpoint, target); + } } try { @@ -209,9 +262,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp // set header with the uri of the endpoint enriched so we can use that for tracing etc if (exchange.hasOut()) { - exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); + exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri()); } else { - exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); + exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri()); } } catch (Throwable e) { exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); @@ -223,6 +276,14 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp return true; } + protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { + // trim strings as end users might have added spaces between separators + if (recipient instanceof String) { + recipient = ((String)recipient).trim(); + } + return ExchangeHelper.resolveEndpoint(exchange, recipient); + } + /** * Strategy to pre check polling. 
* <p/> @@ -251,11 +312,19 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp } protected void doStart() throws Exception { - ServiceHelper.startServices(aggregationStrategy, consumer); + if (expression != null && consumerCache == null) { + // create consumer cache if we use dynamic expressions for computing the endpoints to poll + consumerCache = new ConsumerCache(this, getCamelContext()); + } + ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy); } protected void doStop() throws Exception { - ServiceHelper.stopServices(consumer, aggregationStrategy); + ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy); + } + + protected void doShutdown() throws Exception { + ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy); } private static class CopyAggregationStrategy implements AggregationStrategy { http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java index 884f674..3ab90d6 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java @@ -118,7 +118,6 @@ public class SendProcessor extends ServiceSupport implements AsyncProcessor, Tra return true; } - // we should preserve existing MEP so remember old MEP // if you want to permanently to change the MEP then use .setExchangePattern in the DSL final ExchangePattern existingPattern = exchange.getPattern(); http://git-wip-us.apache.org/repos/asf/camel/blob/9fd4d549/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java ---------------------------------------------------------------------- 
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java new file mode 100644 index 0000000..4e983a7 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class PollEnrichExpressionTest extends ContextTestSupport { + + public void testPollEnricExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World"); + + template.sendBody("seda:foo", "Hello World"); + template.sendBody("seda:bar", "Bye World"); + + template.sendBodyAndHeader("direct:start", null, "source", "seda:foo"); + template.sendBodyAndHeader("direct:start", null, "source", "seda:bar"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .pollEnrich(header("source"), 1000, null, false) + .to("mock:result"); + } + }; + } +}
