Author: davsclaus
Date: Mon Oct 6 13:06:50 2008
New Revision: 702247
URL: http://svn.apache.org/viewvc?rev=702247&view=rev
Log:
CAMEL-951: Aggregator EIP improvements: added unit test. easier configuration
of aggregator collection. added out batch size option to batch processor.
AggregatorCollection is now an interface to let end-users provide their own
implementation. Polished javadoc.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
(contents, props changed)
- copied, changed from r701171,
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
Mon Oct 6 13:06:50 2008
@@ -47,6 +47,8 @@
* This typically won't be required as an exchange can be created with a
specific MEP
* by calling [EMAIL PROTECTED] Endpoint#createExchange(ExchangePattern)}
but it is here just in case
* it is needed.
+ *
+ * @param pattern the pattern
*/
void setPattern(ExchangePattern pattern);
@@ -122,6 +124,7 @@
* Returns the outbound message; optionally lazily creating one if one has
* not been associated with this exchange
*
+ * @param lazyCreate <tt>true</tt> will lazy create the out message
* @return the response
*/
Message getOut(boolean lazyCreate);
@@ -145,7 +148,6 @@
* not been associated with this exchange
*
* @param lazyCreate <tt>true</tt> will lazy create the fault message
- *
* @return the fault
*/
Message getFault(boolean lazyCreate);
@@ -216,16 +218,12 @@
void setUnitOfWork(UnitOfWork unitOfWork);
/**
- * Returns the exchange id
- *
- * @return the unique id of the exchange
+ * Returns the exchange id (unique)
*/
String getExchangeId();
/**
* Set the exchange id
- *
- * @param id
*/
void setExchangeId(String id);
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
Mon Oct 6 13:06:50 2008
@@ -54,6 +54,8 @@
@XmlAttribute(required = false)
private Integer batchSize;
@XmlAttribute(required = false)
+ private Integer outBatchSize;
+ @XmlAttribute(required = false)
private Long batchTimeout;
@XmlAttribute(required = false)
private String strategyRef;
@@ -120,15 +122,21 @@
final Aggregator aggregator;
if (aggregationCollection != null) {
- aggregator = new Aggregator(from, processor,
aggregationCollection);
- } else {
- AggregationStrategy strategy = getAggregationStrategy();
- if (strategy == null && strategyRef != null) {
- strategy = routeContext.lookup(strategyRef,
AggregationStrategy.class);
+ // create the aggregator using the collection
+ // pre configure the collection if its expression and strategy is
not set, then
+ // use the ones that is pre configured with this type
+ if (aggregationCollection.getCorrelationExpression() == null) {
+
aggregationCollection.setCorrelationExpression(getExpression());
}
- if (strategy == null) {
- strategy = new UseLatestAggregationStrategy();
+ if (aggregationCollection.getAggregationStrategy() == null) {
+ AggregationStrategy strategy =
createAggregationStrategy(routeContext);
+ aggregationCollection.setAggregationStrategy(strategy);
}
+ aggregator = new Aggregator(from, processor,
aggregationCollection);
+ } else {
+ // create the aggregator using a default collection
+ AggregationStrategy strategy =
createAggregationStrategy(routeContext);
+
Expression aggregateExpression =
getExpression().createExpression(routeContext);
Predicate predicate = null;
@@ -149,9 +157,26 @@
if (batchTimeout != null) {
aggregator.setBatchTimeout(batchTimeout);
}
+
+ if (outBatchSize != null) {
+ aggregator.setOutBatchSize(outBatchSize);
+ }
return aggregator;
}
+
+ private AggregationStrategy createAggregationStrategy(RouteContext
routeContext) {
+ AggregationStrategy strategy = getAggregationStrategy();
+ if (strategy == null && strategyRef != null) {
+ strategy = routeContext.lookup(strategyRef,
AggregationStrategy.class);
+ }
+ if (strategy == null) {
+ // fallback to use latest
+ strategy = new UseLatestAggregationStrategy();
+ }
+ return strategy;
+ }
+
public AggregationCollection getAggregationCollection() {
return aggregationCollection;
}
@@ -176,6 +201,14 @@
this.batchSize = batchSize;
}
+ public Integer getOutBatchSize() {
+ return outBatchSize;
+ }
+
+ public void setOutBatchSize(Integer outBatchSize) {
+ this.outBatchSize = outBatchSize;
+ }
+
public Long getBatchTimeout() {
return batchTimeout;
}
@@ -207,11 +240,31 @@
return this;
}
+ public AggregatorType outBatchSize(int batchSize) {
+ setOutBatchSize(batchSize);
+ return this;
+ }
+
public AggregatorType batchTimeout(long batchTimeout) {
setBatchTimeout(batchTimeout);
return this;
}
+ public AggregatorType aggregationCollection(AggregationCollection
aggregationCollection) {
+ setAggregationCollection(aggregationCollection);
+ return this;
+ }
+
+ public AggregatorType aggregationStrategy(AggregationStrategy
aggregationStrategy) {
+ setAggregationStrategy(aggregationStrategy);
+ return this;
+ }
+
+ public AggregatorType strategyRef(String strategyRef) {
+ setStrategyRef(strategyRef);
+ return this;
+ }
+
/**
* Sets the predicate used to determine if the aggregation is completed
*
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
Mon Oct 6 13:06:50 2008
@@ -39,8 +39,6 @@
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.builder.Builder;
import org.apache.camel.builder.DataFormatClause;
import org.apache.camel.builder.DeadLetterChannelBuilder;
import org.apache.camel.builder.ErrorHandlerBuilder;
@@ -56,8 +54,8 @@
import org.apache.camel.processor.ConvertBodyProcessor;
import org.apache.camel.processor.DelegateProcessor;
import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
import org.apache.camel.processor.idempotent.MessageIdRepository;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
@@ -702,15 +700,17 @@
/**
* Creates an <a
* href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
- * pattern using a custom aggregation collection implementation.
+ * pattern using a custom aggregation collection implementation. The
aggregation collection must
+ * be configued with the strategy and correlation expression that this
aggregator should use.
+ * This avoids duplicating this configuration on both the collection and
the aggregator itself.
*
* @param aggregationCollection the collection used to perform the
aggregation
*/
- public ExpressionClause<AggregatorType> aggregator(AggregationCollection
aggregationCollection) {
+ public AggregatorType aggregator(AggregationCollection
aggregationCollection) {
AggregatorType answer = new AggregatorType();
answer.setAggregationCollection(aggregationCollection);
addOutput(answer);
- return ExpressionClause.createAndSetExpression(answer);
+ return answer;
}
/**
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
Mon Oct 6 13:06:50 2008
@@ -20,9 +20,10 @@
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
-import org.apache.camel.processor.aggregate.AggregationCollection;
+import org.apache.camel.processor.aggregate.DefaultAggregationCollection;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
+import org.apache.camel.processor.aggregate.AggregationCollection;
/**
* An implementation of the <a
@@ -46,7 +47,7 @@
public Aggregator(Endpoint endpoint, Processor processor, Expression
correlationExpression,
AggregationStrategy aggregationStrategy) {
- this(endpoint, processor, new
AggregationCollection(correlationExpression, aggregationStrategy));
+ this(endpoint, processor, new
DefaultAggregationCollection(correlationExpression, aggregationStrategy));
}
public Aggregator(Endpoint endpoint, Processor processor, Expression
correlationExpression,
@@ -67,10 +68,12 @@
@Override
protected boolean isBatchCompleted(int index) {
if (aggregationCompletedPredicate != null) {
+ // TODO: (davsclaus) What is the point with this code? I think its
wrong
if (getCollection().size() > 0) {
return true;
}
}
+
return super.isBatchCompleted(index);
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Mon Oct 6 13:06:50 2008
@@ -46,6 +46,7 @@
private Collection<Exchange> collection;
private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
private int batchSize = DEFAULT_BATCH_SIZE;
+ private int outBatchSize;
private PollingConsumer consumer;
private ExceptionHandler exceptionHandler;
@@ -89,10 +90,31 @@
return batchSize;
}
+ /**
+ * Sets the <b>in</b> batch size. This is the number of incomiing
exchanges that this batch processor
+ * will process before its completed. The default value is [EMAIL
PROTECTED] #DEFAULT_BATCH_SIZE}.
+ *
+ * @param batchSize the size
+ */
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
+ public int getOutBatchSize() {
+ return outBatchSize;
+ }
+
+ /**
+ * Sets the <b>out</b> batch size. If the batch processor holds more
exchanges than this out size then
+ * the completion is triggered. Can for instance be used to ensure that
this batch is completed when
+ * a certain number of exchanges has been collected. By default this
feature is <b>not</b> used.
+ *
+ * @param outBatchSize the size
+ */
+ public void setOutBatchSize(int outBatchSize) {
+ this.outBatchSize = outBatchSize;
+ }
+
public long getBatchTimeout() {
return batchTimeout;
}
@@ -119,12 +141,16 @@
for (int i = 0; !isBatchCompleted(i); i++) {
long timeout = end - System.currentTimeMillis();
if (timeout < 0L) {
- LOG.debug("batch timeout expired at batch index:" + i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("batch timeout expired at batch index: " + i);
+ }
break;
}
Exchange exchange = consumer.receive(timeout);
if (exchange == null) {
- LOG.debug("receive with timeout: " + timeout + " expired at
batch index:" + i);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("receive with timeout: " + timeout + " expired
at batch index: " + i);
+ }
break;
}
collection.add(exchange);
@@ -148,6 +174,11 @@
* A strategy method to decide if the batch is completed the resulting
exchanges should be sent
*/
protected boolean isBatchCompleted(int index) {
+ // out batch size is optional and we should only check if its enabled
(> 0)
+ if (outBatchSize > 0 && collection.size() >= outBatchSize) {
+ return true;
+ }
+ // fallback yo regular batch size check
return index >= batchSize;
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=702247&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
Mon Oct 6 13:06:50 2008
@@ -0,0 +1,60 @@
+package org.apache.camel.processor.aggregate;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * A [EMAIL PROTECTED] Collection} which aggregates exchanges together,
+ * using a correlation [EMAIL PROTECTED] Expression} and a [EMAIL PROTECTED]
AggregationStrategy}.
+ * <p/>
+ * The Default Implementation will group messages based on the correlation
expression.
+ * Other implementations could for instance just add all exchanges as a batch.
+ *
+ * @version $Revision$
+ */
+public interface AggregationCollection extends Collection<Exchange> {
+
+ /**
+ * Gets the correlation expression
+ */
+ Expression<Exchange> getCorrelationExpression();
+
+ /**
+ * Sets the correlation expression to be used
+ */
+ void setCorrelationExpression(Expression<Exchange> correlationExpression);
+
+ /**
+ * Gets the aggregation strategy
+ */
+ AggregationStrategy getAggregationStrategy();
+
+ /**
+ * Sets the aggregation strategy to be used
+ */
+ void setAggregationStrategy(AggregationStrategy aggregationStrategy);
+
+ /**
+ * Adds the given exchange to this collection
+ */
+ boolean add(Exchange exchange);
+
+ /**
+ * Gets the iterator to iterate this collection.
+ */
+ Iterator<Exchange> iterator();
+
+ /**
+ * Gets the size of this collection
+ */
+ int size();
+
+ /**
+ * Clears this colleciton
+ */
+ void clear();
+
+}
Copied:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
(from r701171,
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java&r1=701171&r2=702247&rev=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
Mon Oct 6 13:06:50 2008
@@ -34,14 +34,17 @@
*
* @version $Revision$
*/
-public class AggregationCollection extends AbstractCollection<Exchange> {
- private static final transient Log LOG =
LogFactory.getLog(AggregationCollection.class);
- private final Expression<Exchange> correlationExpression;
- private final AggregationStrategy aggregationStrategy;
+public class DefaultAggregationCollection extends AbstractCollection<Exchange>
implements AggregationCollection {
+
+ private static final transient Log LOG =
LogFactory.getLog(DefaultAggregationCollection.class);
+ private Expression<Exchange> correlationExpression;
+ private AggregationStrategy aggregationStrategy;
private Map<Object, Exchange> map = new LinkedHashMap<Object, Exchange>();
- public AggregationCollection(Expression<Exchange> correlationExpression,
- AggregationStrategy aggregationStrategy) {
+ public DefaultAggregationCollection() {
+ }
+
+ public DefaultAggregationCollection(Expression<Exchange>
correlationExpression, AggregationStrategy aggregationStrategy) {
this.correlationExpression = correlationExpression;
this.aggregationStrategy = aggregationStrategy;
}
@@ -96,4 +99,20 @@
*/
protected void onAggregation(Object correlationKey, Exchange newExchange) {
}
+
+ public Expression<Exchange> getCorrelationExpression() {
+ return correlationExpression;
+ }
+
+ public void setCorrelationExpression(Expression<Exchange>
correlationExpression) {
+ this.correlationExpression = correlationExpression;
+ }
+
+ public AggregationStrategy getAggregationStrategy() {
+ return aggregationStrategy;
+ }
+
+ public void setAggregationStrategy(AggregationStrategy
aggregationStrategy) {
+ this.aggregationStrategy = aggregationStrategy;
+ }
}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
Mon Oct 6 13:06:50 2008
@@ -30,7 +30,7 @@
*
* @version $Revision$
*/
-public class PredicateAggregationCollection extends AggregationCollection {
+public class PredicateAggregationCollection extends
DefaultAggregationCollection {
private Predicate aggregationCompletedPredicate;
private List<Exchange> collection = new ArrayList<Exchange>();
@@ -42,8 +42,7 @@
@Override
protected void onAggregation(Object correlationKey, Exchange newExchange) {
if (aggregationCompletedPredicate.matches(newExchange)) {
- // this exchange has now aggregated so lets add it to the
collection of things
- // to send
+ // this exchange has now aggregated so lets add it to the
collection of things to send
super.getMap().remove(correlationKey);
collection.add(newExchange);
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
Mon Oct 6 13:06:50 2008
@@ -70,19 +70,17 @@
template = context.createProducerTemplate();
- if (useRouteBuilder) {
+ if (isUseRouteBuilder()) {
RouteBuilder[] builders = createRouteBuilders();
for (RouteBuilder builder : builders) {
log.debug("Using created route builder: " + builder);
context.addRoutes(builder);
}
+ startCamelContext();
+ log.debug("Routing Rules are: " + context.getRoutes());
} else {
log.debug("Using route builder from the created context: " +
context);
}
-
- startCamelContext();
-
- log.debug("Routing Rules are: " + context.getRoutes());
}
@Override
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Mon Oct 6 13:06:50 2008
@@ -51,12 +51,6 @@
protected DelegateProcessor interceptor1;
protected DelegateProcessor interceptor2;
- @Override
- protected void setUp() throws Exception {
-
- super.setUp();
- }
-
protected List<Route> buildSimpleRoute() throws Exception {
// START SNIPPET: e1
RouteBuilder builder = new RouteBuilder() {
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java?rev=702247&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
Mon Oct 6 13:06:50 2008
@@ -0,0 +1,154 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for the batch size options on aggregator.
+ */
+public class AggregatorBatchOptionsTest extends ContextTestSupport {
+
+ public boolean isUseRouteBuilder() {
+ return false;
+ }
+
+ public void testAggregateOutBatchSize() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 2 seconds to aggregate
+ .batchTimeout(2000L)
+ // batch size in is the limit of number of exchanges
recieved, so when we have received 100
+ // exchanges then whatever we have in the collection will
be sent
+ .batchSize(100)
+ // limit the out batch size to 3 so when we have
aggregated 3 exchanges
+ // and we reach this limit then the exchanges is send
+ .outBatchSize(3)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ });
+ startCamelContext();
+
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 3 messages grouped by the latest message only
+ result.expectedMessageCount(3);
+ result.expectedBodiesReceived("Message 1c", "Message 2b", "Message
3a");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ // when we send message 4 then we will reach the collection batch size
limit and the
+ // exchanges above is the ones we have aggregated in the first batch
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ public void testAggregateBatchSize() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e3
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 2 seconds to aggregate
+ .batchTimeout(2000L)
+ // batch size in is the limit of number of exchanges
recieved, so when we have received 100
+ // exchanges then whatever we have in the collection will
be sent
+ .batchSize(5)
+ .to("mock:result");
+ // END SNIPPET: e3
+ }
+ });
+ startCamelContext();
+
+ // START SNIPPET: e4
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 3 messages grouped by the latest message only
+ result.expectedMessageCount(2);
+ result.expectedBodiesReceived("Message 1c", "Message 2b");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ // when we sent the next message we have reached the in batch size
limit and the current
+ // aggregated exchanges will be sent
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e4
+ }
+
+ public void testAggregateBatchTimeout() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e5
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 0.5 seconds to aggregate
+ .batchTimeout(500L)
+ .to("mock:result");
+ // END SNIPPET: e5
+ }
+ });
+ startCamelContext();
+
+ // START SNIPPET: e6
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 3 messages grouped by the latest message only
+ result.expectedMessageCount(3);
+ result.expectedBodiesReceived("Message 1c", "Message 2b", "Message
3a");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ Thread.sleep(600L);
+ // these messages are not aggregated as the timeout should have
accoured
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e6
+ }
+
+}
\ No newline at end of file
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=702247&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
Mon Oct 6 13:06:50 2008
@@ -0,0 +1,61 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test for using our own aggregation strategy.
+ */
+public class CustomAggregationStrategyTest extends ContextTestSupport {
+
+ public void testCustomAggregationStrategy() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 4 messages as they have different header id
+ result.expectedMessageCount(2);
+ result.expectedBodiesReceived("200", "150");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "100", "id", "1");
+ template.sendBodyAndHeader("direct:start", "150", "id", "2");
+ template.sendBodyAndHeader("direct:start", "130", "id", "2");
+ template.sendBodyAndHeader("direct:start", "200", "id", "1");
+ template.sendBodyAndHeader("direct:start", "190", "id", "1");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id and use our own strategy how to
aggregate
+ .aggregator(new MyAggregationStrategy()).header("id")
+ // wait for 2 seconds to aggregate
+ .batchTimeout(2000L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+
+ // START SNIPPET: e3
+ private static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ int oldPrice = oldExchange.getIn().getBody(Integer.class);
+ int newPrice = newExchange.getIn().getBody(Integer.class);
+ // return the "winner" that has the highest price
+ return newPrice > oldPrice ? newExchange : oldExchange;
+ }
+ }
+ // END SNIPPET: e3
+}
\ No newline at end of file
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java?rev=702247&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
Mon Oct 6 13:06:50 2008
@@ -0,0 +1,54 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for DefaultAggregatorCollection.
+ */
+public class DefaultAggregatorCollectionTest extends ContextTestSupport {
+
+ public void testDefaultAggregateCollection() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we expect 4 messages grouped by the latest message only
+ result.expectedMessageCount(4);
+ result.expectedBodiesReceived("Message 1d", "Message 2b", "Message
3c", "Message 4");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // aggregated by header id
+ // as we have not configured more on the aggregator it
will default to aggregate the
+ // latest exchange only
+ .aggregator().header("id")
+ // wait for 2 seconds to aggregate
+ .batchTimeout(2000L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}
\ No newline at end of file
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java?rev=702247&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
Mon Oct 6 13:06:50 2008
@@ -0,0 +1,65 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+
+/**
+ * Unit test for PredicateAggregatorCollection.
+ */
+public class PredicateAggregatorCollectionTest extends ContextTestSupport {
+
+ public void testPredicateAggregateCollection() throws Exception {
+ // START SNIPPET: e2
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ // we only expect two messages as they have reached the completed
predicate
+ // that we want 3 messages that has the same header id
+ result.expectedMessageCount(2);
+ result.expectedBodiesReceived("Message 1c", "Message 3c");
+
+ // then we sent all the message at once
+ template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+ template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+ template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+ template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+ assertMockEndpointsSatisfied();
+ // END SNIPPET: e2
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ // START SNIPPET: e1
+ // create the aggregation collection we will use.
+ // - we will correlate the recieved message based on the id
header
+ // - as we will just keep the latest message we use the latest
strategy
+ // - and finally we stop aggregate if we recieve 2 or more
messages
+ AggregationCollection ag = new
PredicateAggregationCollection(header("id"),
+ new UseLatestAggregationStrategy(),
+ header(Exchange.AGGREGATED_COUNT).isEqualTo(3));
+
+ // our route is aggregating from the direct queue and sending
the response to the mock
+ from("direct:start")
+ // we use the collection based aggregator we already have
configued
+ .aggregator(ag)
+ // wait for 2 seconds to aggregate
+ .batchTimeout(2000L)
+ .to("mock:result");
+ // END SNIPPET: e1
+ }
+ };
+ }
+}