This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 020f37b  CAMEL-12296: Aggregate EIP - Added option to complete all 
previous groups on new incoming group detected.
020f37b is described below

commit 020f37b5cc83347d2a1c4dbfb8a5bd4a3031684e
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 27 13:49:01 2018 +0100

    CAMEL-12296: Aggregate EIP - Added option to complete all previous groups 
on new incoming group detected.
---
 camel-core/src/main/docs/eips/aggregate-eip.adoc   |  3 +-
 .../mbean/ManagedAggregateProcessorMBean.java      |  3 +
 .../mbean/ManagedAggregateProcessor.java           |  4 ++
 .../apache/camel/model/AggregateDefinition.java    | 26 ++++++++
 .../processor/aggregate/AggregateProcessor.java    | 12 ++++
 ...gregateCompletionOnNewCorrelationGroupTest.java | 70 ++++++++++++++++++++++
 6 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc 
b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index 80f239b..e20d529 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -17,7 +17,7 @@ single correlation key into a single message exchange.
 === Aggregator options
 
 // eip options: START
-The Aggregate EIP supports 24 options which are listed below:
+The Aggregate EIP supports 25 options which are listed below:
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -38,6 +38,7 @@ The Aggregate EIP supports 24 options which are listed below:
 | *completionInterval* | A repeating period in millis by which the aggregator 
will complete all current aggregated exchanges. Camel has a background task 
which is triggered every period. You cannot use this option together with 
completionTimeout, only one of them can be used. |  | Long
 | *completionTimeoutChecker Interval* | Interval in millis that is used by the 
background task that checks for timeouts (org.apache.camel.TimeoutMap). By 
default the timeout checker runs every second. The timeout is an approximation 
and there is no guarantee that the a timeout is triggered exactly after the 
timeout value. It is not recommended to use very low timeout values or checker 
intervals. | 1000 | Long
 | *completionFromBatchConsumer* | Enables the batch completion mode where we 
aggregate from a org.apache.camel.BatchConsumer and aggregate the total number 
of exchanges the org.apache.camel.BatchConsumer has reported as total by 
checking the exchange property link org.apache.camel.ExchangeBATCH_COMPLETE 
when its complete. | false | Boolean
+| *completionOnNewCorrelation Group* | Enables completion of all previous 
groups when a new incoming correlation group is detected. This can for example 
be used to complete groups with the same correlation keys when they are in 
consecutive order. Notice that when this is enabled, only 1 correlation group 
can be in progress, because when a new correlation group starts, the previous 
groups are forced to complete. | false | Boolean
 | *groupExchanges* | *Deprecated* Enables grouped exchanges, so the aggregator 
will group all aggregated exchanges into a single combined Exchange holding all 
the aggregated exchanges in a java.util.List. | false | Boolean
 | *eagerCheckCompletion* | Use eager completion checking which means that the 
completionPredicate will use the incoming Exchange. As opposed to without eager 
completion checking the completionPredicate will use the aggregated Exchange. | 
false | Boolean
 | *ignoreInvalidCorrelation Keys* | If a correlation key cannot be 
successfully evaluated it will be ignored by logging a DEBUG and then just 
ignore the incoming Exchange. | false | Boolean
diff --git 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index fa978c5..ee967c5 100644
--- 
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++ 
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -54,6 +54,9 @@ public interface ManagedAggregateProcessorMBean extends 
ManagedProcessorMBean {
     @ManagedAttribute(description = "Complete from batch consumers")
     boolean isCompletionFromBatchConsumer();
 
+    @ManagedAttribute(description = "Complete all previous groups on new 
incoming correlation group")
+    boolean isCompletionOnNewCorrelationGroup();
+
     @ManagedAttribute(description = "Ignore invalid correlation keys")
     boolean isIgnoreInvalidCorrelationKeys();
 
diff --git 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index 1f68652..4c96df1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -118,6 +118,10 @@ public class ManagedAggregateProcessor extends 
ManagedProcessor implements Manag
         return processor.isCompletionFromBatchConsumer();
     }
 
+    public boolean isCompletionOnNewCorrelationGroup() {
+        return processor.isCompletionOnNewCorrelationGroup();
+    }
+
     public boolean isIgnoreInvalidCorrelationKeys() {
         return processor.isIgnoreInvalidCorrelationKeys();
     }
diff --git 
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java 
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 0de43b0..4b9ab4f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -108,6 +108,8 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     @XmlAttribute
     private Boolean completionFromBatchConsumer;
     @XmlAttribute
+    private Boolean completionOnNewCorrelationGroup;
+    @XmlAttribute
     @Deprecated
     private Boolean groupExchanges;
     @XmlAttribute
@@ -257,6 +259,9 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         if (getCompletionFromBatchConsumer() != null) {
             
answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer());
         }
+        if (getCompletionOnNewCorrelationGroup() != null) {
+            
answer.setCompletionOnNewCorrelationGroup(getCompletionOnNewCorrelationGroup());
+        }
         if (getEagerCheckCompletion() != null) {
             answer.setEagerCheckCompletion(getEagerCheckCompletion());
         }
@@ -541,6 +546,14 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
         this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
+    public Boolean getCompletionOnNewCorrelationGroup() {
+        return completionOnNewCorrelationGroup;
+    }
+
+    public void setCompletionOnNewCorrelationGroup(Boolean 
completionOnNewCorrelationGroup) {
+        this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
+    }
+
     public ExecutorService getExecutorService() {
         return executorService;
     }
@@ -737,6 +750,19 @@ public class AggregateDefinition extends 
ProcessorDefinition<AggregateDefinition
     }
 
     /**
+     * Enables completion of all previous groups when a new incoming 
correlation group is detected. This can for example be used
+     * to complete groups with the same correlation keys when they are in 
consecutive order.
+     * Notice that when this is enabled, only 1 correlation group can be in 
progress, because when a new correlation group
+     * starts, the previous groups are forced to complete.
+     *
+     * @return builder
+     */
+    public AggregateDefinition completionOnNewCorrelationGroup() {
+        setCompletionOnNewCorrelationGroup(true);
+        return this;
+    }
+
+    /**
      * Number of messages aggregated before the aggregation is complete. This 
option can be set as either
      * a fixed value or using an Expression which allows you to evaluate a 
size dynamically - will use Integer as result.
      * If both are set Camel will fallback to use the fixed value if the 
Expression result was null or 0.
diff --git 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 07bcfc5..34e4bf1 100644
--- 
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ 
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -206,6 +206,7 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
     private int completionSize;
     private Expression completionSizeExpression;
     private boolean completionFromBatchConsumer;
+    private boolean completionOnNewCorrelationGroup;
     private AtomicInteger batchConsumerCounter = new AtomicInteger();
     private boolean discardOnCompletionTimeout;
     private boolean forceCompletionOnStop;
@@ -462,6 +463,9 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
             // remove the exchange property so we do not complete again
             answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
             forceCompletionOfAllGroups();
+        } else if (isCompletionOnNewCorrelationGroup() && originalExchange == 
null) {
+            // it's a new group, so force completion of all existing groups
+            forceCompletionOfAllGroups();
         }
 
         // special for some repository implementations
@@ -937,6 +941,14 @@ public class AggregateProcessor extends ServiceSupport 
implements AsyncProcessor
         this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
+    public boolean isCompletionOnNewCorrelationGroup() {
+        return completionOnNewCorrelationGroup;
+    }
+
+    public void setCompletionOnNewCorrelationGroup(boolean 
completionOnNewCorrelationGroup) {
+        this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
+    }
+
     public boolean isCompleteAllOnStop() {
         return completeAllOnStop;
     }
diff --git 
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnNewCorrelationGroupTest.java
 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnNewCorrelationGroupTest.java
new file mode 100644
index 0000000..c870f44
--- /dev/null
+++ 
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnNewCorrelationGroupTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.junit.Test;
+
+public class AggregateCompletionOnNewCorrelationGroupTest extends 
ContextTestSupport {
+
+    @Test
+    public void testCompletionOnNewCorrelationGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("AA", "BB", 
"CCC");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", "1");
+        template.sendBodyAndHeader("direct:start", "A", "id", "1");
+        template.sendBodyAndHeader("direct:start", "B", "id", "2");
+        template.sendBodyAndHeader("direct:start", "B", "id", "2");
+        template.sendBodyAndHeader("direct:start", "C", "id", "3");
+        template.sendBodyAndHeader("direct:start", "C", "id", "3");
+        template.sendBodyAndHeader("direct:start", "C", "id", "3");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .completionOnNewCorrelationGroup()
+                        .completionSize(3)
+                    .to("log:aggregated", "mock:aggregated");
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to