Author: davsclaus
Date: Mon Jun 29 11:43:38 2009
New Revision: 789294
URL: http://svn.apache.org/viewvc?rev=789294&view=rev
Log:
CAMEL-1694: added pollEnrich to DSL to enrich using a polling consumer. For
instance to poll a file or download a FTP file.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
- copied, changed from r789205,
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
- copied, changed from r789205,
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
- copied, changed from r789205,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java
- copied, changed from r789205,
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
- copied, changed from r789205,
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
Copied:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
(from r789205,
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/EnrichDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/PollEnrichDefinition.java
Mon Jun 29 11:43:38 2009
@@ -24,55 +24,62 @@
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
-import org.apache.camel.processor.Enricher;
+import org.apache.camel.processor.PollEnricher;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spi.RouteContext;
/**
- * Represents an XML <enrich/> element
+ * Represents an XML <pollEnrich/> element
*
- * @see Enricher
+ * @see org.apache.camel.processor.Enricher
*/
-...@xmlrootelement(name = "enrich")
+...@xmlrootelement(name = "pollEnrich")
@XmlAccessorType(XmlAccessType.FIELD)
-public class EnrichDefinition extends OutputDefinition<EnrichDefinition> {
+public class PollEnrichDefinition extends
OutputDefinition<PollEnrichDefinition> {
@XmlAttribute(name = "uri", required = true)
private String resourceUri;
-
- @XmlAttribute(name = "strategyRef", required = false)
+
+ @XmlAttribute(name = "timeout")
+ private Long timeout;
+
+ @XmlAttribute(name = "strategyRef")
private String aggregationStrategyRef;
-
+
@XmlTransient
private AggregationStrategy aggregationStrategy;
-
- public EnrichDefinition() {
- this(null, null);
- }
-
- public EnrichDefinition(String resourceUri) {
- this(null, resourceUri);
+
+ public PollEnrichDefinition() {
+ this(null, null, 0);
}
-
- public EnrichDefinition(AggregationStrategy aggregationStrategy, String
resourceUri) {
+
+ public PollEnrichDefinition(AggregationStrategy aggregationStrategy,
String resourceUri, long timeout) {
this.aggregationStrategy = aggregationStrategy;
this.resourceUri = resourceUri;
+ this.timeout = timeout;
}
-
+
@Override
public String toString() {
- return "Enrich[" + resourceUri + " " + aggregationStrategy + "]";
+ return "PollEnrich[" + resourceUri + " " + aggregationStrategy + "]";
}
@Override
public String getShortName() {
- return "enrich";
+ return "pollEnrich";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
Endpoint endpoint = routeContext.resolveEndpoint(resourceUri);
- Enricher enricher = new Enricher(null, endpoint.createProducer());
+
+ PollEnricher enricher;
+ if (timeout != null) {
+ enricher = new PollEnricher(null,
endpoint.createPollingConsumer(), timeout);
+ } else {
+ enricher = new PollEnricher(null,
endpoint.createPollingConsumer(), 0);
+ }
+
if (aggregationStrategyRef != null) {
aggregationStrategy = routeContext.lookup(aggregationStrategyRef,
AggregationStrategy.class);
}
@@ -81,7 +88,8 @@
} else {
enricher.setAggregationStrategy(aggregationStrategy);
}
+
return enricher;
}
-
-}
+
+}
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
Mon Jun 29 11:43:38 2009
@@ -1806,8 +1806,8 @@
}
/**
- * Enriches an exchange with additional data obtained from a
- * <code>resourceUri</code>.
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>.
*
* @param resourceUri URI of resource endpoint for obtaining
additional data.
* @param aggregationStrategy aggregation strategy to aggregate input
data and additional data.
@@ -1821,8 +1821,11 @@
}
/**
- * Enriches an exchange with additional data obtained from a
- * <code>resourceUri</code>.
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>.
+ * <p/>
+ * The difference between this and {...@link #pollEnrich(String)} is that
this uses a producer
+ * to obatin the additional data, where as pollEnrich uses a polling
consumer.
*
* @param resourceUri URI of resource endpoint for obtaining
additional data.
* @return the builder
@@ -1835,6 +1838,96 @@
}
/**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>
+ * using a {...@link org.apache.camel.PollingConsumer} to poll the
endpoint.
+ * <p/>
+ * The difference between this and {...@link #enrich(String)} is that this
uses a consumer
+ * to obatin the additional data, where as enrich uses a producer.
+ * <p/>
+ * This method will block until data is avialable, use the method with
timeout if you do not
+ * want to risk waiting a long time before data is available from the
resourceUri.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining
additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri) {
+ addOutput(new PollEnrichDefinition(null, resourceUri, 0));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>
+ * using a {...@link org.apache.camel.PollingConsumer} to poll the
endpoint.
+ * <p/>
+ * The difference between this and {...@link #enrich(String)} is that this
uses a consumer
+ * to obatin the additional data, where as enrich uses a producer.
+ * <p/>
+ * This method will block until data is avialable, use the method with
timeout if you do not
+ * want to risk waiting a long time before data is available from the
resourceUri.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining
additional data.
+ * @param aggregationStrategy aggregation strategy to aggregate input
data and additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, AggregationStrategy
aggregationStrategy) {
+ addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri,
0));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>
+ * using a {...@link org.apache.camel.PollingConsumer} to poll the
endpoint.
+ * <p/>
+ * The difference between this and {...@link #enrich(String)} is that this
uses a consumer
+ * to obatin the additional data, where as enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {...@link
org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then
we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining
additional data.
+ * @param timeout timeout in millis to wait at most for data
to be available.
+ * @param aggregationStrategy aggregation strategy to aggregate input
data and additional data.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, long timeout,
AggregationStrategy aggregationStrategy) {
+ addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri,
timeout));
+ return (Type) this;
+ }
+
+ /**
+ * The <a href="http://camel.apache.org/content-enricher.html">Content
Enricher EIP</a>
+ * enriches an exchange with additional data obtained from a
<code>resourceUri</code>
+ * using a {...@link org.apache.camel.PollingConsumer} to poll the
endpoint.
+ * <p/>
+ * The difference between this and {...@link #enrich(String)} is that this
uses a consumer
+ * to obatin the additional data, where as enrich uses a producer.
+ * <p/>
+ * The timeout controls which operation to use on {...@link
org.apache.camel.PollingConsumer}.
+ * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then
we use <tt>receiveNoWait</tt>
+ * otherwise we use <tt>receive(timeout)</tt>.
+ *
+ * @param resourceUri URI of resource endpoint for obtaining
additional data.
+ * @param timeout timeout in millis to wait at most for data
to be available.
+ * @return the builder
+ * @see org.apache.camel.processor.PollEnricher
+ */
+ @SuppressWarnings("unchecked")
+ public Type pollEnrich(String resourceUri, long timeout) {
+ addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
+ return (Type) this;
+ }
+
+ /**
* Adds a onComplection {...@link org.apache.camel.spi.Synchronization}
hook that invoke this route as
* a callback when the {...@link org.apache.camel.Exchange} has finished
being processed.
* The hook invoke callbacks for either onComplete or onFailure.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
Mon Jun 29 11:43:38 2009
@@ -35,6 +35,11 @@
* and second by aggregating input data and additional data. Aggregation of
* input data and additional data is delegated to an {...@link
AggregationStrategy}
* object.
+ * <p/>
+ * Uses a {...@link org.apache.camel.Producer} to obatin the additional data
as opposed to {...@link PollEnricher}
+ * that uses a {...@link org.apache.camel.PollingConsumer}.
+ *
+ * @see PollEnricher
*/
public class Enricher extends ServiceSupport implements Processor {
Copied:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
(from r789205,
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
Mon Jun 29 11:43:38 2009
@@ -18,55 +18,61 @@
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
+import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
-import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
* A content enricher that enriches input data by first obtaining additional
* data from a <i>resource</i> represented by an endpoint <code>producer</code>
* and second by aggregating input data and additional data. Aggregation of
- * input data and additional data is delegated to an {...@link
AggregationStrategy}
+ * input data and additional data is delegated to an {...@link
org.apache.camel.processor.aggregate.AggregationStrategy}
* object.
+ * <p/>
+ * Uses a {...@link org.apache.camel.PollingConsumer} to obatin the additional
data as opposed to {...@link Enricher}
+ * that uses a {...@link org.apache.camel.Producer}.
+ *
+ * @see Enricher
*/
-public class Enricher extends ServiceSupport implements Processor {
+public class PollEnricher extends ServiceSupport implements Processor {
- private static final transient Log LOG = LogFactory.getLog(Enricher.class);
+ private static final transient Log LOG =
LogFactory.getLog(PollEnricher.class);
private AggregationStrategy aggregationStrategy;
- private Producer producer;
+ private PollingConsumer consumer;
+ private long timeout;
/**
- * Creates a new {...@link Enricher}. The default aggregation strategy is
to
+ * Creates a new {...@link PollEnricher}. The default aggregation strategy
is to
* copy the additional data obtained from the enricher's resource over the
* input data. When using the copy aggregation strategy the enricher
* degenerates to a normal transformer.
- *
- * @param producer producer to resource endpoint.
+ *
+ * @param consumer consumer to resource endpoint.
*/
- public Enricher(Producer producer) {
- this(defaultAggregationStrategy(), producer);
+ public PollEnricher(PollingConsumer consumer) {
+ this(defaultAggregationStrategy(), consumer, 0);
}
/**
- * Creates a new {...@link Enricher}.
- *
+ * Creates a new {...@link PollEnricher}.
+ *
* @param aggregationStrategy aggregation strategy to aggregate input
data and additional data.
- * @param producer producer to resource endpoint.
+ * @param consumer consumer to resource endpoint.
*/
- public Enricher(AggregationStrategy aggregationStrategy, Producer
producer) {
+ public PollEnricher(AggregationStrategy aggregationStrategy,
PollingConsumer consumer, long timeout) {
this.aggregationStrategy = aggregationStrategy;
- this.producer = producer;
+ this.consumer = consumer;
+ this.timeout = timeout;
}
/**
- * Sets the aggregation strategy for this enricher.
+ * Sets the aggregation strategy for this poll enricher.
*
* @param aggregationStrategy the aggregationStrategy to set
*/
@@ -75,29 +81,55 @@
}
/**
- * Sets the default aggregation strategy for this enricher.
+ * Sets the default aggregation strategy for this poll enricher.
*/
public void setDefaultAggregationStrategy() {
this.aggregationStrategy = defaultAggregationStrategy();
}
/**
+ * Sets the timeout to use when polling.
+ * <p/>
+ * Use 0 or negative to not use timeout and block until data is available.
+ *
+ * @param timeout timeout in millis.
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
* <code>producer</code> and second by aggregating input data and
additional
* data. Aggregation of input data and additional data is delegated to an
- * {...@link AggregationStrategy} object set at construction time. If the
+ * {...@link org.apache.camel.processor.aggregate.AggregationStrategy}
object set at construction time. If the
* message exchange with the resource endpoint fails then no aggregation
* will be done and the failed exchange content is copied over to the
* original message exchange.
- *
+ *
* @param exchange input data.
*/
public void process(Exchange exchange) throws Exception {
- Exchange resourceExchange = createResourceExchange(exchange,
ExchangePattern.InOut);
- producer.process(resourceExchange);
+ Exchange resourceExchange;
+ if (timeout < 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer receive: " + consumer);
+ }
+ resourceExchange = consumer.receive();
+ } else if (timeout == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer receiveNoWait: " + consumer);
+ }
+ resourceExchange = consumer.receiveNoWait();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Consumer receive with timeout: " + timeout + " ms.
" + consumer);
+ }
+ resourceExchange = consumer.receive(timeout);
+ }
- if (resourceExchange.isFailed()) {
+ if (resourceExchange != null && resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving
pattern)
copyResultsPreservePattern(exchange, resourceExchange);
} else {
@@ -105,7 +137,10 @@
// aggregate original exchange and resource exchange
// but do not aggregate if the resource exchange was filtered
- Boolean filtered = resourceExchange.getProperty(Exchange.FILTERED,
Boolean.class);
+ Boolean filtered = null;
+ if (resourceExchange != null) {
+ filtered = resourceExchange.getProperty(Exchange.FILTERED,
Boolean.class);
+ }
if (filtered == null || !filtered) {
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
@@ -121,7 +156,7 @@
}
/**
- * Creates a new {...@link DefaultExchange} instance from the given
+ * Creates a new {...@link org.apache.camel.impl.DefaultExchange} instance
from the given
* <code>exchange</code>. The resulting exchange's pattern is defined by
* <code>pattern</code>.
*
@@ -148,15 +183,15 @@
@Override
public String toString() {
- return "Enrich[" + producer.getEndpoint().getEndpointUri() + "]";
+ return "PollEnrich[" + consumer + "]";
}
protected void doStart() throws Exception {
- producer.start();
+ consumer.start();
}
protected void doStop() throws Exception {
- producer.stop();
+ consumer.stop();
}
private static class CopyAggregationStrategy implements
AggregationStrategy {
@@ -168,4 +203,4 @@
}
-}
+}
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
(original)
+++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index
Mon Jun 29 11:43:38 2009
@@ -45,6 +45,7 @@
PackageScanDefinition
PipelineDefinition
PolicyDefinition
+PollEnrichDefinition
ProcessDefinition
RecipientListDefinition
RedeliveryPolicyDefinition
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
(from r789205,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/PollEnricherTest.java
Mon Jun 29 11:43:38 2009
@@ -23,7 +23,7 @@
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-public class EnricherTest extends ContextTestSupport {
+public class PollEnricherTest extends ContextTestSupport {
private static SampleAggregator aggregationStrategy = new
SampleAggregator();
@@ -44,51 +44,66 @@
// InOnly routes
// -------------------------------------------------------------
- public void testEnrichInOnly() throws InterruptedException {
+ public void testPollEnrichInOnly() throws InterruptedException {
+ template.sendBody("seda:foo1", "blah");
+
mock.expectedBodiesReceived("test:blah");
template.sendBody("direct:enricher-test-1", "test");
mock.assertIsSatisfied();
}
- public void testEnrichFaultInOnly() throws InterruptedException {
- mock.expectedMessageCount(0);
- Exchange exchange = template.send("direct:enricher-test-3", new
Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
- }
- });
+ public void testPollEnrichInOnlyWaitWithTimeout() throws
InterruptedException {
+ // this first try there is no data so we timeout
+ mock.expectedBodiesReceived("test:blah");
+ template.sendBody("direct:enricher-test-2", "test");
+ // not expected data so we are not happy
+ mock.assertIsNotSatisfied();
+
+ // now send it and try again
+ mock.reset();
+ template.sendBody("seda:foo2", "blah");
+ template.sendBody("direct:enricher-test-2", "test");
mock.assertIsSatisfied();
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getFault().getBody());
- assertFalse(exchange.hasOut());
- assertNull(exchange.getException());
}
- public void testEnrichErrorInOnly() throws InterruptedException {
- mock.expectedMessageCount(0);
- Exchange exchange = template.send("direct:enricher-test-4", new
Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
+ public void testPollEnrichInOnlyWaitNoTimeout() throws
InterruptedException {
+ // use another thread to send it after 2 seconds
+ Thread t = new Thread(new Runnable() {
+ public void run() {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ template.sendBody("seda:foo3", "blah");
}
});
+
+ long start = System.currentTimeMillis();
+ mock.expectedBodiesReceived("test:blah");
+ t.start();
+ template.sendBody("direct:enricher-test-3", "test");
+ // should take approx 1 sec to complete as the other thread is sending
a bit later and we wait
mock.assertIsSatisfied();
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getException().getMessage());
- assertFalse(exchange.hasFault());
- assertFalse(exchange.hasOut());
+ long delta = System.currentTimeMillis() - start;
+ assertTrue("Should take approx 1 sec: was " + delta, delta > 900);
}
// -------------------------------------------------------------
// InOut routes
// -------------------------------------------------------------
- public void testEnrichInOut() throws InterruptedException {
- String result = (String) template.sendBody("direct:enricher-test-5",
ExchangePattern.InOut, "test");
+ public void testPollEnrichInOut() throws InterruptedException {
+ template.sendBody("seda:foo4", "blah");
+
+ String result = (String) template.sendBody("direct:enricher-test-4",
ExchangePattern.InOut, "test");
assertEquals("test:blah", result);
}
- public void testEnrichInOutPlusHeader() throws InterruptedException {
- Exchange exchange = template.send("direct:enricher-test-5",
ExchangePattern.InOut, new Processor() {
+ public void testPollEnrichInOutPlusHeader() throws InterruptedException {
+ template.sendBody("seda:foo4", "blah");
+
+ Exchange exchange = template.send("direct:enricher-test-4",
ExchangePattern.InOut, new Processor() {
public void process(Exchange exchange) {
exchange.getIn().setHeader("foo", "bar");
exchange.getIn().setBody("test");
@@ -100,30 +115,6 @@
assertNull(exchange.getException());
}
- public void testEnrichFaultInOut() throws InterruptedException {
- Exchange exchange = template.send("direct:enricher-test-7",
ExchangePattern.InOut, new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
- }
- });
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getFault().getBody());
- assertFalse(exchange.hasOut());
- assertNull(exchange.getException());
- }
-
- public void testEnrichErrorInOut() throws InterruptedException {
- Exchange exchange = template.send("direct:enricher-test-8",
ExchangePattern.InOut, new Processor() {
- public void process(Exchange exchange) {
- exchange.getIn().setBody("test");
- }
- });
- assertEquals("test", exchange.getIn().getBody());
- assertEquals("failed", exchange.getException().getMessage());
- assertFalse(exchange.hasFault());
- assertFalse(exchange.hasOut());
- }
-
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
@@ -132,39 +123,25 @@
//
-------------------------------------------------------------
from("direct:enricher-test-1")
- .enrich("direct:enricher-constant-resource",
aggregationStrategy)
+ .pollEnrich("seda:foo1", aggregationStrategy)
.to("mock:mock");
- from("direct:enricher-test-3")
- .enrich("direct:enricher-fault-resource",
aggregationStrategy)
+ from("direct:enricher-test-2")
+ .pollEnrich("seda:foo2", 1000, aggregationStrategy)
.to("mock:mock");
- from("direct:enricher-test-4").errorHandler(noErrorHandler())
// avoid re-deliveries
- .enrich("direct:enricher-error-resource",
aggregationStrategy).to("mock:mock");
+ from("direct:enricher-test-3")
+ .pollEnrich("seda:foo3", -1, aggregationStrategy)
+ .to("mock:mock");
//
-------------------------------------------------------------
// InOut routes
//
-------------------------------------------------------------
- from("direct:enricher-test-5")
- .enrich("direct:enricher-constant-resource",
aggregationStrategy);
-
- from("direct:enricher-test-7")
- .enrich("direct:enricher-fault-resource",
aggregationStrategy);
-
- from("direct:enricher-test-8").errorHandler(noErrorHandler())
// avoid re-deliveries
- .enrich("direct:enricher-error-resource",
aggregationStrategy);
-
- //
-------------------------------------------------------------
- // Enricher resources
- //
-------------------------------------------------------------
-
-
from("direct:enricher-constant-resource").transform().constant("blah");
-
-
from("direct:enricher-fault-resource").errorHandler(noErrorHandler()).process(new
FailureProcessor(false));
-
from("direct:enricher-error-resource").errorHandler(noErrorHandler()).process(new
FailureProcessor(true));
+ from("direct:enricher-test-4")
+ .pollEnrich("seda:foo4", aggregationStrategy);
}
};
}
-}
+}
\ No newline at end of file
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java?rev=789294&r1=789293&r2=789294&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/SampleAggregator.java
Mon Jun 29 11:43:38 2009
@@ -22,6 +22,9 @@
public class SampleAggregator implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (newExchange == null) {
+ return oldExchange;
+ }
Object oldBody = oldExchange.getIn().getBody();
Object newBody = newExchange.getIn().getBody();
oldExchange.getIn().setBody(oldBody + ":" + newBody);
Copied:
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java
(from r789205,
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringEnricherTest.java
(original)
+++
camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringPollEnricherTest.java
Mon Jun 29 11:43:38 2009
@@ -17,32 +17,13 @@
package org.apache.camel.spring.processor;
import org.apache.camel.CamelContext;
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.component.mock.MockEndpoint;
-
+import org.apache.camel.processor.enricher.PollEnricherTest;
import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
-/**
- * @author Martin Krasser
- */
-public class SpringEnricherTest extends ContextTestSupport {
-
- private MockEndpoint mock;
-
- @Override
- protected void setUp() throws Exception {
- super.setUp();
- mock = getMockEndpoint("mock:result");
- }
+public class SpringPollEnricherTest extends PollEnricherTest {
- public void testEnrich() throws Exception {
- mock.expectedBodiesReceived("test:blah");
- template.sendBody("direct:start", "test");
- mock.assertIsSatisfied();
- }
-
protected CamelContext createCamelContext() throws Exception {
- return createSpringCamelContext(this,
"org/apache/camel/spring/processor/enricher.xml");
+ return createSpringCamelContext(this,
"org/apache/camel/spring/processor/pollEnricher.xml");
}
-}
+}
\ No newline at end of file
Copied:
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
(from r789205,
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml&r1=789205&r2=789294&rev=789294&view=diff
==============================================================================
---
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/enricher.xml
(original)
+++
camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/pollEnricher.xml
Mon Jun 29 11:43:38 2009
@@ -22,22 +22,34 @@
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
">
- <!-- START SNIPPET: example -->
- <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <enrich uri="direct:resource" strategyRef="sampleAggregator"/>
- <to uri="mock:result"/>
- </route>
- <route>
- <from uri="direct:resource"/>
- <transform>
- <constant>blah</constant>
- </transform>
- </route>
- </camelContext>
+ <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
+ <!-- START SNIPPET: e1 -->
+ <route>
+ <from uri="direct:enricher-test-1"/>
+ <pollEnrich uri="seda:foo1" strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+ <!-- END SNIPPET: e1 -->
- <bean id="sampleAggregator"
class="org.apache.camel.processor.enricher.SampleAggregator" />
- <!-- END SNIPPET: example -->
+ <route>
+ <from uri="direct:enricher-test-2"/>
+ <pollEnrich uri="seda:foo2" timeout="1000"
strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-3"/>
+ <pollEnrich uri="seda:foo3" timeout="-1"
strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+
+ <route>
+ <from uri="direct:enricher-test-4"/>
+ <pollEnrich uri="seda:foo4" strategyRef="sampleAggregator"/>
+ <to uri="mock:mock"/>
+ </route>
+ </camelContext>
+
+ <bean id="sampleAggregator"
class="org.apache.camel.processor.enricher.SampleAggregator"/>
</beans>