CAMEL-4596: pollEnrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b5be4d6e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b5be4d6e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b5be4d6e Branch: refs/heads/master Commit: b5be4d6ed829700ca9af852940dcf28f3c00b598 Parents: 9fd4d54 Author: Claus Ibsen <[email protected]> Authored: Mon Jul 13 09:34:01 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Jul 13 10:06:13 2015 +0200 ---------------------------------------------------------------------- .../camel/model/PollEnrichDefinition.java | 184 ++++++------------- .../apache/camel/model/ProcessorDefinition.java | 43 +++-- .../apache/camel/processor/PollEnricher.java | 91 +++------ .../enricher/PollEnrichExpressionTest.java | 2 +- .../SpringPollEnrichExpressionTest.java | 30 +++ .../processor/pollEnrichExpressionTest.xml | 37 ++++ .../camel/spring/processor/pollEnricher.xml | 66 ++++--- .../camel/spring/processor/pollEnricherRef.xml | 78 ++++---- 8 files changed, 267 insertions(+), 264 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index a5d7cb1..eb1247b 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -19,22 +19,17 @@ package org.apache.camel.model; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlAttribute; -import javax.xml.bind.annotation.XmlElementRef; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.CamelContextAware; -import org.apache.camel.Endpoint; import org.apache.camel.Expression; -import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; -import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.processor.PollEnricher; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.ObjectHelper; /** * Enriches messages with data polled from a secondary resource @@ -44,15 +39,7 @@ import org.apache.camel.util.ObjectHelper; @Metadata(label = "eip,transformation") @XmlRootElement(name = "pollEnrich") @XmlAccessorType(XmlAccessType.FIELD) -public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinition> implements EndpointRequiredDefinition { - @XmlElementRef - private ExpressionDefinition expression; - @XmlAttribute(name = "uri") - private String resourceUri; - // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref: - @XmlAttribute(name = "ref") - @Deprecated - private String resourceRef; +public class PollEnrichDefinition extends NoOutputExpressionNode { @XmlAttribute @Metadata(defaultValue = "-1") private Long timeout; @XmlAttribute(name = "strategyRef") @@ -69,65 +56,29 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio public PollEnrichDefinition() { } - public PollEnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri, long timeout) { + public PollEnrichDefinition(AggregationStrategy aggregationStrategy, long timeout) { this.aggregationStrategy = aggregationStrategy; - this.resourceUri = resourceUri; this.timeout = timeout; } @Override public String toString() { - return "PollEnrich[" + description() + " " + aggregationStrategy + "]"; + return "PollEnrich[" + getExpression() + "]"; } - protected String description() { - return FromDefinition.description(getResourceUri(), getResourceRef(), (Endpoint) null); - } - @Override public String getLabel() { - return "pollEnrich[" + description() + "]"; - } - - @Override - public String getEndpointUri() { - if (resourceUri != null) { - return resourceUri; - } else { - return null; - } + return "pollEnrich[" + getExpression() + "]"; } @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef) && expression == null) { - throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured"); - } - - // lookup endpoint - PollingConsumer consumer = null; - if (resourceUri != null) { - Endpoint endpoint = routeContext.resolveEndpoint(resourceUri); - consumer = endpoint.createPollingConsumer(); - } else if (resourceRef != null) { - Endpoint endpoint = routeContext.resolveEndpoint(null, resourceRef); - consumer = endpoint.createPollingConsumer(); - } // if no timeout then we should block, and there use a negative timeout long time = timeout != null ? timeout : -1; + Expression exp = getExpression().createExpression(routeContext); - // create the expression if any was configured - Expression exp = createResourceExpression(routeContext); - - PollEnricher enricher; - if (exp != null) { - enricher = new PollEnricher(null, exp, time); - } else if (consumer != null) { - enricher = new PollEnricher(null, consumer, time); - } else { - throw new IllegalArgumentException("Either resourceUri, resourceRef or expression must be configured"); - } + PollEnricher enricher = new PollEnricher(exp, time); AggregationStrategy strategy = createAggregationStrategy(routeContext); if (strategy == null) { @@ -167,60 +118,81 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return strategy; } + // Fluent API + // ------------------------------------------------------------------------- + + // TODO: add cacheSize option + /** - * Creates the {@link org.apache.camel.Expression} from the expression node to use to compute the endpoint to poll from. - * - * @param routeContext the route context - * @return the created expression, or <tt>null</tt> if no expression configured + * Timeout in millis when polling from the external service. + * <p/> + * The timeout has influence about the poll enrich behavior. It basically operations in three different modes: + * <ul> + * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li> + * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li> + * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li> + * </ul> + * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value */ - protected Expression createResourceExpression(RouteContext routeContext) { - if (expression != null) { - return expression.createExpression(routeContext); - } else { - return null; - } + public PollEnrichDefinition timeout(long timeout) { + setTimeout(timeout); + return this; } - public String getResourceUri() { - return resourceUri; + /** + * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. + * By default Camel will use the reply from the external service as outgoing message. + */ + public PollEnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { + setAggregationStrategy(aggregationStrategy); + return this; } /** - * The endpoint uri for the external service to poll enrich from. You must use either uri or ref. + * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. + * By default Camel will use the reply from the external service as outgoing message. */ - public void setResourceUri(String resourceUri) { - this.resourceUri = resourceUri; + public PollEnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) { + setAggregationStrategyRef(aggregationStrategyRef); + return this; } - public String getResourceRef() { - return resourceRef; + /** + * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. + */ + public PollEnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) { + setAggregationStrategyMethodName(aggregationStrategyMethodName); + return this; + } + + /** + * If this option is false then the aggregate method is not used if there was no data to enrich. + * If this option is true then null values is used as the oldExchange (when no data to enrich), + * when using POJOs as the AggregationStrategy. + */ + public PollEnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) { + setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull); + return this; } /** - * Refers to the endpoint for the external service to poll enrich from. You must use either uri or ref. - * - * @deprecated use uri with ref:uri instead + * If this option is false then the aggregate method is not used if there was an exception thrown while trying + * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what + * to do if there was an exception in the aggregate method. For example to suppress the exception + * or set a custom message body etc. */ - @Deprecated - public void setResourceRef(String resourceRef) { - this.resourceRef = resourceRef; + public PollEnrichDefinition aggregateOnException(boolean aggregateOnException) { + setAggregateOnException(aggregateOnException); + return this; } + // Properties + // ------------------------------------------------------------------------- + public Long getTimeout() { return timeout; } - /** - * Timeout in millis when polling from the external service. - * <p/> - * The timeout has influence about the poll enrich behavior. It basically operations in three different modes: - * <ul> - * <li>negative value - Waits until a message is available and then returns it. Warning that this method could block indefinitely if no messages are available.</li> - * <li>0 - Attempts to receive a message exchange immediately without waiting and returning <tt>null</tt> if a message exchange is not available yet.</li> - * <li>positive value - Attempts to receive a message exchange, waiting up to the given timeout to expire if a message is not yet available. Returns <tt>null</tt> if timed out</li> - * </ul> - * The default value is -1 and therefore the method could block indefinitely, and therefore its recommended to use a timeout value - */ public void setTimeout(Long timeout) { this.timeout = timeout; } @@ -229,10 +201,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return aggregationStrategyRef; } - /** - * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. - * By default Camel will use the reply from the external service as outgoing message. - */ public void setAggregationStrategyRef(String aggregationStrategyRef) { this.aggregationStrategyRef = aggregationStrategyRef; } @@ -241,9 +209,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return aggregationStrategyMethodName; } - /** - * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. - */ public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) { this.aggregationStrategyMethodName = aggregationStrategyMethodName; } @@ -252,11 +217,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return aggregationStrategyMethodAllowNull; } - /** - * If this option is false then the aggregate method is not used if there was no data to enrich. - * If this option is true then null values is used as the oldExchange (when no data to enrich), - * when using POJOs as the AggregationStrategy. - */ public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) { this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull; } @@ -265,10 +225,6 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return aggregationStrategy; } - /** - * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. - * By default Camel will use the reply from the external service as outgoing message. - */ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } @@ -277,26 +233,8 @@ public class PollEnrichDefinition extends NoOutputDefinition<PollEnrichDefinitio return aggregateOnException; } - /** - * If this option is false then the aggregate method is not used if there was an exception thrown while trying - * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what - * to do if there was an exception in the aggregate method. For example to suppress the exception - * or set a custom message body etc. - */ public void setAggregateOnException(Boolean aggregateOnException) { this.aggregateOnException = aggregateOnException; } - public ExpressionDefinition getExpression() { - return expression; - } - - /** - * Sets an expression to use for dynamic computing the endpoint to poll from. - * <p/> - * If this option is set, then <tt>resourceUri</tt> or <tt>resourceRef</tt> is not in use. - */ - public void setExpression(ExpressionDefinition expression) { - this.expression = expression; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index b525c0f..589df86 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -52,6 +52,7 @@ import org.apache.camel.builder.ProcessorBuilder; import org.apache.camel.model.language.ConstantExpression; import org.apache.camel.model.language.ExpressionDefinition; import org.apache.camel.model.language.LanguageExpression; +import org.apache.camel.model.language.SimpleExpression; import org.apache.camel.model.rest.RestDefinition; import org.apache.camel.processor.InterceptEndpointProcessor; import org.apache.camel.processor.Pipeline; @@ -3292,8 +3293,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri) { - addOutput(new PollEnrichDefinition(null, resourceUri, -1)); - return (Type) this; + return pollEnrich(resourceUri, null); } /** @@ -3314,8 +3314,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) { - addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, -1)); - return (Type) this; + return pollEnrich(resourceUri, -1, aggregationStrategy); } /** @@ -3338,8 +3337,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) { - addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout)); - return (Type) this; + return pollEnrich(resourceUri, timeout, aggregationStrategy, false); } /** @@ -3364,7 +3362,10 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { - PollEnrichDefinition pollEnrich = new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout); + PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); + pollEnrich.setExpression(new ConstantExpression(resourceUri)); + pollEnrich.setTimeout(timeout); + pollEnrich.setAggregationStrategy(aggregationStrategy); pollEnrich.setAggregateOnException(aggregateOnException); addOutput(pollEnrich); return (Type) this; @@ -3389,8 +3390,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type pollEnrich(String resourceUri, long timeout) { - addOutput(new PollEnrichDefinition(null, resourceUri, timeout)); - return (Type) this; + return pollEnrich(resourceUri, timeout, null); } /** @@ -3414,7 +3414,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> @SuppressWarnings("unchecked") public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef) { PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); - pollEnrich.setResourceRef(resourceRef); + pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef)); pollEnrich.setTimeout(timeout); pollEnrich.setAggregationStrategyRef(aggregationStrategyRef); addOutput(pollEnrich); @@ -3444,7 +3444,7 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> @SuppressWarnings("unchecked") public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef, boolean aggregateOnException) { PollEnrichDefinition pollEnrich = new PollEnrichDefinition(); - pollEnrich.setResourceRef(resourceRef); + pollEnrich.setExpression(new SimpleExpression("ref:" + resourceRef)); pollEnrich.setTimeout(timeout); pollEnrich.setAggregationStrategyRef(aggregationStrategyRef); pollEnrich.setAggregateOnException(aggregateOnException); @@ -3484,6 +3484,27 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> } /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code> + * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint. + * <p/> + * The difference between this and {@link #enrich(String)} is that this uses a consumer + * to obtain the additional data, where as enrich uses a producer. + * <p/> + * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}. + * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt> + * otherwise we use <tt>receive(timeout)</tt>. + * + * @return a expression builder clause to set the expression to use for computing the endpoint to poll from + * @see org.apache.camel.processor.PollEnricher + */ + public ExpressionClause<PollEnrichDefinition> pollEnrich() { + PollEnrichDefinition answer = new PollEnrichDefinition(); + addOutput(answer); + return ExpressionClause.createAndSetExpression(answer); + } + + /** * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as * a callback when the {@link org.apache.camel.Exchange} has finished being processed. * The hook invoke callbacks for either onComplete or onFailure. http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java index ab313fb..9873cbb 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java @@ -22,7 +22,6 @@ import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; -import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; import org.apache.camel.Expression; import org.apache.camel.PollingConsumer; @@ -50,55 +49,25 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; * * @see Enricher */ -public class PollEnricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware, CamelContextAware { +public class PollEnricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(PollEnricher.class); private CamelContext camelContext; private ConsumerCache consumerCache; private String id; private AggregationStrategy aggregationStrategy; - private final PollingConsumer consumer; private final Expression expression; private long timeout; private boolean aggregateOnException; /** - * Creates a new {@link PollEnricher}. The default aggregation strategy is to - * copy the additional data obtained from the enricher's resource over the - * input data. When using the copy aggregation strategy the enricher - * degenerates to a normal transformer. - * - * @param consumer consumer to resource endpoint. - */ - public PollEnricher(PollingConsumer consumer) { - this(defaultAggregationStrategy(), consumer, 0); - } - - /** * Creates a new {@link PollEnricher}. * - * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. - * @param consumer consumer to resource endpoint. - * @param timeout timeout in millis - */ - public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) { - this.aggregationStrategy = aggregationStrategy; - this.consumer = consumer; - this.expression = null; - this.timeout = timeout; - } - - /** - * Creates a new {@link PollEnricher}. - * - * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. * @param expression expression to use to compute the endpoint to poll from. * @param timeout timeout in millis */ - public PollEnricher(AggregationStrategy aggregationStrategy, Expression expression, long timeout) { - this.aggregationStrategy = aggregationStrategy; + public PollEnricher(Expression expression, long timeout) { this.expression = expression; - this.consumer = null; this.timeout = timeout; } @@ -118,10 +87,6 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp this.id = id; } - public Endpoint getEndpoint() { - return consumer != null ? consumer.getEndpoint() : null; - } - public AggregationStrategy getAggregationStrategy() { return aggregationStrategy; } @@ -193,34 +158,32 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp } // which consumer to use - PollingConsumer target = consumer; - Endpoint endpoint = null; + PollingConsumer consumer; + Endpoint endpoint; // use dynamic endpoint so calculate the endpoint to use - if (expression != null) { - try { - Object recipient = expression.evaluate(exchange, Object.class); - endpoint = resolveEndpoint(exchange, recipient); - // acquire the consumer from the cache - target = consumerCache.acquirePollingConsumer(endpoint); - } catch (Throwable e) { - exchange.setException(e); - callback.done(true); - return true; - } + try { + Object recipient = expression.evaluate(exchange, Object.class); + endpoint = resolveEndpoint(exchange, recipient); + // acquire the consumer from the cache + consumer = consumerCache.acquirePollingConsumer(endpoint); + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; } Exchange resourceExchange; try { if (timeout < 0) { - LOG.debug("Consumer receive: {}", target); + LOG.debug("Consumer receive: {}", consumer); resourceExchange = consumer.receive(); } else if (timeout == 0) { - LOG.debug("Consumer receiveNoWait: {}", target); - resourceExchange = target.receiveNoWait(); + LOG.debug("Consumer receiveNoWait: {}", consumer); + resourceExchange = consumer.receiveNoWait(); } else { - LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, target); - resourceExchange = target.receive(timeout); + LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer); + resourceExchange = consumer.receive(timeout); } if (resourceExchange == null) { @@ -234,9 +197,7 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp return true; } finally { // return the consumer back to the cache - if (expression != null) { - consumerCache.releasePollingConsumer(endpoint, target); - } + consumerCache.releasePollingConsumer(endpoint, consumer); } try { @@ -262,9 +223,9 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp // set header with the uri of the endpoint enriched so we can use that for tracing etc if (exchange.hasOut()) { - exchange.getOut().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri()); + exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); } else { - exchange.getIn().setHeader(Exchange.TO_ENDPOINT, target.getEndpoint().getEndpointUri()); + exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri()); } } catch (Throwable e) { exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e)); @@ -308,23 +269,23 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor, Endp @Override public String toString() { - return "PollEnrich[" + consumer + "]"; + return "PollEnrich[" + expression + "]"; } protected void doStart() throws Exception { - if (expression != null && consumerCache == null) { + if (consumerCache == null) { // create consumer cache if we use dynamic expressions for computing the endpoints to poll consumerCache = new ConsumerCache(this, getCamelContext()); } - ServiceHelper.startServices(consumerCache, consumer, aggregationStrategy); + ServiceHelper.startServices(consumerCache, aggregationStrategy); } protected void doStop() throws Exception { - ServiceHelper.stopServices(consumerCache, consumer, aggregationStrategy); + ServiceHelper.stopServices(aggregationStrategy, consumerCache); } protected void doShutdown() throws Exception { - ServiceHelper.stopAndShutdownServices(consumerCache, consumer, aggregationStrategy); + ServiceHelper.stopAndShutdownServices(aggregationStrategy, consumerCache); } private static class CopyAggregationStrategy implements AggregationStrategy { http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java index 4e983a7..38a42ab 100644 --- a/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnrichExpressionTest.java @@ -39,7 +39,7 @@ public class PollEnrichExpressionTest extends ContextTestSupport { @Override public void configure() throws Exception { from("direct:start") - .pollEnrich(header("source"), 1000, null, false) + .pollEnrich().header("source") .to("mock:result"); } }; http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java new file mode 100644 index 0000000..8017e8d --- /dev/null +++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnrichExpressionTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.spring.processor; + +import org.apache.camel.CamelContext; +import org.apache.camel.processor.enricher.PollEnrichExpressionTest; + +import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext; + +public class SpringPollEnrichExpressionTest extends PollEnrichExpressionTest { + + protected CamelContext createCamelContext() throws Exception { + return createSpringCamelContext(this, "org/apache/camel/spring/processor/pollEnrichExpressionTest.xml"); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml new file mode 100644 index 0000000..cb932e3 --- /dev/null +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnrichExpressionTest.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd + http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + "> + + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <!-- START SNIPPET: e1 --> + <route> + <from uri="direct:start"/> + <pollEnrich> + <header>source</header> + </pollEnrich> + <to uri="mock:result"/> + </route> + <!-- END SNIPPET: e1 --> + </camelContext> + +</beans> http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml index f63b63a..f0f71ce 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml @@ -22,34 +22,42 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <camelContext xmlns="http://camel.apache.org/schema/spring"> - <!-- START SNIPPET: e1 --> - <route> - <from uri="direct:enricher-test-1"/> - <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - <!-- END SNIPPET: e1 --> - - <route> - <from uri="direct:enricher-test-2"/> - <pollEnrich uri="seda:foo2" timeout="1000" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - - <route> - <from uri="direct:enricher-test-3"/> - <pollEnrich uri="seda:foo3" timeout="-1" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - - <route> - <from uri="direct:enricher-test-4"/> - <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - </camelContext> - - <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/> + <camelContext xmlns="http://camel.apache.org/schema/spring"> + <!-- START SNIPPET: e1 --> + <route> + <from uri="direct:enricher-test-1"/> + <pollEnrich strategyRef="sampleAggregator"> + <constant>seda:foo1</constant> + </pollEnrich> + <to uri="mock:mock"/> + </route> + <!-- END SNIPPET: e1 --> + + <route> + <from uri="direct:enricher-test-2"/> + <pollEnrich timeout="1000" strategyRef="sampleAggregator"> + <constant>seda:foo2</constant> + </pollEnrich> + <to uri="mock:mock"/> + </route> + + <route> + <from uri="direct:enricher-test-3"/> + <pollEnrich timeout="-1" strategyRef="sampleAggregator"> + <constant>seda:foo3</constant> + </pollEnrich> + <to uri="mock:mock"/> + </route> + + <route> + <from uri="direct:enricher-test-4"/> + <pollEnrich strategyRef="sampleAggregator"> + <constant>seda:foo4</constant> + </pollEnrich> + <to uri="mock:mock"/> + </route> + </camelContext> + + <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/> </beans> http://git-wip-us.apache.org/repos/asf/camel/blob/b5be4d6e/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml ---------------------------------------------------------------------- diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml index 6046e7d..c0e0c0b 100644 --- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml +++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricherRef.xml @@ -22,40 +22,48 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd "> - <camelContext xmlns="http://camel.apache.org/schema/spring"> - - <endpoint id="foo1" uri="seda:foo1"/> - <endpoint id="foo2" uri="seda:foo2"/> - <endpoint id="foo3" uri="seda:foo3"/> - <endpoint id="foo4" uri="seda:foo4"/> - - <!-- START SNIPPET: e1 --> - <route> - <from uri="direct:enricher-test-1"/> - <pollEnrich ref="foo1" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - <!-- END SNIPPET: e1 --> - - <route> - <from uri="direct:enricher-test-2"/> - <pollEnrich ref="foo2" timeout="1000" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - - <route> - <from uri="direct:enricher-test-3"/> - <pollEnrich ref="foo3" timeout="-1" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - - <route> - <from uri="direct:enricher-test-4"/> - <pollEnrich ref="foo4" strategyRef="sampleAggregator"/> - <to uri="mock:mock"/> - </route> - </camelContext> - - <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/> + <camelContext xmlns="http://camel.apache.org/schema/spring"> + + <endpoint id="foo1" uri="seda:foo1"/> + <endpoint id="foo2" uri="seda:foo2"/> + <endpoint id="foo3" uri="seda:foo3"/> + <endpoint id="foo4" uri="seda:foo4"/> + + <!-- START SNIPPET: e1 --> + <route> + <from uri="direct:enricher-test-1"/> + <pollEnrich strategyRef="sampleAggregator"> + <ref>foo1</ref> + </pollEnrich> + <to uri="mock:mock"/> + </route> + <!-- END SNIPPET: e1 --> + + <route> + <from uri="direct:enricher-test-2"/> + <pollEnrich timeout="1000" strategyRef="sampleAggregator"> + <ref>foo2</ref> + </pollEnrich> + <to uri="mock:mock"/> + </route> + + <route> + <from uri="direct:enricher-test-3"/> + <pollEnrich timeout="-1" strategyRef="sampleAggregator"> + <ref>foo3</ref> + </pollEnrich> + <to uri="mock:mock"/> + </route> + + <route> + <from uri="direct:enricher-test-4"/> + <pollEnrich strategyRef="sampleAggregator"> + <ref>foo4</ref> + </pollEnrich> + <to uri="mock:mock"/> + </route> + </camelContext> + + <bean id="sampleAggregator" class="org.apache.camel.processor.enricher.SampleAggregator"/> </beans>
