Author: janstey
Date: Tue Dec 30 11:36:03 2008
New Revision: 730218
URL: http://svn.apache.org/viewvc?rev=730218&view=rev
Log:
CAMEL-1041 - Added ability to customize aggregation strategy for the Splitter
in Spring DSL
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=730218&r1=730217&r2=730218&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
Tue Dec 30 11:36:03 2008
@@ -51,6 +51,8 @@
@XmlTransient
private Executor executor;
@XmlAttribute(required = false)
+ private String strategyRef;
+ @XmlAttribute(required = false)
private String threadPoolExecutorRef;
@XmlAttribute(required = false)
private Boolean streaming = false;
@@ -79,13 +81,36 @@
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
Processor childProcessor = routeContext.createProcessor(this);
- if (aggregationStrategy == null) {
- aggregationStrategy = new UseLatestAggregationStrategy();
- }
+ aggregationStrategy = createAggregationStrategy(routeContext);
executor = createThreadPoolExecutor(routeContext);
return new Splitter(getExpression().createExpression(routeContext),
childProcessor, aggregationStrategy,
isParallelProcessing(), executor, streaming);
}
+
+
+ private AggregationStrategy createAggregationStrategy(RouteContext
routeContext) {
+ AggregationStrategy strategy = getAggregationStrategy();
+ if (strategy == null && strategyRef != null) {
+ strategy = routeContext.lookup(strategyRef,
AggregationStrategy.class);
+ }
+ if (strategy == null) {
+ // fallback to use latest
+ strategy = new UseLatestAggregationStrategy();
+ }
+ return strategy;
+ }
+
+ private Executor createThreadPoolExecutor(RouteContext routeContext) {
+ Executor executor = getExecutor();
+ if (executor == null && threadPoolExecutorRef != null) {
+ executor = routeContext.lookup(threadPoolExecutorRef,
ThreadPoolExecutor.class);
+ }
+ if (executor == null) {
+ // fall back and use default
+ executor = new ThreadPoolExecutor(4, 16, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ }
+ return executor;
+ }
// Fluent API
//
-------------------------------------------------------------------------
@@ -181,20 +206,8 @@
public void setStreaming(boolean streaming) {
this.streaming = streaming;
- }
-
- private Executor createThreadPoolExecutor(RouteContext routeContext) {
- Executor executor = getExecutor();
- if (executor == null && threadPoolExecutorRef != null) {
- executor = routeContext.lookup(threadPoolExecutorRef,
ThreadPoolExecutor.class);
- }
- if (executor == null) {
- // fall back and use default
- executor = new ThreadPoolExecutor(4, 16, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- }
- return executor;
- }
-
+ }
+
public Executor getExecutor() {
return executor;
}