Author: janstey
Date: Thu Feb 12 13:34:11 2009
New Revision: 743733

URL: http://svn.apache.org/viewvc?rev=743733&view=rev
Log:
CAMEL-209 - Inject producer into enricher processor to improve performance.

Thanks to Martin Krasser for the patch!


Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java?rev=743733&r1=743732&r2=743733&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java 
Thu Feb 12 13:34:11 2009
@@ -22,6 +22,7 @@
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.Enricher;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -58,7 +59,8 @@
     
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
-        Enricher enricher = new Enricher(null, resourceUri);
+        Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
+        Enricher enricher = new Enricher(null, endpoint.createProducer());
         if (aggregationStrategyRef != null) {
             aggregationStrategy = routeContext.lookup(aggregationStrategyRef, 
AggregationStrategy.class);
         }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=743733&r1=743732&r2=743733&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
(original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java 
Thu Feb 12 13:34:11 2009
@@ -19,47 +19,51 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
+import org.apache.camel.Producer;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+
 import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
 
 /**
  * A content enricher that enriches input data by first obtaining additional
- * data from a <i>resource</i> identified by an <code>resourceUri</code> and
- * second by aggregating input data and additional data. Aggregation of input
- * data and additional data is delegated to an {@link AggregationStrategy}
+ * data from a <i>resource</i> represented by an endpoint <code>producer</code>
+ * and second by aggregating input data and additional data. Aggregation of
+ * input data and additional data is delegated to an {@link AggregationStrategy}
  * object.
  */
 public class Enricher extends ServiceSupport implements Processor {
 
-    private ProducerTemplate producer;
-
-    private String resourceUri;
     private AggregationStrategy aggregationStrategy;
 
+    private Producer producer;
+
     /**
      * Creates a new {@link Enricher}. The default aggregation strategy is to
      * copy the additional data obtained from the enricher's resource over the
      * input data. When using the copy aggregation strategy the enricher
      * degenerates to a normal transformer.
-     *
-     * @param resourceUri URI of resource endpoint for obtaining additional 
data.
+     * 
+     * @param producer
+     *            producer to resource endpoint.
      */
-    public Enricher(String resourceUri) {
-        this(defaultAggregationStrategy(), resourceUri);
+    public Enricher(Producer producer) {
+        this(defaultAggregationStrategy(), producer);
     }
 
     /**
      * Creates a new {@link Enricher}.
-     *
-     * @param aggregationStrategy aggregation strategy to aggregate input data 
and additional data.
-     * @param resourceUri         URI of resource endpoint for obtaining 
additional data.
+     * 
+     * @param aggregationStrategy
+     *            aggregation strategy to aggregate input data and additional
+     *            data.
+     * @param producer
+     *            producer to resource endpoint.
      */
-    public Enricher(AggregationStrategy aggregationStrategy, String 
resourceUri) {
+    public Enricher(AggregationStrategy aggregationStrategy, Producer 
producer) {
         this.aggregationStrategy = aggregationStrategy;
-        this.resourceUri = resourceUri;
+        this.producer = producer;
     }
 
     /**
@@ -80,21 +84,20 @@
 
     /**
      * Enriches the input data (<code>exchange</code>) by first obtaining
-     * additional data from an endpoint identified by an
-     * <code>resourceUri</code> and second by aggregating input data and
-     * additional data. Aggregation of input data and additional data is
-     * delegated to an {@link AggregationStrategy} object set at construction
-     * time. If the message exchange with the resource endpoint fails then no
-     * aggregation will be done and the failed exchange content is copied over
-     * to the original message exchange.
-     *
-     * @param exchange input data.
+     * additional data from an endpoint represented by an endpoint
+     * <code>producer</code> and second by aggregating input data and 
additional
+     * data. Aggregation of input data and additional data is delegated to an
+     * {@link AggregationStrategy} object set at construction time. If the
+     * message exchange with the resource endpoint fails then no aggregation
+     * will be done and the failed exchange content is copied over to the
+     * original message exchange.
+     * 
+     * @param exchange
+     *            input data.
      */
     public void process(Exchange exchange) throws Exception {
-        // create in-out exchange to obtain additional data from resource
         Exchange resourceExchange = createResourceExchange(exchange, 
ExchangePattern.InOut);
-        // send created exchange to resource endpoint
-        resourceExchange = getProducerTemplate(exchange).send(resourceUri, 
resourceExchange);
+        producer.process(resourceExchange);
 
         if (resourceExchange.isFailed()) {
             // copy resource exchange onto original exchange (preserving 
pattern)
@@ -134,22 +137,12 @@
         return new CopyAggregationStrategy();
     }
 
-    private synchronized ProducerTemplate getProducerTemplate(Exchange 
exchange) throws Exception {
-        if (producer == null) {
-            producer = exchange.getContext().createProducerTemplate();
-            producer.start();
-        }
-        return producer;
-    }
-
     protected void doStart() throws Exception {
+        producer.start();
     }
 
     protected void doStop() throws Exception {
-        if (producer != null) {
-            producer.stop();
-            producer = null;
-        }
+        producer.stop();
     }
 
     private static class CopyAggregationStrategy implements 
AggregationStrategy {


Reply via email to