Repository: camel
Updated Branches:
  refs/heads/master 472903bf2 -> 3f9651578


CAMEL-7433: Allow aggregation strategy to determine pre complete when using 
aggregator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/50945969
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/50945969
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/50945969

Branch: refs/heads/master
Commit: 5094596967e93417dc86e5f3fa38b08fe44c9797
Parents: 472903b
Author: Claus Ibsen <[email protected]>
Authored: Mon Mar 23 07:43:41 2015 +0100
Committer: Claus Ibsen <[email protected]>
Committed: Mon Mar 23 11:55:56 2015 +0100

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java | 57 ++++++++++----------
 .../PreCompletionAwareAggregationStrategy.java  | 32 +++++++++++
 2 files changed, 62 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b71c0bf..9b93c36 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -422,44 +422,47 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
 
         List<Exchange> list = new ArrayList<Exchange>();
 
-        // only need to update aggregation repository if we are not complete
         if (complete == null) {
+            // only need to update aggregation repository if we are not 
complete
             doAggregationRepositoryAdd(newExchange.getContext(), key, 
originalExchange, answer);
-            // we are not complete so the answer should be null
-            answer = null;
         } else {
-            // if batch consumer completion is enabled then we need to 
complete the group
-            if ("consumer".equals(complete)) {
-                for (String batchKey : batchConsumerCorrelationKeys) {
-                    Exchange batchAnswer;
-                    if (batchKey.equals(key)) {
-                        // skip the current aggregated key as we have already 
aggregated it and have the answer
-                        batchAnswer = answer;
-                    } else {
-                        batchAnswer = aggregationRepository.get(camelContext, 
batchKey);
-                    }
+            // if we are complete then add the answer to the list
+            doAggregationComplete(complete, list, key, originalExchange, 
answer);
+        }
 
-                    if (batchAnswer != null) {
-                        
batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                        onCompletion(batchKey, originalExchange, batchAnswer, 
false);
-                        list.add(batchAnswer);
-                    }
+        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
+        return list;
+    }
+
+    protected void doAggregationComplete(String complete, List<Exchange> list, 
String key, Exchange originalExchange, Exchange answer) {
+        if ("consumer".equals(complete)) {
+            for (String batchKey : batchConsumerCorrelationKeys) {
+                Exchange batchAnswer;
+                if (batchKey.equals(key)) {
+                    // skip the current aggregated key as we have already 
aggregated it and have the answer
+                    batchAnswer = answer;
+                } else {
+                    batchAnswer = aggregationRepository.get(camelContext, 
batchKey);
+                }
+
+                if (batchAnswer != null) {
+                    batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, 
complete);
+                    onCompletion(batchKey, originalExchange, batchAnswer, 
false);
+                    list.add(batchAnswer);
                 }
-                batchConsumerCorrelationKeys.clear();
-                // we have already submitted to completion, so answer should 
be null
-                answer = null;
-            } else {
-                // we are complete for this exchange
-                answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                answer = onCompletion(key, originalExchange, answer, false);
             }
+            batchConsumerCorrelationKeys.clear();
+            // we have already submitted to completion, so answer should be 
null
+            answer = null;
+        } else {
+            // we are complete for this exchange
+            answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
+            answer = onCompletion(key, originalExchange, answer, false);
         }
 
-        LOG.trace("onAggregation +++  end  +++ with correlation key: {}", key);
         if (answer != null) {
             list.add(answer);
         }
-        return list;
     }
 
     protected void doAggregationRepositoryAdd(CamelContext camelContext, 
String key, Exchange oldExchange, Exchange newExchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/50945969/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
----------------------------------------------------------------------
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
new file mode 100644
index 0000000..53fc3f0
--- /dev/null
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/PreCompletionAwareAggregationStrategy.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+public interface PreCompletionAwareAggregationStrategy extends 
AggregationStrategy {
+
+    /**
+     * Determines if the aggregation should complete the current group, and 
start a new group, or the aggregation
+     * should continue using the current group.
+     *
+     * @param oldExchange the oldest exchange (is <tt>null</tt> on first 
aggregation as we only have the new exchange)
+     * @param newExchange the newest exchange (can be <tt>null</tt> if there 
was no data possible to acquire)
+     * @return <tt>true</tt> to complete current group and start a new group, 
or <tt>false</tt> to keep using current
+     */
+    boolean preComplete(Exchange oldExchange, Exchange newExchange);
+}

Reply via email to