Author: janstey
Date: Thu Feb 12 13:34:11 2009
New Revision: 743733
URL: http://svn.apache.org/viewvc?rev=743733&view=rev
Log:
CAMEL-209 - Inject producer into enricher processor to improve performance.
Thanks to Martin Krasser for the patch!
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java?rev=743733&r1=743732&r2=743733&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnricherType.java
Thu Feb 12 13:34:11 2009
@@ -22,6 +22,7 @@
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
+import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.processor.Enricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -58,7 +59,8 @@
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
- Enricher enricher = new Enricher(null, resourceUri);
+ Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
+ Enricher enricher = new Enricher(null, endpoint.createProducer());
if (aggregationStrategyRef != null) {
aggregationStrategy = routeContext.lookup(aggregationStrategyRef,
AggregationStrategy.class);
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=743733&r1=743732&r2=743733&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Thu Feb 12 13:34:11 2009
@@ -19,47 +19,51 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
+import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
* A content enricher that enriches input data by first obtaining additional
- * data from a <i>resource</i> identified by an <code>resourceUri</code> and
- * second by aggregating input data and additional data. Aggregation of input
- * data and additional data is delegated to an {@link AggregationStrategy}
+ * data from a <i>resource</i> represented by an endpoint <code>producer</code>
+ * and second by aggregating input data and additional data. Aggregation of
+ * input data and additional data is delegated to an {@link AggregationStrategy}
* object.
*/
public class Enricher extends ServiceSupport implements Processor {
- private ProducerTemplate producer;
-
- private String resourceUri;
private AggregationStrategy aggregationStrategy;
+ private Producer producer;
+
/**
* Creates a new {@link Enricher}. The default aggregation strategy is to
* copy the additional data obtained from the enricher's resource over the
* input data. When using the copy aggregation strategy the enricher
* degenerates to a normal transformer.
- *
- * @param resourceUri URI of resource endpoint for obtaining additional data.
+ *
+ * @param producer
+ * producer to resource endpoint.
*/
- public Enricher(String resourceUri) {
- this(defaultAggregationStrategy(), resourceUri);
+ public Enricher(Producer producer) {
+ this(defaultAggregationStrategy(), producer);
}
/**
* Creates a new {@link Enricher}.
- *
- * @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
- * @param resourceUri URI of resource endpoint for obtaining additional data.
+ *
+ * @param aggregationStrategy
+ * aggregation strategy to aggregate input data and additional
+ * data.
+ * @param producer
+ * producer to resource endpoint.
*/
- public Enricher(AggregationStrategy aggregationStrategy, String resourceUri) {
+ public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
this.aggregationStrategy = aggregationStrategy;
- this.resourceUri = resourceUri;
+ this.producer = producer;
}
/**
@@ -80,21 +84,20 @@
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
- * additional data from an endpoint identified by an
- * <code>resourceUri</code> and second by aggregating input data and
- * additional data. Aggregation of input data and additional data is
- * delegated to an {@link AggregationStrategy} object set at construction
- * time. If the message exchange with the resource endpoint fails then no
- * aggregation will be done and the failed exchange content is copied over
- * to the original message exchange.
- *
- * @param exchange input data.
+ * additional data from an endpoint represented by an endpoint
+ * <code>producer</code> and second by aggregating input data and additional
+ * data. Aggregation of input data and additional data is delegated to an
+ * {@link AggregationStrategy} object set at construction time. If the
+ * message exchange with the resource endpoint fails then no aggregation
+ * will be done and the failed exchange content is copied over to the
+ * original message exchange.
+ *
+ * @param exchange
+ * input data.
*/
public void process(Exchange exchange) throws Exception {
- // create in-out exchange to obtain additional data from resource
Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
- // send created exchange to resource endpoint
- resourceExchange = getProducerTemplate(exchange).send(resourceUri, resourceExchange);
+ producer.process(resourceExchange);
if (resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving pattern)
@@ -134,22 +137,12 @@
return new CopyAggregationStrategy();
}
- private synchronized ProducerTemplate getProducerTemplate(Exchange exchange) throws Exception {
- if (producer == null) {
- producer = exchange.getContext().createProducerTemplate();
- producer.start();
- }
- return producer;
- }
-
protected void doStart() throws Exception {
+ producer.start();
}
protected void doStop() throws Exception {
- if (producer != null) {
- producer.stop();
- producer = null;
- }
+ producer.stop();
}
private static class CopyAggregationStrategy implements AggregationStrategy {