Author: janstey
Date: Tue Dec 30 11:36:03 2008
New Revision: 730218

URL: http://svn.apache.org/viewvc?rev=730218&view=rev
Log:
CAMEL-1041 - Added ability to customize aggregation strategy for the Splitter in Spring DSL

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java?rev=730218&r1=730217&r2=730218&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitterType.java Tue Dec 30 11:36:03 2008
@@ -51,6 +51,8 @@
     @XmlTransient
     private Executor executor;
     @XmlAttribute(required = false)
+    private String strategyRef;
+    @XmlAttribute(required = false)
     private String threadPoolExecutorRef;
     @XmlAttribute(required = false)
     private Boolean streaming = false;
@@ -79,13 +81,36 @@
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
         Processor childProcessor = routeContext.createProcessor(this);
-        if (aggregationStrategy == null) {
-            aggregationStrategy = new UseLatestAggregationStrategy();
-        }
+        aggregationStrategy = createAggregationStrategy(routeContext);
         executor = createThreadPoolExecutor(routeContext);
         return new Splitter(getExpression().createExpression(routeContext), 
childProcessor, aggregationStrategy,
                 isParallelProcessing(), executor, streaming);
     }
+
+    
+    private AggregationStrategy createAggregationStrategy(RouteContext 
routeContext) {
+        AggregationStrategy strategy = getAggregationStrategy();
+        if (strategy == null && strategyRef != null) {
+            strategy = routeContext.lookup(strategyRef, 
AggregationStrategy.class);
+        }
+        if (strategy == null) {
+            // fallback to use latest
+            strategy = new UseLatestAggregationStrategy();
+        }
+        return strategy;
+    }        
+    
+    private Executor createThreadPoolExecutor(RouteContext routeContext) {
+        Executor executor = getExecutor();
+        if (executor == null && threadPoolExecutorRef != null) {
+            executor = routeContext.lookup(threadPoolExecutorRef, 
ThreadPoolExecutor.class);
+        }
+        if (executor == null) {
+            // fall back and use default
+            executor = new ThreadPoolExecutor(4, 16, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+        }
+        return executor;
+    }         
     
     // Fluent API
     // 
-------------------------------------------------------------------------
@@ -181,20 +206,8 @@
 
     public void setStreaming(boolean streaming) {
         this.streaming = streaming;
-    }
-
-    private Executor createThreadPoolExecutor(RouteContext routeContext) {
-        Executor executor = getExecutor();
-        if (executor == null && threadPoolExecutorRef != null) {
-            executor = routeContext.lookup(threadPoolExecutorRef, 
ThreadPoolExecutor.class);
-        }
-        if (executor == null) {
-            // fall back and use default
-            executor = new ThreadPoolExecutor(4, 16, 0L, 
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
-        }
-        return executor;
-    }    
-   
+    }  
+    
     public Executor getExecutor() {
         return executor;
     }


Reply via email to