Repository: camel
Updated Branches:
  refs/heads/master c9a408b20 -> f1dbb4852


CAMEL-7521: Added parallelAggregate option to mutlicast/splitter/recipient list 
eips. Thanks to Jerry Williamson for the patch.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f1dbb485
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f1dbb485
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f1dbb485

Branch: refs/heads/master
Commit: f1dbb4852da4d5848e35e9ab21626cdd8cdf7216
Parents: c9a408b
Author: Claus Ibsen <[email protected]>
Authored: Fri Jul 4 13:30:42 2014 +0200
Committer: Claus Ibsen <[email protected]>
Committed: Fri Jul 4 13:31:56 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/camel/RecipientList.java    |   1 +
 .../apache/camel/component/bean/MethodInfo.java |   1 +
 .../apache/camel/model/MulticastDefinition.java |  37 ++++-
 .../camel/model/RecipientListDefinition.java    |  38 +++++-
 .../org/apache/camel/model/SplitDefinition.java |  35 ++++-
 .../camel/processor/MulticastProcessor.java     |  45 ++++++-
 .../apache/camel/processor/RecipientList.java   |  11 +-
 .../camel/processor/RecipientListProcessor.java |  12 +-
 .../org/apache/camel/processor/Splitter.java    |  11 +-
 .../SplitterParallelAggregateTest.java          | 135 +++++++++++++++++++
 .../apache/camel/processor/SplitterTest.java    |  21 +++
 11 files changed, 335 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/RecipientList.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/RecipientList.java 
b/camel-core/src/main/java/org/apache/camel/RecipientList.java
index e72f025..7cd9cda 100644
--- a/camel-core/src/main/java/org/apache/camel/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/RecipientList.java
@@ -46,6 +46,7 @@ public @interface RecipientList {
     String context() default "";
     String delimiter() default ",";
     boolean parallelProcessing() default false;
+    boolean parallelAggregate() default false;
     boolean stopOnException() default false;
     boolean streaming() default false;
     boolean ignoreInvalidEndpoints() default false;

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java 
b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
index 0c22e75..25d6b02 100644
--- a/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
+++ b/camel-core/src/main/java/org/apache/camel/component/bean/MethodInfo.java
@@ -159,6 +159,7 @@ public class MethodInfo {
             
recipientList.setStopOnException(recipientListAnnotation.stopOnException());
             
recipientList.setIgnoreInvalidEndpoints(recipientListAnnotation.ignoreInvalidEndpoints());
             
recipientList.setParallelProcessing(recipientListAnnotation.parallelProcessing());
+            
recipientList.setParallelAggregate(recipientListAnnotation.parallelAggregate());
             recipientList.setStreaming(recipientListAnnotation.streaming());
             recipientList.setTimeout(recipientListAnnotation.timeout());
             
recipientList.setShareUnitOfWork(recipientListAnnotation.shareUnitOfWork());

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
index 1c6845f..9f5b606 100644
--- a/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
@@ -69,6 +69,8 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
     private Processor onPrepare;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Boolean parallelAggregate;
 
     public MulticastDefinition() {
     }
@@ -155,7 +157,20 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         setParallelProcessing(true);
         return this;
     }
-    
+
+    /**
+     * Doing the aggregate work in parallel
+     * <p/>
+     * Notice that if enabled, then the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy} in use
+     * must be implemented as thread safe, as concurrent threads can call the 
<tt>aggregate</tt> methods at the same time.
+     *
+     * @return the builder
+     */
+    public MulticastDefinition parallelAggregate() {
+        setParallelAggregate(true);
+        return this;
+    }
+
     /**
      * Aggregates the responses as the are done (e.g. out of order sequence)
      *
@@ -261,7 +276,7 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         }
 
         MulticastProcessor answer = new 
MulticastProcessor(routeContext.getCamelContext(), list, strategy, 
isParallelProcessing(),
-                                      threadPool, shutdownThreadPool, 
isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork());
+                                      threadPool, shutdownThreadPool, 
isStreaming(), isStopOnException(), timeout, onPrepare, isShareUnitOfWork(), 
isParallelAggregate());
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of 
work
             CamelInternalProcessor internalProcessor = new 
CamelInternalProcessor(answer);
@@ -418,4 +433,22 @@ public class MulticastDefinition extends 
OutputDefinition<MulticastDefinition> i
         return shareUnitOfWork != null && shareUnitOfWork;
     }
 
+    public Boolean getParallelAggregate() {
+        return parallelAggregate;
+    }
+
+    /**
+     * Whether to aggregate using a sequential single thread, or allow 
parallel aggregation.
+     * <p/>
+     * Notice that if enabled, then the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy} in use
+     * must be implemented as thread safe, as concurrent threads can call the 
<tt>aggregate</tt> methods at the same time.
+     */
+    public boolean isParallelAggregate() {
+        return parallelAggregate != null && parallelAggregate;
+    }
+
+    public void setParallelAggregate(Boolean parallelAggregate) {
+        this.parallelAggregate = parallelAggregate;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
index c5b5796..b3b6ab5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
+++ 
b/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
@@ -78,6 +78,8 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     private Boolean shareUnitOfWork;
     @XmlAttribute
     private Integer cacheSize;
+    @XmlAttribute
+    private Boolean parallelAggregate;
 
     public RecipientListDefinition() {
     }
@@ -117,7 +119,8 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
         }
         answer.setAggregationStrategy(createAggregationStrategy(routeContext));
         answer.setParallelProcessing(isParallelProcessing());
-        answer.setStreaming(isStreaming());   
+        answer.setParallelAggregate(isParallelAggregate());
+        answer.setStreaming(isStreaming());
         answer.setShareUnitOfWork(isShareUnitOfWork());
         if (getCacheSize() != null) {
             answer.setCacheSize(getCacheSize());
@@ -284,7 +287,20 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
         setParallelProcessing(true);
         return this;
     }
-    
+
+    /**
+     * Doing the aggregate work in parallel
+     * <p/>
+     * Notice that if enabled, then the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy} in use
+     * must be implemented as thread safe, as concurrent threads can call the 
<tt>aggregate</tt> methods at the same time.
+     *
+     * @return the builder
+     */
+    public RecipientListDefinition<Type> parallelAggregate() {
+        setParallelAggregate(true);
+        return this;
+    }
+
     /**
      * Doing the recipient list work in streaming model
      *
@@ -533,4 +549,22 @@ public class RecipientListDefinition<Type extends 
ProcessorDefinition<Type>> ext
     public void setCacheSize(Integer cacheSize) {
         this.cacheSize = cacheSize;
     }
+
+    public Boolean getParallelAggregate() {
+        return parallelAggregate;
+    }
+
+    /**
+     * Whether to aggregate using a sequential single thread, or allow 
parallel aggregation.
+     * <p/>
+     * Notice that if enabled, then the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy} in use
+     * must be implemented as thread safe, as concurrent threads can call the 
<tt>aggregate</tt> methods at the same time.
+     */
+    public boolean isParallelAggregate() {
+        return parallelAggregate != null && parallelAggregate;
+    }
+
+    public void setParallelAggregate(Boolean parallelAggregate) {
+        this.parallelAggregate = parallelAggregate;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
index 0b54bdd..aa1a102 100644
--- a/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
@@ -69,6 +69,8 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
     private Processor onPrepare;
     @XmlAttribute
     private Boolean shareUnitOfWork;
+    @XmlAttribute
+    private Boolean parallelAggregate;
 
     public SplitDefinition() {
     }
@@ -116,7 +118,7 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
 
         Splitter answer = new Splitter(routeContext.getCamelContext(), exp, 
childProcessor, aggregationStrategy,
                             isParallelProcessing(), threadPool, 
shutdownThreadPool, isStreaming(), isStopOnException(),
-                            timeout, onPrepare, isShareUnitOfWork());
+                            timeout, onPrepare, isShareUnitOfWork(), 
isParallelAggregate());
         if (isShareUnitOfWork()) {
             // wrap answer in a sub unit of work, since we share the unit of 
work
             CamelInternalProcessor internalProcessor = new 
CamelInternalProcessor(answer);
@@ -207,6 +209,19 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
     }
     
     /**
+     * Doing the aggregate work in parallel
+     * <p/>
+     * Notice that if enabled, then the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy} in use
+     * must be implemented as thread safe, as concurrent threads can call the 
<tt>aggregate</tt> methods at the same time.
+     *
+     * @return the builder
+     */
+    public SplitDefinition parallelAggregate() {
+        setParallelAggregate(true);
+        return this;
+    }
+
+    /**
      * Enables streaming. 
      * See {@link org.apache.camel.model.SplitDefinition#isStreaming()} for 
more information
      *
@@ -335,6 +350,24 @@ public class SplitDefinition extends ExpressionNode 
implements ExecutorServiceAw
         return streaming != null && streaming;
     }
 
+    public Boolean getParallelAggregate() {
+        return parallelAggregate;
+    }
+
+    /**
+     * Whether to aggregate using a sequential single thread, or allow 
parallel aggregation.
+     * <p/>
+     * Notice that if enabled, then the {@link 
org.apache.camel.processor.aggregate.AggregationStrategy} in use
+     * must be implemented as thread safe, as concurrent threads can call the 
<tt>aggregate</tt> methods at the same time.
+     */
+    public boolean isParallelAggregate() {
+        return parallelAggregate != null && parallelAggregate;
+    }
+
+    public void setParallelAggregate(Boolean parallelAggregate) {
+        this.parallelAggregate = parallelAggregate;
+    }
+
     public Boolean getStopOnException() {
         return stopOnException;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 697ae32..30c8a55 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -147,6 +147,7 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
     private final AggregationStrategy aggregationStrategy;
     private final boolean parallelProcessing;
     private final boolean streaming;
+    private final boolean parallelAggregate;
     private final boolean stopOnException;
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
@@ -160,12 +161,21 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
     }
 
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> 
processors, AggregationStrategy aggregationStrategy) {
-        this(camelContext, processors, aggregationStrategy, false, null, 
false, false, false, 0, null, false);
+        this(camelContext, processors, aggregationStrategy, false, null, 
false, false, false, 0, null, false, false);
     }
 
+    @Deprecated
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> 
processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                               boolean streaming, boolean stopOnException, long 
timeout, Processor onPrepare, boolean shareUnitOfWork) {
+        this(camelContext, processors, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService,
+                streaming, stopOnException, timeout, onPrepare, 
shareUnitOfWork, false);
+    }
+
+    public MulticastProcessor(CamelContext camelContext, Collection<Processor> 
processors, AggregationStrategy aggregationStrategy,
+                              boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService, boolean streaming,
+                              boolean stopOnException, long timeout, Processor 
onPrepare, boolean shareUnitOfWork,
+                              boolean parallelAggregate) {
         notNull(camelContext, "camelContext");
         this.camelContext = camelContext;
         this.processors = processors;
@@ -179,6 +189,7 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
         this.timeout = timeout;
         this.onPrepare = onPrepare;
         this.shareUnitOfWork = shareUnitOfWork;
+        this.parallelAggregate = parallelAggregate;
     }
 
     @Override
@@ -535,7 +546,12 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
 
             LOG.trace("Sequential processing complete for number {} exchange: 
{}", total, subExchange);
 
-            doAggregate(getAggregationStrategy(subExchange), result, 
subExchange);
+            if (parallelAggregate) {
+                doAggregateInternal(getAggregationStrategy(subExchange), 
result, subExchange);
+            } else {
+                doAggregate(getAggregationStrategy(subExchange), result, 
subExchange);
+            }
+            
             total.incrementAndGet();
         }
 
@@ -610,7 +626,11 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
                     }
 
                     try {
-                        doAggregate(getAggregationStrategy(subExchange), 
result, subExchange);
+                        if (parallelAggregate) {
+                            
doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+                        } else {
+                            doAggregate(getAggregationStrategy(subExchange), 
result, subExchange);
+                        }
                     } catch (Throwable e) {
                         // wrap in exception to explain where it failed
                         subExchange.setException(new 
CamelExchangeException("Sequential processing failed for number " + total, 
subExchange, e));
@@ -655,7 +675,11 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
 
                         // must catch any exceptions from aggregation
                         try {
-                            doAggregate(getAggregationStrategy(subExchange), 
result, subExchange);
+                            if (parallelAggregate) {
+                                
doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange);
+                            } else {
+                                
doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+                            }
                         } catch (Throwable e) {
                             // wrap in exception to explain where it failed
                             subExchange.setException(new 
CamelExchangeException("Sequential processing failed for number " + total, 
subExchange, e));
@@ -800,6 +824,19 @@ public class MulticastProcessor extends ServiceSupport 
implements AsyncProcessor
      * @param exchange the exchange to be added to the result
      */
     protected synchronized void doAggregate(AggregationStrategy strategy, 
AtomicExchange result, Exchange exchange) {
+        doAggregateInternal(strategy, result, exchange);
+    }
+
+    /**
+     * Aggregate the {@link Exchange) with the current result.
+     * This method is unsynchronized and is called directly when 
parallelAggregate is enabled.
+     * In all other cases, this method is called from the doAggregate which is 
a synchronized method
+     *
+     * @param strategy
+     * @param result
+     * @param exchange
+     */
+    protected void doAggregateInternal(AggregationStrategy strategy, 
AtomicExchange result, Exchange exchange) {
         if (strategy != null) {
             // prepare the exchanges for aggregation
             Exchange oldExchange = result.get();

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
index c6c162e..ee80672 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
@@ -57,6 +57,7 @@ public class RecipientList extends ServiceSupport implements 
AsyncProcessor {
     private Expression expression;
     private final String delimiter;
     private boolean parallelProcessing;
+    private boolean parallelAggregate;
     private boolean stopOnException;
     private boolean ignoreInvalidEndpoints;
     private boolean streaming;
@@ -133,7 +134,7 @@ public class RecipientList extends ServiceSupport 
implements AsyncProcessor {
 
         RecipientListProcessor rlp = new 
RecipientListProcessor(exchange.getContext(), producerCache, iter, 
getAggregationStrategy(),
                 isParallelProcessing(), getExecutorService(), 
isShutdownExecutorService(),
-                isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork()) {
+                isStreaming(), isStopOnException(), getTimeout(), 
getOnPrepare(), isShareUnitOfWork(), isParallelAggregate()) {
             @Override
             protected synchronized ExecutorService 
createAggregateExecutorService(String name) {
                 // use a shared executor service to avoid creating new thread 
pools
@@ -226,6 +227,14 @@ public class RecipientList extends ServiceSupport 
implements AsyncProcessor {
         this.parallelProcessing = parallelProcessing;
     }
 
+    public boolean isParallelAggregate() {
+        return parallelAggregate;
+    }
+
+    public void setParallelAggregate(boolean parallelAggregate) {
+        this.parallelAggregate = parallelAggregate;
+    }
+
     public boolean isStopOnException() {
         return stopOnException;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
index f538e36..70a3853 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
@@ -131,11 +131,21 @@ public class RecipientListProcessor extends 
MulticastProcessor {
         this.iter = iter;
     }
 
+    @Deprecated
     public RecipientListProcessor(CamelContext camelContext, ProducerCache 
producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
                                   boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                                   boolean streaming, boolean stopOnException, 
long timeout, Processor onPrepare, boolean shareUnitOfWork) {
         super(camelContext, null, aggregationStrategy, parallelProcessing, 
executorService, shutdownExecutorService,
-                streaming, stopOnException, timeout, onPrepare, 
shareUnitOfWork);
+                streaming, stopOnException, timeout, onPrepare, 
shareUnitOfWork, false);
+        this.producerCache = producerCache;
+        this.iter = iter;
+    }
+
+    public RecipientListProcessor(CamelContext camelContext, ProducerCache 
producerCache, Iterator<Object> iter, AggregationStrategy aggregationStrategy,
+                                  boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
+                                  boolean streaming, boolean stopOnException, 
long timeout, Processor onPrepare, boolean shareUnitOfWork, boolean 
parallelAggregate) {
+        super(camelContext, null, aggregationStrategy, parallelProcessing, 
executorService, shutdownExecutorService,
+                streaming, stopOnException, timeout, onPrepare, 
shareUnitOfWork, parallelAggregate);
         this.producerCache = producerCache;
         this.iter = iter;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java 
b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index d1a0f64..a0b4a2a 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -63,11 +63,20 @@ public class Splitter extends MulticastProcessor implements 
AsyncProcessor, Trac
         this(camelContext, expression, destination, aggregationStrategy, 
false, null, false, false, false, 0, null, false);
     }
 
+    @Deprecated
     public Splitter(CamelContext camelContext, Expression expression, 
Processor destination, AggregationStrategy aggregationStrategy,
                     boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
                     boolean streaming, boolean stopOnException, long timeout, 
Processor onPrepare, boolean useSubUnitOfWork) {
+        this(camelContext, expression, destination, aggregationStrategy, 
parallelProcessing, executorService, shutdownExecutorService,
+                streaming, stopOnException, timeout, onPrepare, 
useSubUnitOfWork, false);
+    }
+
+    public Splitter(CamelContext camelContext, Expression expression, 
Processor destination, AggregationStrategy aggregationStrategy,
+                    boolean parallelProcessing, ExecutorService 
executorService, boolean shutdownExecutorService,
+                    boolean streaming, boolean stopOnException, long timeout, 
Processor onPrepare, boolean useSubUnitOfWork,
+                    boolean parallelAggregate) {
         super(camelContext, Collections.singleton(destination), 
aggregationStrategy, parallelProcessing, executorService,
-                shutdownExecutorService, streaming, stopOnException, timeout, 
onPrepare, useSubUnitOfWork);
+                shutdownExecutorService, streaming, stopOnException, timeout, 
onPrepare, useSubUnitOfWork, parallelAggregate);
         this.expression = expression;
         notNull(expression, "expression");
         notNull(destination, "destination");

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java
new file mode 100644
index 0000000..91ac0b2
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelAggregateTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Future;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.util.StopWatch;
+
+public class SplitterParallelAggregateTest extends ContextTestSupport {
+
+    // run this test manually as it takes some time to process, but shows that 
parallel aggregate can
+    // be faster when enabled.
+    private boolean enabled;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:splitSynchronizedAggregation")
+                    .split(method(new MySplitter(), "rowIterator"), new 
MyAggregationStrategy())
+                        .to("log:someSplitProcessing?groupSize=500");
+
+                from("direct:splitUnsynchronizedAggregation")
+                    .split(method(new MySplitter(), "rowIterator"), new 
MyAggregationStrategy()).parallelAggregate()
+                        .to("log:someSplitProcessing?groupSize=500");
+            }
+        };
+    }
+
+    public void test1() throws Exception {
+        if (!enabled) {
+            return;
+        }
+        int numberOfRequests = 1;
+        timeSplitRoutes(numberOfRequests);
+    }
+
+    public void test2() throws Exception {
+        if (!enabled) {
+            return;
+        }
+        int numberOfRequests = 2;
+        timeSplitRoutes(numberOfRequests);
+    }
+
+    public void test4() throws Exception {
+        if (!enabled) {
+            return;
+        }
+        int numberOfRequests = 4;
+        timeSplitRoutes(numberOfRequests);
+    }
+
+    protected void timeSplitRoutes(int numberOfRequests) throws Exception {
+        String[] endpoints = new 
String[]{"direct:splitSynchronizedAggregation", 
"direct:splitUnsynchronizedAggregation"};
+        List<Future<File>> futures = new ArrayList<Future<File>>();
+        StopWatch stopWatch = new StopWatch(false);
+
+        for (int endpointIndex = 0; endpointIndex < endpoints.length; 
endpointIndex++) {
+            stopWatch.restart();
+            for (int requestIndex = 0; requestIndex < numberOfRequests; 
requestIndex++) {
+                futures.add(template.asyncRequestBody(
+                        endpoints[endpointIndex], null, File.class));
+            }
+
+            for (int i = 0; i < futures.size(); i++) {
+                Future<File> future = futures.get(i);
+                future.get();
+            }
+            stopWatch.stop();
+
+            log.info(String.format("test%d.%s=%d\n", numberOfRequests, 
endpoints[endpointIndex], stopWatch.taken()));
+        }
+    }
+
+    public static class MySplitter {
+        public Iterator<String[]> rowIterator() {
+            // we would normally be reading a large file but for this test,
+            // we'll just manufacture a bunch of string
+            // arrays
+            LinkedList<String[]> rows = new LinkedList<String[]>();
+            String[] row;
+            for (int i = 0; i < 10000; i++) {
+                row = new String[10];
+                for (int j = 0; j < row.length; j++) {
+                    row[j] = String.valueOf(System.nanoTime());
+                }
+                rows.add(row);
+            }
+
+            return rows.iterator();
+        }
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+
+            // emulate some processing
+            Random random = new Random(System.currentTimeMillis());
+            for (int i = 0; i < 10000; i++) {
+                random.nextGaussian();
+            }
+
+            return newExchange;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/f1dbb485/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java 
b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
index 420af1d..9e0fd6f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/SplitterTest.java
@@ -164,6 +164,26 @@ public class SplitterTest extends ContextTestSupport {
         assertEquals((Integer) 5, result.getProperty("aggregated", 
Integer.class));
     }
     
+    public void testSplitterParallelAggregate() throws Exception {
+        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
+        resultEndpoint.expectedMessageCount(5);
+        resultEndpoint.expectedBodiesReceivedInAnyOrder("James", "Guillaume", 
"Hiram", "Rob", "Roman");
+
+        Exchange result = template.request("direct:parallelAggregate", new 
Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("James,Guillaume,Hiram,Rob,Roman");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertMockEndpointsSatisfied();
+        Message out = result.getOut();
+
+        assertMessageHeader(out, "foo", "bar");
+        assertEquals((Integer) 5, result.getProperty("aggregated", 
Integer.class));
+    }
+
     public void testSplitterWithStreamingAndFileBody() throws Exception {
         URL url = 
this.getClass().getResource("/org/apache/camel/processor/simple.txt");
         assertNotNull("We should find this simple file here.", url);
@@ -250,6 +270,7 @@ public class SplitterTest extends ContextTestSupport {
 
                 from("direct:seqential").split(body().tokenize(","), new 
UseLatestAggregationStrategy()).to("mock:result");
                 from("direct:parallel").split(body().tokenize(","), new 
MyAggregationStrategy()).parallelProcessing().to("mock:result");
+                from("direct:parallelAggregate").split(body().tokenize(","), 
new 
MyAggregationStrategy()).parallelProcessing().parallelAggregate().to("mock:result");
                 
from("direct:streaming").split(body().tokenize(",")).streaming().to("mock:result");
                 from("direct:parallel-streaming").split(body().tokenize(","), 
new MyAggregationStrategy()).parallelProcessing().streaming().to("mock:result");
                 from("direct:exception")

Reply via email to