This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 020f37b CAMEL-12296: Aggregate EIP - Added option to complete all
previous groups on new incoming group detected.
020f37b is described below
commit 020f37b5cc83347d2a1c4dbfb8a5bd4a3031684e
Author: Claus Ibsen <[email protected]>
AuthorDate: Tue Feb 27 13:49:01 2018 +0100
CAMEL-12296: Aggregate EIP - Added option to complete all previous groups
on new incoming group detected.
---
camel-core/src/main/docs/eips/aggregate-eip.adoc | 3 +-
.../mbean/ManagedAggregateProcessorMBean.java | 3 +
.../mbean/ManagedAggregateProcessor.java | 4 ++
.../apache/camel/model/AggregateDefinition.java | 26 ++++++++
.../processor/aggregate/AggregateProcessor.java | 12 ++++
...gregateCompletionOnNewCorrelationGroupTest.java | 70 ++++++++++++++++++++++
6 files changed, 117 insertions(+), 1 deletion(-)
diff --git a/camel-core/src/main/docs/eips/aggregate-eip.adoc
b/camel-core/src/main/docs/eips/aggregate-eip.adoc
index 80f239b..e20d529 100644
--- a/camel-core/src/main/docs/eips/aggregate-eip.adoc
+++ b/camel-core/src/main/docs/eips/aggregate-eip.adoc
@@ -17,7 +17,7 @@ single correlation key into a single message exchange.
=== Aggregator options
// eip options: START
-The Aggregate EIP supports 24 options which are listed below:
+The Aggregate EIP supports 25 options which are listed below:
[width="100%",cols="2,5,^1,2",options="header"]
|===
@@ -38,6 +38,7 @@ The Aggregate EIP supports 24 options which are listed below:
| *completionInterval* | A repeating period in millis by which the aggregator
will complete all current aggregated exchanges. Camel has a background task
which is triggered every period. You cannot use this option together with
completionTimeout, only one of them can be used. | | Long
| *completionTimeoutChecker Interval* | Interval in millis that is used by the
background task that checks for timeouts (org.apache.camel.TimeoutMap). By
default the timeout checker runs every second. The timeout is an approximation
and there is no guarantee that the a timeout is triggered exactly after the
timeout value. It is not recommended to use very low timeout values or checker
intervals. | 1000 | Long
| *completionFromBatchConsumer* | Enables the batch completion mode where we
aggregate from a org.apache.camel.BatchConsumer and aggregate the total number
of exchanges the org.apache.camel.BatchConsumer has reported as total by
checking the exchange property link org.apache.camel.ExchangeBATCH_COMPLETE
when its complete. | false | Boolean
+| *completionOnNewCorrelation Group* | Enables completion on all previous
groups when a new incoming correlation group. This can for example be used to
complete groups with same correlation keys when they are in consecutive order.
Notice when this is enabled then only 1 correlation group can be in progress as
when a new correlation group starts, then the previous groups is forced
completed. | false | Boolean
| *groupExchanges* | *Deprecated* Enables grouped exchanges, so the aggregator
will group all aggregated exchanges into a single combined Exchange holding all
the aggregated exchanges in a java.util.List. | false | Boolean
| *eagerCheckCompletion* | Use eager completion checking which means that the
completionPredicate will use the incoming Exchange. As opposed to without eager
completion checking the completionPredicate will use the aggregated Exchange. |
false | Boolean
| *ignoreInvalidCorrelation Keys* | If a correlation key cannot be
successfully evaluated it will be ignored by logging a DEBUG and then just
ignore the incoming Exchange. | false | Boolean
diff --git
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
index fa978c5..ee967c5 100644
---
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
+++
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedAggregateProcessorMBean.java
@@ -54,6 +54,9 @@ public interface ManagedAggregateProcessorMBean extends
ManagedProcessorMBean {
@ManagedAttribute(description = "Complete from batch consumers")
boolean isCompletionFromBatchConsumer();
+ @ManagedAttribute(description = "Complete all previous groups on new
incoming correlation group")
+ boolean isCompletionOnNewCorrelationGroup();
+
@ManagedAttribute(description = "Ignore invalid correlation keys")
boolean isIgnoreInvalidCorrelationKeys();
diff --git
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
index 1f68652..4c96df1 100644
---
a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
+++
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedAggregateProcessor.java
@@ -118,6 +118,10 @@ public class ManagedAggregateProcessor extends
ManagedProcessor implements Manag
return processor.isCompletionFromBatchConsumer();
}
+ public boolean isCompletionOnNewCorrelationGroup() {
+ return processor.isCompletionOnNewCorrelationGroup();
+ }
+
public boolean isIgnoreInvalidCorrelationKeys() {
return processor.isIgnoreInvalidCorrelationKeys();
}
diff --git
a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 0de43b0..4b9ab4f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -108,6 +108,8 @@ public class AggregateDefinition extends
ProcessorDefinition<AggregateDefinition
@XmlAttribute
private Boolean completionFromBatchConsumer;
@XmlAttribute
+ private Boolean completionOnNewCorrelationGroup;
+ @XmlAttribute
@Deprecated
private Boolean groupExchanges;
@XmlAttribute
@@ -257,6 +259,9 @@ public class AggregateDefinition extends
ProcessorDefinition<AggregateDefinition
if (getCompletionFromBatchConsumer() != null) {
answer.setCompletionFromBatchConsumer(getCompletionFromBatchConsumer());
}
+ if (getCompletionOnNewCorrelationGroup() != null) {
+
answer.setCompletionOnNewCorrelationGroup(getCompletionOnNewCorrelationGroup());
+ }
if (getEagerCheckCompletion() != null) {
answer.setEagerCheckCompletion(getEagerCheckCompletion());
}
@@ -541,6 +546,14 @@ public class AggregateDefinition extends
ProcessorDefinition<AggregateDefinition
this.completionFromBatchConsumer = completionFromBatchConsumer;
}
+ public Boolean getCompletionOnNewCorrelationGroup() {
+ return completionOnNewCorrelationGroup;
+ }
+
+ public void setCompletionOnNewCorrelationGroup(Boolean
completionOnNewCorrelationGroup) {
+ this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
+ }
+
public ExecutorService getExecutorService() {
return executorService;
}
@@ -737,6 +750,19 @@ public class AggregateDefinition extends
ProcessorDefinition<AggregateDefinition
}
/**
+ * Enables completion on all previous groups when a new incoming
correlation group. This can for example be used
+ * to complete groups with same correlation keys when they are in
consecutive order.
+ * Notice when this is enabled then only 1 correlation group can be in
progress as when a new correlation group
+ * starts, then the previous groups is forced completed.
+ *
+ * @return builder
+ */
+ public AggregateDefinition completionOnNewCorrelationGroup() {
+ setCompletionOnNewCorrelationGroup(true);
+ return this;
+ }
+
+ /**
* Number of messages aggregated before the aggregation is complete. This
option can be set as either
* a fixed value or using an Expression which allows you to evaluate a
size dynamically - will use Integer as result.
* If both are set Camel will fallback to use the fixed value if the
Expression result was null or 0.
diff --git
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 07bcfc5..34e4bf1 100644
---
a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -206,6 +206,7 @@ public class AggregateProcessor extends ServiceSupport
implements AsyncProcessor
private int completionSize;
private Expression completionSizeExpression;
private boolean completionFromBatchConsumer;
+ private boolean completionOnNewCorrelationGroup;
private AtomicInteger batchConsumerCounter = new AtomicInteger();
private boolean discardOnCompletionTimeout;
private boolean forceCompletionOnStop;
@@ -462,6 +463,9 @@ public class AggregateProcessor extends ServiceSupport
implements AsyncProcessor
// remove the exchange property so we do not complete again
answer.removeProperty(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS);
forceCompletionOfAllGroups();
+ } else if (isCompletionOnNewCorrelationGroup() && originalExchange ==
null) {
+ // its a new group so force complete of all existing groups
+ forceCompletionOfAllGroups();
}
// special for some repository implementations
@@ -937,6 +941,14 @@ public class AggregateProcessor extends ServiceSupport
implements AsyncProcessor
this.completionFromBatchConsumer = completionFromBatchConsumer;
}
+ public boolean isCompletionOnNewCorrelationGroup() {
+ return completionOnNewCorrelationGroup;
+ }
+
+ public void setCompletionOnNewCorrelationGroup(boolean
completionOnNewCorrelationGroup) {
+ this.completionOnNewCorrelationGroup = completionOnNewCorrelationGroup;
+ }
+
public boolean isCompleteAllOnStop() {
return completeAllOnStop;
}
diff --git
a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnNewCorrelationGroupTest.java
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnNewCorrelationGroupTest.java
new file mode 100644
index 0000000..c870f44
--- /dev/null
+++
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnNewCorrelationGroupTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.junit.Test;
+
+public class AggregateCompletionOnNewCorrelationGroupTest extends
ContextTestSupport {
+
+ @Test
+ public void testCompletionOnNewCorrelationGroup() throws Exception {
+ getMockEndpoint("mock:aggregated").expectedBodiesReceived("AA", "BB",
"CCC");
+
+ template.sendBodyAndHeader("direct:start", "A", "id", "1");
+ template.sendBodyAndHeader("direct:start", "A", "id", "1");
+ template.sendBodyAndHeader("direct:start", "B", "id", "2");
+ template.sendBodyAndHeader("direct:start", "B", "id", "2");
+ template.sendBodyAndHeader("direct:start", "C", "id", "3");
+ template.sendBodyAndHeader("direct:start", "C", "id", "3");
+ template.sendBodyAndHeader("direct:start", "C", "id", "3");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .completionOnNewCorrelationGroup()
+ .completionSize(3)
+ .to("log:aggregated", "mock:aggregated");
+ }
+ };
+ }
+
+ public static class MyAggregationStrategy implements AggregationStrategy {
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body1 = oldExchange.getIn().getBody(String.class);
+ String body2 = newExchange.getIn().getBody(String.class);
+
+ oldExchange.getIn().setBody(body1 + body2);
+ return oldExchange;
+ }
+ }
+}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
[email protected].