CAMEL-4596: enrich supports dynamic uris.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5fdf8bc5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5fdf8bc5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5fdf8bc5 Branch: refs/heads/master Commit: 5fdf8bc5179326fec39f21508f6727db6f89e515 Parents: 1b4af69 Author: Claus Ibsen <[email protected]> Authored: Mon Jul 13 11:09:30 2015 +0200 Committer: Claus Ibsen <[email protected]> Committed: Mon Jul 13 11:09:30 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/model/EnrichDefinition.java | 169 +++++++++---------- .../camel/model/PollEnrichDefinition.java | 2 - .../apache/camel/model/ProcessorDefinition.java | 93 +++++----- .../org/apache/camel/processor/Enricher.java | 139 +++++++++------ .../enricher/EnrichExpressionNoCacheTest.java | 49 ++++++ .../enricher/EnrichExpressionTest.java | 49 ++++++ 6 files changed, 317 insertions(+), 184 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java index 6f0e358..902524e 100644 --- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java @@ -23,14 +23,13 @@ import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.XmlTransient; import org.apache.camel.CamelContextAware; -import org.apache.camel.Endpoint; +import org.apache.camel.Expression; import org.apache.camel.Processor; import org.apache.camel.processor.Enricher; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.RouteContext; -import org.apache.camel.util.ObjectHelper; /** * Enriches a message with data from a secondary resource @@ -40,13 +39,7 @@ import org.apache.camel.util.ObjectHelper; @Metadata(label = "eip,transformation") @XmlRootElement(name = "enrich") @XmlAccessorType(XmlAccessType.FIELD) -public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> implements EndpointRequiredDefinition { - @XmlAttribute(name = "uri") - private String resourceUri; - // TODO: For Camel 3.0 we should remove this ref attribute as you can do that in the uri, by prefixing with ref: - @XmlAttribute(name = "ref") - @Deprecated - private String resourceRef; +public class EnrichDefinition extends NoOutputExpressionNode { @XmlAttribute(name = "strategyRef") private String aggregationStrategyRef; @XmlAttribute(name = "strategyMethodName") @@ -59,67 +52,43 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple private AggregationStrategy aggregationStrategy; @XmlAttribute private Boolean shareUnitOfWork; + @XmlAttribute + private Integer cacheSize; public EnrichDefinition() { - this(null, null); + this(null); } - public EnrichDefinition(String resourceUri) { - this(null, resourceUri); - } - - public EnrichDefinition(AggregationStrategy aggregationStrategy, String resourceUri) { + public EnrichDefinition(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; - this.resourceUri = resourceUri; } @Override public String toString() { - return "Enrich[" + description() + " " + aggregationStrategy + "]"; - } - - protected String description() { - return FromDefinition.description(resourceUri, resourceRef, (Endpoint) null); + return "Enrich[" + getExpression() + "]"; } @Override public String getLabel() { - return "enrich[" + description() + "]"; - } - - @Override - public String getEndpointUri() { - if (resourceUri != null) { - return resourceUri; - } else { - return null; - } + return "enrich[" + getExpression() + "]"; } @Override public Processor createProcessor(RouteContext routeContext) throws Exception { - if (ObjectHelper.isEmpty(resourceUri) && ObjectHelper.isEmpty(resourceRef)) { - throw new IllegalArgumentException("Either uri or ref must be provided for resource endpoint"); - } // lookup endpoint - Endpoint endpoint; - if (resourceUri != null) { - endpoint = routeContext.resolveEndpoint(resourceUri); - } else { - endpoint = routeContext.resolveEndpoint(null, resourceRef); - } + + Expression exp = getExpression().createExpression(routeContext); boolean isShareUnitOfWork = getShareUnitOfWork() != null && getShareUnitOfWork(); - Enricher enricher = new Enricher(null, endpoint.createProducer(), isShareUnitOfWork); + Enricher enricher = new Enricher(exp); + enricher.setShareUnitOfWork(isShareUnitOfWork); AggregationStrategy strategy = createAggregationStrategy(routeContext); - if (strategy == null) { - enricher.setDefaultAggregationStrategy(); - } else { + if (strategy != null) { enricher.setAggregationStrategy(strategy); } - if (getAggregateOnException() != null) { - enricher.setAggregateOnException(getAggregateOnException()); + if (aggregateOnException != null) { + enricher.setAggregateOnException(aggregateOnException); } return enricher; } @@ -149,39 +118,85 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple return strategy; } - public String getResourceUri() { - return resourceUri; + // Fluent API + // ------------------------------------------------------------------------- + + /** + * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. + * By default Camel will use the reply from the external service as outgoing message. + */ + public EnrichDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) { + setAggregationStrategy(aggregationStrategy); + return this; + } + + /** + * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. + * By default Camel will use the reply from the external service as outgoing message. + */ + public EnrichDefinition aggregationStrategyRef(String aggregationStrategyRef) { + setAggregationStrategyRef(aggregationStrategyRef); + return this; } /** - * The endpoint uri for the external service to enrich from. You must use either uri or ref. + * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. */ - public void setResourceUri(String resourceUri) { - this.resourceUri = resourceUri; + public EnrichDefinition aggregationStrategyMethodName(String aggregationStrategyMethodName) { + setAggregationStrategyMethodName(aggregationStrategyMethodName); + return this; } - public String getResourceRef() { - return resourceRef; + /** + * If this option is false then the aggregate method is not used if there was no data to enrich. + * If this option is true then null values is used as the oldExchange (when no data to enrich), + * when using POJOs as the AggregationStrategy. + */ + public EnrichDefinition aggregationStrategyMethodAllowNull(boolean aggregationStrategyMethodAllowNull) { + setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull); + return this; } /** - * Refers to the endpoint for the external service to enrich from. You must use either uri or ref. + * If this option is false then the aggregate method is not used if there was an exception thrown while trying + * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what + * to do if there was an exception in the aggregate method. For example to suppress the exception + * or set a custom message body etc. + */ + public EnrichDefinition aggregateOnException(boolean aggregateOnException) { + setAggregateOnException(aggregateOnException); + return this; + } + + /** + * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange. + * Enrich will by default not share unit of work between the parent exchange and the resource exchange. + * This means the resource exchange has its own individual unit of work. + */ + public EnrichDefinition shareUnitOfWork() { + setShareUnitOfWork(true); + return this; + } + + /** + * Sets the maximum size used by the {@link org.apache.camel.impl.ConsumerCache} which is used + * to cache and reuse consumers when using this pollEnrich, when uris are reused. * - * @deprecated use uri with ref:uri instead + * @param cacheSize the cache size, use <tt>0</tt> for default cache size, or <tt>-1</tt> to turn cache off. + * @return the builder */ - @Deprecated - public void setResourceRef(String resourceRef) { - this.resourceRef = resourceRef; + public EnrichDefinition cacheSize(int cacheSize) { + setCacheSize(cacheSize); + return this; } + // Properties + // ------------------------------------------------------------------------- + public String getAggregationStrategyRef() { return aggregationStrategyRef; } - /** - * Refers to an AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. - * By default Camel will use the reply from the external service as outgoing message. - */ public void setAggregationStrategyRef(String aggregationStrategyRef) { this.aggregationStrategyRef = aggregationStrategyRef; } @@ -190,9 +205,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple return aggregationStrategyMethodName; } - /** - * This option can be used to explicit declare the method name to use, when using POJOs as the AggregationStrategy. - */ public void setAggregationStrategyMethodName(String aggregationStrategyMethodName) { this.aggregationStrategyMethodName = aggregationStrategyMethodName; } @@ -201,11 +213,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple return aggregationStrategyMethodAllowNull; } - /** - * If this option is false then the aggregate method is not used if there was no data to enrich. - * If this option is true then null values is used as the oldExchange (when no data to enrich), - * when using POJOs as the AggregationStrategy. - */ public void setAggregationStrategyMethodAllowNull(Boolean aggregationStrategyMethodAllowNull) { this.aggregationStrategyMethodAllowNull = aggregationStrategyMethodAllowNull; } @@ -214,10 +221,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple return aggregationStrategy; } - /** - * Sets the AggregationStrategy to be used to merge the reply from the external service, into a single outgoing message. - * By default Camel will use the reply from the external service as outgoing message. - */ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } @@ -226,12 +229,6 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple return aggregateOnException; } - /** - * If this option is false then the aggregate method is not used if there was an exception thrown while trying - * to retrieve the data to enrich from the resource. Setting this option to true allows end users to control what - * to do if there was an exception in the aggregate method. For example to suppress the exception - * or set a custom message body etc. - */ public void setAggregateOnException(Boolean aggregateOnException) { this.aggregateOnException = aggregateOnException; } @@ -240,13 +237,15 @@ public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> imple return shareUnitOfWork; } - /** - * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and the resource exchange. - * Enrich will by default not share unit of work between the parent exchange and the resource exchange. - * This means the resource exchange has its own individual unit of work. - */ public void setShareUnitOfWork(Boolean shareUnitOfWork) { this.shareUnitOfWork = shareUnitOfWork; } + public Integer getCacheSize() { + return cacheSize; + } + + public void setCacheSize(Integer cacheSize) { + this.cacheSize = cacheSize; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java index 18801d4..e7c84a6 100644 --- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java @@ -126,8 +126,6 @@ public class PollEnrichDefinition extends NoOutputExpressionNode { // Fluent API // ------------------------------------------------------------------------- - // TODO: add cacheSize option - /** * Timeout in millis when polling from the external service. * <p/> http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java index 589df86..920d85a 100644 --- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java @@ -3136,16 +3136,29 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> /** * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. + * <p/> + * The difference between this and {@link #pollEnrich(String)} is that this uses a producer + * to obatin the additional data, where as pollEnrich uses a polling consumer. + * + * @param resourceUri URI of resource endpoint for obtaining additional data. + * @return the builder + * @see org.apache.camel.processor.Enricher + */ + public Type enrich(String resourceUri) { + return enrich(resourceUri, null); + } + + /** + * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a> + * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * * @param resourceUri URI of resource endpoint for obtaining additional data. * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. * @return the builder * @see org.apache.camel.processor.Enricher */ - @SuppressWarnings("unchecked") public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) { - addOutput(new EnrichDefinition(aggregationStrategy, resourceUri)); - return (Type) this; + return enrich(resourceUri, aggregationStrategy, false); } /** @@ -3159,12 +3172,8 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @return the builder * @see org.apache.camel.processor.Enricher */ - @SuppressWarnings("unchecked") public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException) { - EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri); - enrich.setAggregateOnException(aggregateOnException); - addOutput(enrich); - return (Type) this; + return enrich(resourceUri, aggregationStrategy, false, false); } /** @@ -3181,10 +3190,12 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> */ @SuppressWarnings("unchecked") public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) { - EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, resourceUri); - enrich.setAggregateOnException(aggregateOnException); - enrich.setShareUnitOfWork(shareUnitOfWork); - addOutput(enrich); + EnrichDefinition answer = new EnrichDefinition(); + answer.setExpression(new ConstantExpression(resourceUri)); + answer.setAggregationStrategy(aggregationStrategy); + answer.setAggregateOnException(aggregateOnException); + answer.setShareUnitOfWork(shareUnitOfWork); + addOutput(answer); return (Type) this; } @@ -3193,16 +3204,15 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * enriches an exchange with additional data obtained from a <code>resourceUri</code>. * <p/> * The difference between this and {@link #pollEnrich(String)} is that this uses a producer - * to obatin the additional data, where as pollEnrich uses a polling consumer. + * to obtain the additional data, where as pollEnrich uses a polling consumer. * - * @param resourceUri URI of resource endpoint for obtaining additional data. + * @param resourceRef Reference of resource endpoint for obtaining additional data. + * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. * @return the builder * @see org.apache.camel.processor.Enricher */ - @SuppressWarnings("unchecked") - public Type enrich(String resourceUri) { - addOutput(new EnrichDefinition(resourceUri)); - return (Type) this; + public Type enrichRef(String resourceRef, String aggregationStrategyRef) { + return enrichRef(resourceRef, aggregationStrategyRef, false); } /** @@ -3214,16 +3224,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * * @param resourceRef Reference of resource endpoint for obtaining additional data. * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. + * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if + * an exception was thrown. * @return the builder * @see org.apache.camel.processor.Enricher */ - @SuppressWarnings("unchecked") - public Type enrichRef(String resourceRef, String aggregationStrategyRef) { - EnrichDefinition enrich = new EnrichDefinition(); - enrich.setResourceRef(resourceRef); - enrich.setAggregationStrategyRef(aggregationStrategyRef); - addOutput(enrich); - return (Type) this; + public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) { + return enrichRef(resourceRef, aggregationStrategyRef, false, false); } /** @@ -3237,16 +3244,18 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if * an exception was thrown. + * @param shareUnitOfWork whether to share unit of work * @return the builder * @see org.apache.camel.processor.Enricher */ @SuppressWarnings("unchecked") - public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException) { - EnrichDefinition enrich = new EnrichDefinition(); - enrich.setResourceRef(resourceRef); - enrich.setAggregationStrategyRef(aggregationStrategyRef); - enrich.setAggregateOnException(aggregateOnException); - addOutput(enrich); + public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) { + EnrichDefinition answer = new EnrichDefinition(); + answer.setExpression(new SimpleExpression("ref:" + resourceRef)); + answer.setAggregationStrategyRef(aggregationStrategyRef); + answer.setAggregateOnException(aggregateOnException); + answer.setShareUnitOfWork(shareUnitOfWork); + addOutput(answer); return (Type) this; } @@ -3257,23 +3266,13 @@ public abstract class ProcessorDefinition<Type extends ProcessorDefinition<Type> * The difference between this and {@link #pollEnrich(String)} is that this uses a producer * to obtain the additional data, where as pollEnrich uses a polling consumer. * - * @param resourceRef Reference of resource endpoint for obtaining additional data. - * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data. - * @param aggregateOnException whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if - * an exception was thrown. - * @param shareUnitOfWork whether to share unit of work - * @return the builder - * @see org.apache.camel.processor.Enricher + * @return a expression builder clause to set the expression to use for computing the endpoint to use + * @see org.apache.camel.processor.PollEnricher */ - @SuppressWarnings("unchecked") - public Type enrichRef(String resourceRef, String aggregationStrategyRef, boolean aggregateOnException, boolean shareUnitOfWork) { - EnrichDefinition enrich = new EnrichDefinition(); - enrich.setResourceRef(resourceRef); - enrich.setAggregationStrategyRef(aggregationStrategyRef); - enrich.setAggregateOnException(aggregateOnException); - enrich.setShareUnitOfWork(shareUnitOfWork); - addOutput(enrich); - return (Type) this; + public ExpressionClause<EnrichDefinition> enrich() { + EnrichDefinition answer = new EnrichDefinition(); + addOutput(answer); + return ExpressionClause.createAndSetExpression(answer); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/processor/Enricher.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java index b0532ba..cbdf104 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java @@ -18,18 +18,19 @@ package org.apache.camel.processor; import org.apache.camel.AsyncCallback; import org.apache.camel.AsyncProcessor; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; import org.apache.camel.CamelExchangeException; import org.apache.camel.Endpoint; -import org.apache.camel.EndpointAware; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; -import org.apache.camel.Processor; +import org.apache.camel.Expression; import org.apache.camel.Producer; import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.EmptyProducerCache; +import org.apache.camel.impl.ProducerCache; import org.apache.camel.processor.aggregate.AggregationStrategy; import org.apache.camel.spi.IdAware; -import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.UnitOfWork; import org.apache.camel.support.ServiceSupport; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; @@ -39,6 +40,7 @@ import org.apache.camel.util.ServiceHelper; import org.apache.camel.util.StopWatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; /** @@ -53,38 +55,28 @@ import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern; * * @see PollEnricher */ -public class Enricher extends ServiceSupport implements AsyncProcessor, EndpointAware, IdAware { +public class Enricher extends ServiceSupport implements AsyncProcessor, IdAware, CamelContextAware { private static final Logger LOG = LoggerFactory.getLogger(Enricher.class); + private CamelContext camelContext; private String id; + private ProducerCache producerCache; + private final Expression expression; private AggregationStrategy aggregationStrategy; - private Producer producer; private boolean aggregateOnException; private boolean shareUnitOfWork; + private int cacheSize; - /** - * Creates a new {@link Enricher}. The default aggregation strategy is to - * copy the additional data obtained from the enricher's resource over the - * input data. When using the copy aggregation strategy the enricher - * degenerates to a normal transformer. - * - * @param producer producer to resource endpoint. - */ - public Enricher(Producer producer) { - this(defaultAggregationStrategy(), producer, false); + public Enricher(Expression expression) { + this.expression = expression; } - /** - * Creates a new {@link Enricher}. - * - * @param aggregationStrategy aggregation strategy to aggregate input data and additional data. - * @param producer producer to resource endpoint. - * @param shareUnitOfWork whether to share unit of work - */ - public Enricher(AggregationStrategy aggregationStrategy, Producer producer, boolean shareUnitOfWork) { - this.aggregationStrategy = aggregationStrategy; - this.producer = producer; - this.shareUnitOfWork = shareUnitOfWork; + public CamelContext getCamelContext() { + return camelContext; + } + + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; } public String getId() { @@ -95,11 +87,6 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint this.id = id; } - /** - * Sets the aggregation strategy for this enricher. - * - * @param aggregationStrategy the aggregationStrategy to set - */ public void setAggregationStrategy(AggregationStrategy aggregationStrategy) { this.aggregationStrategy = aggregationStrategy; } @@ -112,23 +99,24 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint return aggregateOnException; } - /** - * Whether to call {@link org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange, org.apache.camel.Exchange)} if - * an exception was thrown. - */ public void setAggregateOnException(boolean aggregateOnException) { this.aggregateOnException = aggregateOnException; } - /** - * Sets the default aggregation strategy for this enricher. - */ - public void setDefaultAggregationStrategy() { - this.aggregationStrategy = defaultAggregationStrategy(); + public boolean isShareUnitOfWork() { + return shareUnitOfWork; + } + + public void setShareUnitOfWork(boolean shareUnitOfWork) { + this.shareUnitOfWork = shareUnitOfWork; } - public Endpoint getEndpoint() { - return producer.getEndpoint(); + public int getCacheSize() { + return cacheSize; + } + + public void setCacheSize(int cacheSize) { + this.cacheSize = cacheSize; } public void process(Exchange exchange) throws Exception { @@ -148,6 +136,22 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint * @param exchange input data. */ public boolean process(final Exchange exchange, final AsyncCallback callback) { + // which producer to use + final Producer producer; + final Endpoint endpoint; + + // use dynamic endpoint so calculate the endpoint to use + try { + Object recipient = expression.evaluate(exchange, Object.class); + endpoint = resolveEndpoint(exchange, recipient); + // acquire the consumer from the cache + producer = producerCache.acquireProducer(endpoint); + } catch (Throwable e) { + exchange.setException(e); + callback.done(true); + return true; + } + final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut); final Endpoint destination = producer.getEndpoint(); @@ -192,6 +196,13 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint // set property with the uri of the endpoint enriched so we can use that for tracing etc exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); + // return the producer back to the cache + try { + producerCache.releaseProducer(endpoint, producer); + } catch (Exception e) { + // ignore + } + callback.done(false); } }); @@ -236,10 +247,25 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint // set property with the uri of the endpoint enriched so we can use that for tracing etc exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); + // return the producer back to the cache + try { + producerCache.releaseProducer(endpoint, producer); + } catch (Exception e) { + // ignore + } + callback.done(true); return true; } + protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) { + // trim strings as end users might have added spaces between separators + if (recipient instanceof String) { + recipient = ((String)recipient).trim(); + } + return ExchangeHelper.resolveEndpoint(exchange, recipient); + } + /** * Creates a new {@link DefaultExchange} instance from the given * <code>exchange</code>. The resulting exchange's pattern is defined by @@ -273,21 +299,34 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint return new CopyAggregationStrategy(); } - public boolean isShareUnitOfWork() { - return shareUnitOfWork; - } - @Override public String toString() { - return "Enrich[" + producer.getEndpoint() + "]"; + return "Enrich[" + expression + "]"; } protected void doStart() throws Exception { - ServiceHelper.startServices(aggregationStrategy, producer); + if (aggregationStrategy == null) { + aggregationStrategy = defaultAggregationStrategy(); + } + + if (producerCache == null) { + if (cacheSize < 0) { + producerCache = new EmptyProducerCache(this, camelContext); + LOG.debug("Enricher {} is not using ProducerCache", this); + } else if (cacheSize == 0) { + producerCache = new ProducerCache(this, camelContext); + LOG.debug("Enricher {} using ProducerCache with default cache size", this); + } else { + producerCache = new ProducerCache(this, camelContext, cacheSize); + LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize); + } + } + + ServiceHelper.startServices(producerCache, aggregationStrategy); } protected void doStop() throws Exception { - ServiceHelper.stopServices(producer, aggregationStrategy); + ServiceHelper.stopServices(aggregationStrategy, producerCache); } private static class CopyAggregationStrategy implements AggregationStrategy { http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java new file mode 100644 index 0000000..12d4bd7 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class EnrichExpressionNoCacheTest extends ContextTestSupport { + + public void testEnrichExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World"); + + template.sendBodyAndHeader("direct:start", null, "source", "direct:foo"); + template.sendBodyAndHeader("direct:start", null, "source", "direct:bar"); + template.sendBodyAndHeader("direct:start", null, "source", "direct:foo"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .enrich().header("source").cacheSize(-1) + .to("mock:result"); + + from("direct:foo").transform().constant("Hello World"); + + from("direct:bar").transform().constant("Bye World"); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java new file mode 100644 index 0000000..0248884 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.enricher; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.builder.RouteBuilder; + +public class EnrichExpressionTest extends ContextTestSupport { + + public void testEnrichExpression() throws Exception { + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World"); + + template.sendBodyAndHeader("direct:start", null, "source", "direct:foo"); + template.sendBodyAndHeader("direct:start", null, "source", "direct:bar"); + template.sendBodyAndHeader("direct:start", null, "source", "direct:foo"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .enrich().header("source") + .to("mock:result"); + + from("direct:foo").transform().constant("Hello World"); + + from("direct:bar").transform().constant("Bye World"); + } + }; + } +}
