CAMEL-4596: enrich supports dynamic uris.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5fdf8bc5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5fdf8bc5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5fdf8bc5

Branch: refs/heads/master
Commit: 5fdf8bc5179326fec39f21508f6727db6f89e515
Parents: 1b4af69
Author: Claus Ibsen <[email protected]>
Authored: Mon Jul 13 11:09:30 2015 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Mon Jul 13 11:09:30 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/model/EnrichDefinition.java    | 169 +++++++++----------
 .../camel/model/PollEnrichDefinition.java       |   2 -
 .../apache/camel/model/ProcessorDefinition.java |  93 +++++-----
 .../org/apache/camel/processor/Enricher.java    | 139 +++++++++------
 .../enricher/EnrichExpressionNoCacheTest.java   |  49 ++++++
 .../enricher/EnrichExpressionTest.java          |  49 ++++++
 6 files changed, 317 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
index 6f0e358..902524e 100644
--- a/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
@@ -23,14 +23,13 @@ import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Endpoint;
+import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.ObjectHelper;
 
 /**
  * Enriches a message with data from a secondary resource
@@ -40,13 +39,7 @@ import org.apache.camel.util.ObjectHelper;
 @Metadata(label = "eip,transformation")
 @XmlRootElement(name = "enrich")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class EnrichDefinition extends NoOutputDefinition<EnrichDefinition> 
implements EndpointRequiredDefinition {
-    @XmlAttribute(name = "uri")
-    private String resourceUri;
-    // TODO: For Camel 3.0 we should remove this ref attribute as you can do 
that in the uri, by prefixing with ref:
-    @XmlAttribute(name = "ref")
-    @Deprecated
-    private String resourceRef;
+public class EnrichDefinition extends NoOutputExpressionNode {
     @XmlAttribute(name = "strategyRef")
     private String aggregationStrategyRef;
     @XmlAttribute(name = "strategyMethodName")
@@ -59,67 +52,43 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
     private AggregationStrategy aggregationStrategy;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Integer cacheSize;
 
     public EnrichDefinition() {
-        this(null, null);
+        this(null);
     }
 
-    public EnrichDefinition(String resourceUri) {
-        this(null, resourceUri);
-    }
-    
-    public EnrichDefinition(AggregationStrategy aggregationStrategy, String 
resourceUri) {
+    public EnrichDefinition(AggregationStrategy aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
-        this.resourceUri = resourceUri;
     }
     
     @Override
     public String toString() {
-        return "Enrich[" + description() + " " + aggregationStrategy + "]";
-    }
-    
-    protected String description() {
-        return FromDefinition.description(resourceUri, resourceRef, (Endpoint) 
null);
+        return "Enrich[" + getExpression() + "]";
     }
     
     @Override
     public String getLabel() {
-        return "enrich[" + description() + "]";
-    }
-
-    @Override
-    public String getEndpointUri() {
-        if (resourceUri != null) {
-            return resourceUri;
-        } else {
-            return null;
-        }
+        return "enrich[" + getExpression() + "]";
     }
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
-        if (ObjectHelper.isEmpty(resourceUri) && 
ObjectHelper.isEmpty(resourceRef)) {
-            throw new IllegalArgumentException("Either uri or ref must be 
provided for resource endpoint");
-        }
 
         // lookup endpoint
-        Endpoint endpoint;
-        if (resourceUri != null) {
-            endpoint = routeContext.resolveEndpoint(resourceUri);
-        } else {
-            endpoint = routeContext.resolveEndpoint(null, resourceRef);
-        }
+
+        Expression exp = getExpression().createExpression(routeContext);
         boolean isShareUnitOfWork = getShareUnitOfWork() != null && 
getShareUnitOfWork();
 
-        Enricher enricher = new Enricher(null, endpoint.createProducer(), 
isShareUnitOfWork);
+        Enricher enricher = new Enricher(exp);
+        enricher.setShareUnitOfWork(isShareUnitOfWork);
         AggregationStrategy strategy = createAggregationStrategy(routeContext);
-        if (strategy == null) {
-            enricher.setDefaultAggregationStrategy();
-        } else {
+        if (strategy != null) {
             enricher.setAggregationStrategy(strategy);
         }
-        if (getAggregateOnException() != null) {
-            enricher.setAggregateOnException(getAggregateOnException());
+        if (aggregateOnException != null) {
+            enricher.setAggregateOnException(aggregateOnException);
         }
         return enricher;
     }
@@ -149,39 +118,85 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
         return strategy;
     }
 
-    public String getResourceUri() {
-        return resourceUri;
+    // Fluent API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the AggregationStrategy to be used to merge the reply from the 
external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as 
outgoing message.
+     */
+    public EnrichDefinition aggregationStrategy(AggregationStrategy 
aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
+    }
+
+    /**
+     * Refers to an AggregationStrategy to be used to merge the reply from the 
external service, into a single outgoing message.
+     * By default Camel will use the reply from the external service as 
outgoing message.
+     */
+    public EnrichDefinition aggregationStrategyRef(String 
aggregationStrategyRef) {
+        setAggregationStrategyRef(aggregationStrategyRef);
+        return this;
     }
 
     /**
-     * The endpoint uri for the external service to enrich from. You must use 
either uri or ref.
+     * This option can be used to explicit declare the method name to use, 
when using POJOs as the AggregationStrategy.
      */
-    public void setResourceUri(String resourceUri) {
-        this.resourceUri = resourceUri;
+    public EnrichDefinition aggregationStrategyMethodName(String 
aggregationStrategyMethodName) {
+        setAggregationStrategyMethodName(aggregationStrategyMethodName);
+        return this;
     }
 
-    public String getResourceRef() {
-        return resourceRef;
+    /**
+     * If this option is false then the aggregate method is not used if there 
was no data to enrich.
+     * If this option is true then null values is used as the oldExchange 
(when no data to enrich),
+     * when using POJOs as the AggregationStrategy.
+     */
+    public EnrichDefinition aggregationStrategyMethodAllowNull(boolean 
aggregationStrategyMethodAllowNull) {
+        
setAggregationStrategyMethodAllowNull(aggregationStrategyMethodAllowNull);
+        return this;
     }
 
     /**
-     * Refers to the endpoint for the external service to enrich from. You 
must use either uri or ref.
+     * If this option is false then the aggregate method is not used if there 
was an exception thrown while trying
+     * to retrieve the data to enrich from the resource. Setting this option 
to true allows end users to control what
+     * to do if there was an exception in the aggregate method. For example to 
suppress the exception
+     * or set a custom message body etc.
+     */
+    public EnrichDefinition aggregateOnException(boolean aggregateOnException) 
{
+        setAggregateOnException(aggregateOnException);
+        return this;
+    }
+
+    /**
+     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and 
the resource exchange.
+     * Enrich will by default not share unit of work between the parent 
exchange and the resource exchange.
+     * This means the resource exchange has its own individual unit of work.
+     */
+    public EnrichDefinition shareUnitOfWork() {
+        setShareUnitOfWork(true);
+        return this;
+    }
+
+    /**
+     * Sets the maximum size used by the {@link 
org.apache.camel.impl.ProducerCache} which is used
+     * to cache and reuse producers when using this enrich, when uris are 
reused.
      *
-     * @deprecated use uri with ref:uri instead
+     * @param cacheSize  the cache size, use <tt>0</tt> for default cache 
size, or <tt>-1</tt> to turn cache off.
+     * @return the builder
      */
-    @Deprecated
-    public void setResourceRef(String resourceRef) {
-        this.resourceRef = resourceRef;
+    public EnrichDefinition cacheSize(int cacheSize) {
+        setCacheSize(cacheSize);
+        return this;
     }
 
+    // Properties
+    // 
-------------------------------------------------------------------------
+
     public String getAggregationStrategyRef() {
         return aggregationStrategyRef;
     }
 
-    /**
-     * Refers to an AggregationStrategy to be used to merge the reply from the 
external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as 
outgoing message.
-     */
     public void setAggregationStrategyRef(String aggregationStrategyRef) {
         this.aggregationStrategyRef = aggregationStrategyRef;
     }
@@ -190,9 +205,6 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
         return aggregationStrategyMethodName;
     }
 
-    /**
-     * This option can be used to explicit declare the method name to use, 
when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodName(String 
aggregationStrategyMethodName) {
         this.aggregationStrategyMethodName = aggregationStrategyMethodName;
     }
@@ -201,11 +213,6 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
         return aggregationStrategyMethodAllowNull;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there 
was no data to enrich.
-     * If this option is true then null values is used as the oldExchange 
(when no data to enrich),
-     * when using POJOs as the AggregationStrategy.
-     */
     public void setAggregationStrategyMethodAllowNull(Boolean 
aggregationStrategyMethodAllowNull) {
         this.aggregationStrategyMethodAllowNull = 
aggregationStrategyMethodAllowNull;
     }
@@ -214,10 +221,6 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
         return aggregationStrategy;
     }
 
-    /**
-     * Sets the AggregationStrategy to be used to merge the reply from the 
external service, into a single outgoing message.
-     * By default Camel will use the reply from the external service as 
outgoing message.
-     */
     public void setAggregationStrategy(AggregationStrategy 
aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -226,12 +229,6 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
         return aggregateOnException;
     }
 
-    /**
-     * If this option is false then the aggregate method is not used if there 
was an exception thrown while trying
-     * to retrieve the data to enrich from the resource. Setting this option 
to true allows end users to control what
-     * to do if there was an exception in the aggregate method. For example to 
suppress the exception
-     * or set a custom message body etc.
-     */
     public void setAggregateOnException(Boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
@@ -240,13 +237,15 @@ public class EnrichDefinition extends 
NoOutputDefinition<EnrichDefinition> imple
         return shareUnitOfWork;
     }
 
-    /**
-     * Shares the {@link org.apache.camel.spi.UnitOfWork} with the parent and 
the resource exchange.
-     * Enrich will by default not share unit of work between the parent 
exchange and the resource exchange.
-     * This means the resource exchange has its own individual unit of work.
-     */
     public void setShareUnitOfWork(Boolean shareUnitOfWork) {
         this.shareUnitOfWork = shareUnitOfWork;
     }
 
+    public Integer getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(Integer cacheSize) {
+        this.cacheSize = cacheSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
index 18801d4..e7c84a6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
@@ -126,8 +126,6 @@ public class PollEnrichDefinition extends 
NoOutputExpressionNode {
     // Fluent API
     // 
-------------------------------------------------------------------------
 
-    // TODO: add cacheSize option
-
     /**
      * Timeout in millis when polling from the external service.
      * <p/>

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 589df86..920d85a 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -3136,16 +3136,29 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
     /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content 
Enricher EIP</a>
      * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>.
+     * <p/>
+     * The difference between this and {@link #pollEnrich(String)} is that 
this uses a producer
+     * to obtain the additional data, where as pollEnrich uses a polling 
consumer.
+     *
+     * @param resourceUri           URI of resource endpoint for obtaining 
additional data.
+     * @return the builder
+     * @see org.apache.camel.processor.Enricher
+     */
+    public Type enrich(String resourceUri) {
+        return enrich(resourceUri, null);
+    }
+
+    /**
+     * The <a href="http://camel.apache.org/content-enricher.html">Content 
Enricher EIP</a>
+     * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>.
      * 
      * @param resourceUri           URI of resource endpoint for obtaining 
additional data.
      * @param aggregationStrategy   aggregation strategy to aggregate input 
data and additional data.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
     public Type enrich(String resourceUri, AggregationStrategy 
aggregationStrategy) {
-        addOutput(new EnrichDefinition(aggregationStrategy, resourceUri));
-        return (Type) this;
+        return enrich(resourceUri, aggregationStrategy, false);
     }
 
     /**
@@ -3159,12 +3172,8 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
     public Type enrich(String resourceUri, AggregationStrategy 
aggregationStrategy, boolean aggregateOnException) {
-        EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, 
resourceUri);
-        enrich.setAggregateOnException(aggregateOnException);
-        addOutput(enrich);
-        return (Type) this;
+        return enrich(resourceUri, aggregationStrategy, false, false);
     }
 
     /**
@@ -3181,10 +3190,12 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      */
     @SuppressWarnings("unchecked")
     public Type enrich(String resourceUri, AggregationStrategy 
aggregationStrategy, boolean aggregateOnException, boolean shareUnitOfWork) {
-        EnrichDefinition enrich = new EnrichDefinition(aggregationStrategy, 
resourceUri);
-        enrich.setAggregateOnException(aggregateOnException);
-        enrich.setShareUnitOfWork(shareUnitOfWork);
-        addOutput(enrich);
+        EnrichDefinition answer = new EnrichDefinition();
+        answer.setExpression(new ConstantExpression(resourceUri));
+        answer.setAggregationStrategy(aggregationStrategy);
+        answer.setAggregateOnException(aggregateOnException);
+        answer.setShareUnitOfWork(shareUnitOfWork);
+        addOutput(answer);
         return (Type) this;
     }
 
@@ -3193,16 +3204,15 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * enriches an exchange with additional data obtained from a 
<code>resourceUri</code>.
      * <p/>
      * The difference between this and {@link #pollEnrich(String)} is that 
this uses a producer
-     * to obatin the additional data, where as pollEnrich uses a polling 
consumer.
+     * to obtain the additional data, where as pollEnrich uses a polling 
consumer.
      *
-     * @param resourceUri           URI of resource endpoint for obtaining 
additional data.
+     * @param resourceRef            Reference of resource endpoint for 
obtaining additional data.
+     * @param aggregationStrategyRef Reference of aggregation strategy to 
aggregate input data and additional data.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
-    public Type enrich(String resourceUri) {
-        addOutput(new EnrichDefinition(resourceUri));
-        return (Type) this;
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
+        return enrichRef(resourceRef, aggregationStrategyRef, false);
     }
 
     /**
@@ -3214,16 +3224,13 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      *
      * @param resourceRef            Reference of resource endpoint for 
obtaining additional data.
      * @param aggregationStrategyRef Reference of aggregation strategy to 
aggregate input data and additional data.
+     * @param aggregateOnException   whether to call {@link 
org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange,
 org.apache.camel.Exchange)} if
+     *                               an exception was thrown.
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
-    public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
-        EnrichDefinition enrich = new EnrichDefinition();
-        enrich.setResourceRef(resourceRef);
-        enrich.setAggregationStrategyRef(aggregationStrategyRef);
-        addOutput(enrich);
-        return (Type) this;
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef, 
boolean aggregateOnException) {
+        return enrichRef(resourceRef, aggregationStrategyRef, false, false);
     }
 
     /**
@@ -3237,16 +3244,18 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * @param aggregationStrategyRef Reference of aggregation strategy to 
aggregate input data and additional data.
      * @param aggregateOnException   whether to call {@link 
org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange,
 org.apache.camel.Exchange)} if
      *                               an exception was thrown.
+     * @param shareUnitOfWork        whether to share unit of work
      * @return the builder
      * @see org.apache.camel.processor.Enricher
      */
     @SuppressWarnings("unchecked")
-    public Type enrichRef(String resourceRef, String aggregationStrategyRef, 
boolean aggregateOnException) {
-        EnrichDefinition enrich = new EnrichDefinition();
-        enrich.setResourceRef(resourceRef);
-        enrich.setAggregationStrategyRef(aggregationStrategyRef);
-        enrich.setAggregateOnException(aggregateOnException);
-        addOutput(enrich);
+    public Type enrichRef(String resourceRef, String aggregationStrategyRef, 
boolean aggregateOnException, boolean shareUnitOfWork) {
+        EnrichDefinition answer = new EnrichDefinition();
+        answer.setExpression(new SimpleExpression("ref:" + resourceRef));
+        answer.setAggregationStrategyRef(aggregationStrategyRef);
+        answer.setAggregateOnException(aggregateOnException);
+        answer.setShareUnitOfWork(shareUnitOfWork);
+        addOutput(answer);
         return (Type) this;
     }
 
@@ -3257,23 +3266,13 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
      * The difference between this and {@link #pollEnrich(String)} is that 
this uses a producer
      * to obtain the additional data, where as pollEnrich uses a polling 
consumer.
      *
-     * @param resourceRef            Reference of resource endpoint for 
obtaining additional data.
-     * @param aggregationStrategyRef Reference of aggregation strategy to 
aggregate input data and additional data.
-     * @param aggregateOnException   whether to call {@link 
org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange,
 org.apache.camel.Exchange)} if
-     *                               an exception was thrown.
-     * @param shareUnitOfWork        whether to share unit of work
-     * @return the builder
-     * @see org.apache.camel.processor.Enricher
+     * @return an expression builder clause to set the expression to use for 
computing the endpoint to use
+     * @see org.apache.camel.processor.Enricher
      */
-    @SuppressWarnings("unchecked")
-    public Type enrichRef(String resourceRef, String aggregationStrategyRef, 
boolean aggregateOnException, boolean shareUnitOfWork) {
-        EnrichDefinition enrich = new EnrichDefinition();
-        enrich.setResourceRef(resourceRef);
-        enrich.setAggregationStrategyRef(aggregationStrategyRef);
-        enrich.setAggregateOnException(aggregateOnException);
-        enrich.setShareUnitOfWork(shareUnitOfWork);
-        addOutput(enrich);
-        return (Type) this;
+    public ExpressionClause<EnrichDefinition> enrich() {
+        EnrichDefinition answer = new EnrichDefinition();
+        addOutput(answer);
+        return ExpressionClause.createAndSetExpression(answer);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
index b0532ba..cbdf104 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
@@ -18,18 +18,19 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
-import org.apache.camel.Processor;
+import org.apache.camel.Expression;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.EmptyProducerCache;
+import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.IdAware;
-import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.UnitOfWork;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
@@ -39,6 +40,7 @@ import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
 /**
@@ -53,38 +55,28 @@ import static 
org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
  *
  * @see PollEnricher
  */
-public class Enricher extends ServiceSupport implements AsyncProcessor, 
EndpointAware, IdAware {
+public class Enricher extends ServiceSupport implements AsyncProcessor, 
IdAware, CamelContextAware {
 
     private static final Logger LOG = LoggerFactory.getLogger(Enricher.class);
+    private CamelContext camelContext;
     private String id;
+    private ProducerCache producerCache;
+    private final Expression expression;
     private AggregationStrategy aggregationStrategy;
-    private Producer producer;
     private boolean aggregateOnException;
     private boolean shareUnitOfWork;
+    private int cacheSize;
 
-    /**
-     * Creates a new {@link Enricher}. The default aggregation strategy is to
-     * copy the additional data obtained from the enricher's resource over the
-     * input data. When using the copy aggregation strategy the enricher
-     * degenerates to a normal transformer.
-     * 
-     * @param producer producer to resource endpoint.
-     */
-    public Enricher(Producer producer) {
-        this(defaultAggregationStrategy(), producer, false);
+    public Enricher(Expression expression) {
+        this.expression = expression;
     }
 
-    /**
-     * Creates a new {@link Enricher}.
-     * 
-     * @param aggregationStrategy  aggregation strategy to aggregate input 
data and additional data.
-     * @param producer producer to resource endpoint.
-     * @param shareUnitOfWork whether to share unit of work
-     */
-    public Enricher(AggregationStrategy aggregationStrategy, Producer 
producer, boolean shareUnitOfWork) {
-        this.aggregationStrategy = aggregationStrategy;
-        this.producer = producer;
-        this.shareUnitOfWork = shareUnitOfWork;
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
     }
 
     public String getId() {
@@ -95,11 +87,6 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor, Endpoint
         this.id = id;
     }
 
-    /**
-     * Sets the aggregation strategy for this enricher.
-     *
-     * @param aggregationStrategy the aggregationStrategy to set
-     */
     public void setAggregationStrategy(AggregationStrategy 
aggregationStrategy) {
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -112,23 +99,24 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor, Endpoint
         return aggregateOnException;
     }
 
-    /**
-     * Whether to call {@link 
org.apache.camel.processor.aggregate.AggregationStrategy#aggregate(org.apache.camel.Exchange,
 org.apache.camel.Exchange)} if
-     * an exception was thrown.
-     */
     public void setAggregateOnException(boolean aggregateOnException) {
         this.aggregateOnException = aggregateOnException;
     }
 
-    /**
-     * Sets the default aggregation strategy for this enricher.
-     */
-    public void setDefaultAggregationStrategy() {
-        this.aggregationStrategy = defaultAggregationStrategy();
+    public boolean isShareUnitOfWork() {
+        return shareUnitOfWork;
+    }
+
+    public void setShareUnitOfWork(boolean shareUnitOfWork) {
+        this.shareUnitOfWork = shareUnitOfWork;
     }
 
-    public Endpoint getEndpoint() {
-        return producer.getEndpoint();
+    public int getCacheSize() {
+        return cacheSize;
+    }
+
+    public void setCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
     }
 
     public void process(Exchange exchange) throws Exception {
@@ -148,6 +136,22 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor, Endpoint
      * @param exchange input data.
      */
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        // which producer to use
+        final Producer producer;
+        final Endpoint endpoint;
+
+        // use dynamic endpoint so calculate the endpoint to use
+        try {
+            Object recipient = expression.evaluate(exchange, Object.class);
+            endpoint = resolveEndpoint(exchange, recipient);
+            // acquire the producer from the cache
+            producer = producerCache.acquireProducer(endpoint);
+        } catch (Throwable e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
         final Exchange resourceExchange = createResourceExchange(exchange, 
ExchangePattern.InOut);
         final Endpoint destination = producer.getEndpoint();
 
@@ -192,6 +196,13 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor, Endpoint
                 // set property with the uri of the endpoint enriched so we 
can use that for tracing etc
                 exchange.setProperty(Exchange.TO_ENDPOINT, 
producer.getEndpoint().getEndpointUri());
 
+                // return the producer back to the cache
+                try {
+                    producerCache.releaseProducer(endpoint, producer);
+                } catch (Exception e) {
+                    // ignore
+                }
+
                 callback.done(false);
             }
         });
@@ -236,10 +247,25 @@ public class Enricher extends ServiceSupport implements 
AsyncProcessor, Endpoint
         // set property with the uri of the endpoint enriched so we can use 
that for tracing etc
         exchange.setProperty(Exchange.TO_ENDPOINT, 
producer.getEndpoint().getEndpointUri());
 
+        // return the producer back to the cache
+        try {
+            producerCache.releaseProducer(endpoint, producer);
+        } catch (Exception e) {
+            // ignore
+        }
+
         callback.done(true);
         return true;
     }
 
+    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String) {
+            recipient = ((String)recipient).trim();
+        }
+        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+    }
+
     /**
      * Creates a new {@link DefaultExchange} instance from the given
      * <code>exchange</code>. The resulting exchange's pattern is defined by
@@ -273,21 +299,34 @@ public class Enricher extends ServiceSupport implements AsyncProcessor, Endpoint
         return new CopyAggregationStrategy();
     }
 
-    public boolean isShareUnitOfWork() {
-        return shareUnitOfWork;
-    }
-
     @Override
     public String toString() {
-        return "Enrich[" + producer.getEndpoint() + "]";
+        return "Enrich[" + expression + "]";
     }
 
     protected void doStart() throws Exception {
-        ServiceHelper.startServices(aggregationStrategy, producer);
+        if (aggregationStrategy == null) {
+            aggregationStrategy = defaultAggregationStrategy();
+        }
+
+        if (producerCache == null) {
+            if (cacheSize < 0) {
+                producerCache = new EmptyProducerCache(this, camelContext);
+                LOG.debug("Enricher {} is not using ProducerCache", this);
+            } else if (cacheSize == 0) {
+                producerCache = new ProducerCache(this, camelContext);
+                LOG.debug("Enricher {} using ProducerCache with default cache size", this);
+            } else {
+                producerCache = new ProducerCache(this, camelContext, cacheSize);
+                LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize);
+            }
+        }
+
+        ServiceHelper.startServices(producerCache, aggregationStrategy);
     }
 
     protected void doStop() throws Exception {
-        ServiceHelper.stopServices(producer, aggregationStrategy);
+        ServiceHelper.stopServices(aggregationStrategy, producerCache);
     }
 
     private static class CopyAggregationStrategy implements AggregationStrategy {

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
new file mode 100644
index 0000000..12d4bd7
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionNoCacheTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class EnrichExpressionNoCacheTest extends ContextTestSupport {
+
+    public void testEnrichExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:bar");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .enrich().header("source").cacheSize(-1)
+                    .to("mock:result");
+
+                from("direct:foo").transform().constant("Hello World");
+
+                from("direct:bar").transform().constant("Bye World");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/5fdf8bc5/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
new file mode 100644
index 0000000..0248884
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/enricher/EnrichExpressionTest.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.enricher;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+public class EnrichExpressionTest extends ContextTestSupport {
+
+    public void testEnrichExpression() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World", "Hello World");
+
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:bar");
+        template.sendBodyAndHeader("direct:start", null, "source", "direct:foo");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .enrich().header("source")
+                    .to("mock:result");
+
+                from("direct:foo").transform().constant("Hello World");
+
+                from("direct:bar").transform().constant("Bye World");
+            }
+        };
+    }
+}

Reply via email to