This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 58b27ef3a27b1513fba41372d15ea9d70cee85e5
Author: Otavio R. Piske <angusyo...@gmail.com>
AuthorDate: Sat Feb 24 09:32:01 2024 +0100

    CAMEL-20459: documentation fixes for the aggregate EIP.

    Signed-off-by: Otavio R. Piske <angusyo...@gmail.com>
---
 .../docs/modules/eips/pages/aggregate-eip.adoc | 127 ++++++++++-----------
 1 file changed, 63 insertions(+), 64 deletions(-)

diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/aggregate-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/aggregate-eip.adoc
index 7188c5d3929..1dcff1dfcec 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/aggregate-eip.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/aggregate-eip.adoc
@@ -9,9 +9,9 @@
 The
 http://www.enterpriseintegrationpatterns.com/Aggregator.html[Aggregator]
 from the xref:enterprise-integration-patterns.adoc[EIP patterns] allows
-you to combine a number of messages together into a single message.
+you to combine a number of messages into a single message.

-How do we combine the results of individual, but related messages so that they can be processed as a whole?
+How do we combine the results of individual, but related, messages so that they can be processed as a whole?

 image::eip/Aggregator.gif[image]

@@ -20,7 +20,7 @@ Use a stateful filter, an Aggregator, to collect and store individual messages u
 The aggregator is one of the most complex EIP and has many features and configurations.

 The logic for combing messages together is _correlated_ in buckets based on a _correlation key_.
-Messages with the same correlation key is aggregated together, using an `AggregationStrategy`.
+Messages with the same correlation key are aggregated together, using an `AggregationStrategy`.

 == Aggregate options

@@ -36,14 +36,14 @@ include::partial$eip-exchangeProperties.adoc[]

 == Worker pools

-The aggregate EIP will always use a worker pool, that is used to process all the outgoing messages from the aggregator.
+The aggregate EIP will always use a worker pool used to process all the outgoing messages from the aggregator.
 The worker pool is determined accordingly:

 - If a custom `ExecutorService` has been configured, then this is used as worker pool.
 - If `parallelProcessing=true` then a _default_ worker pool (is 10 worker threads by default) is created.
 However, the thread pool size and other configurations can be configured using _thread pool profiles_.
 - Otherwise, a single threaded worker pool is created.
-- In order to achieve synchronous aggregation, use an instance of `SynchronousExecutorService` for the
+- To achieve synchronous aggregation, use an instance of `SynchronousExecutorService` for the
 `executorService` option. The aggregated output will execute in the same thread that called the aggregator.

 == Aggregating
@@ -52,9 +52,10 @@ The `AggregationStrategy` is used for aggregating the old, and the new exchanges
 that becomes the next old, when the next message is aggregated, and so forth.

 Possible implementations include performing some kind of combining or
-delta processing, such as adding line items together into an invoice or
+delta processing.
+For instance, adding line items together into an invoice or
 just using the newest exchange and removing old exchanges such as for
-state tracking or market data prices; where old values are of little
+state tracking or market data prices, where old values are of little
 use.
 Notice the aggregation strategy is a mandatory option and must be
@@ -115,8 +116,8 @@ In the route below we group all the exchanges together using
 [source,java]
 ----
 from("direct:start")
-    // aggregate all using same expression and group the
-    // exchanges so we get one single exchange containing all
+    // aggregates all using the same expression and group the
+    // exchanges, so we get one single exchange containing all
     // the others
     .aggregate(new GroupedExchangeAggregationStrategy()).constant(true)
     // wait for 0.5 seconds to aggregate
@@ -142,10 +143,10 @@ then you can use the
 `org.apache.camel.processor.aggregate.AbstractListAggregationStrategy`
 abstract class.

-The completed Exchange that is sent out of the aggregator will contain the `List<V>` in
+The completed Exchange sent out of the aggregator will contain the `List<V>` in
 the message body.

-For example to aggregate a `List<Integer>` you can extend this class as
+For example, to aggregate a `List<Integer>` you can extend this class as
 shown below, and implement the `getValue` method:

 [source,java]
@@ -154,7 +155,7 @@ public class MyListOfNumbersStrategy extends AbstractListAggregationStrategy<Int

     @Override
     public Integer getValue(Exchange exchange) {
-        // the message body contains a number, so just return that as-is
+        // the message body contains a number, so return that as-is
         return exchange.getIn().getBody(Integer.class);
     }
 }
@@ -186,7 +187,7 @@ from the `timeout` method.
 The aggregator provides a pluggable repository which you can implement
 your own `org.apache.camel.spi.AggregationRepository`.

-If you need persistent repository then Camel provides numerous implementations, such as from the
+If you need a persistent repository, then Camel provides numerous implementations, such as from the
 xref:ROOT:caffeine-cache-component.adoc[Caffeine],
 xref:ROOT:cql-component.adoc[CassandraQL],
 xref:ROOT:ehcache-component.adoc[EHCache],
@@ -198,19 +199,19 @@ or xref:ROOT:sql-component.adoc[SQL] components.
 == Completion

-When aggregation xref:manual::exchange.adoc[Exchange]s at some point you need to
-indicate that the aggregated exchanges is complete, so they can be sent
+When aggregation xref:manual::exchange.adoc[Exchange]s at some point, you need to
+indicate that the aggregated exchanges are complete, so they can be sent
 out of the aggregator. Camel allows you to indicate completion in
 various ways as follows:

-* _completionTimeout_ - Is an inactivity timeout in which is triggered if
+* _completionTimeout_: Is an inactivity timeout in that is triggered if
 no new exchanges have been aggregated for that particular correlation
 key within the period.
-* _completionInterval_ - Once every X period all the current aggregated
+* _completionInterval_: Once every X period all the current aggregated
 exchanges are completed.
-* _completionSize_ - Is a number indicating that after X aggregated
+* _completionSize_: Is a number indicating that after X aggregated
 exchanges its complete.
-* _completionPredicate_ - Runs a xref:manual::predicate.adoc[Predicate] when a new
+* _completionPredicate_: Runs a xref:manual::predicate.adoc[Predicate] when a new
 exchange is aggregated to determine if we are complete or not.
 The configured aggregationStrategy can implement the
 Predicate interface and will be used as the completionPredicate if no
@@ -218,16 +219,16 @@ completionPredicate is configured. The configured aggregationStrategy can
 override the `preComplete` method and will be used as
 the completionPredicate in pre-complete check mode. See further below
 for more details.
-* _completionFromBatchConsumer_ - Special option for
-xref:manual::batch-consumer.adoc[Batch Consumer] which allows you to complete
-when all the messages from the batch has been aggregated.
-* _forceCompletionOnStop_ - Indicates to complete all current
+* _completionFromBatchConsumer_: Special option for
+xref:manual::batch-consumer.adoc[Batch Consumer], which allows you to complete
+when all the messages from the batch have been aggregated.
+* _forceCompletionOnStop_: Indicates to complete all current
 aggregated exchanges when the context is stopped
-* _AggregateController_ - which allows to use an external source (`AggregateController` implementation) to complete groups or all groups.
+* _AggregateController_: which allows to use an external source (`AggregateController` implementation) to complete groups or all groups.
 This can be done using Java or JMX API.

 All the different completions are per correlation key. You can
-combine them in any way you like. It's basically the first which
+combine them in any way you like. It's basically the first that
 triggers that wins. So you can use a completion size together with a
 completion timeout. Only completionTimeout and completionInterval cannot
 be used at the same time.
@@ -243,7 +244,7 @@ xref:manual::exchange.adoc[Exchange] is starting a new group from scratch.
 The pre-completion mode must be enabled by the `AggregationStrategy` by overriding the `canPreComplete` method
 to return a `true` value.

-When pre completion is enabled then the `preComplete` method is invoked:
+When pre-completion is enabled then the `preComplete` method is invoked:

 [source,java]
 ----
@@ -258,7 +259,7 @@ When pre completion is enabled then the `preComplete` method is invoked:
     boolean preComplete(Exchange oldExchange, Exchange newExchange);
 ----

-If the `preComplete` method returns `true`, then the existing correlation groups is
+If the `preComplete` method returns `true`, then the existing correlation group is
 completed (without aggregating the incoming exchange (`newExchange`).
 Then the `newExchange` is used to start the correlation group from scratch,
 so the group would contain only that new incoming exchange.
 This is
@@ -278,7 +279,7 @@ When the aggregation is in _pre-completion_ mode, then only the following comple

 * _completionTimeout_ or _completionInterval_ can also be used as fallback
 completions
-* any other completion are not used (such as by size, from batch consumer etc)
+* any other completions are not used (such as by size, from batch consumer, etc.)
 * _eagerCheckCompletion_ is implied as `true`, but the option has no effect

 === CompletionAwareAggregationStrategy
@@ -290,16 +291,14 @@ allows you to do any last minute custom logic such as to clean up some
 resources, or additional work on the exchange as it's now completed.
 You must *not* throw any exceptions from the `onCompletion` method.

-=== Completing current group decided from the AggregationStrategy
+=== Completing the current group decided from the AggregationStrategy

-The `AggregationStrategy` supports checking for the
-
-the exchange property (`Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP`)
+The `AggregationStrategy` supports checking for the exchange property (`Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP`)
 on the returned `Exchange` that contains a boolean to indicate if the current
 group should be completed. This allows to overrule any existing
-completion predicates / sizes / timeouts etc, and complete the group.
+completion predicates / sizes / timeouts etc., and complete the group.

-For example the following logic will complete the
+For example, the following logic will complete the
 group if the message body size is larger than 5. This is done by setting
 the exchange property `Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP` to `true`.

@@ -328,7 +327,7 @@ The `AggregationStrategy` checks an exchange property, from the returned exchang
 indicating if all previous groups should be completed.

 This allows to overrule any existing
-completion predicates / sizes / timeouts etc, and complete all the existing
+completion predicates / sizes / timeouts etc., and complete all the existing
 previous group.
 The following logic will complete all the
@@ -376,7 +375,7 @@ you to control the aggregate at runtime using Java or JMX API. This can be
 used to force completing groups of exchanges, or query its current
 runtime statistics.

-The aggregator provides a default implementation if no custom have been
+The aggregator provides a default implementation if no custom one has been
 configured, which can be accessed using `getAggregateController()` method.
 Though it may be easier to configure a controller in the route using
 `aggregateController` as shown below:
@@ -393,7 +392,7 @@ from("direct:start")
 ----

 Then there is API on `AggregateController` to force completion. For
-example to complete a group with key foo:
+example, to complete a group with key foo:

 [source,java]
 ----
@@ -413,7 +412,7 @@ int groups = controller.forceCompletionOfAllGroups();
 The controller can also be used in XML DSL using the `aggregateController` to
 refer to a bean with the controller implementation, which is looked up in the registry.

-When using Spring XML you can create the bean with `<bean>` as shown:
+When using Spring XML, you can create the bean with `<bean>` as shown:

 [source,xml]
 ----
@@ -441,26 +440,26 @@ To use the `AggregationStrategy` you had to implement the
 `org.apache.camel.AggregationStrategy` interface,
 which means your logic would be tied to the Camel API.
 You can use a bean for the logic and let Camel adapt to your
-bean. To use a bean a convention must be followed:
+bean. To use a bean, then a convention must be followed:

 * there must be a public method to use
 * the method must not be void
 * the method can be static or non-static
-* the method must have 2 or more parameters
+* the method must have two or more parameters
 * the parameters are paired, so the first half is applied to the
 `oldExchange`, and the reminder half is for the `newExchange`.
- Therefore, there must be an equal number of parameters, eg 2, 4, 6 etc.
+ Therefore, there must be an equal number of parameters, e.g., 2, 4, 6, etc.

-The paired methods is expected to be ordered as follows:
+The paired methods are expected to be ordered as follows:

 * the first parameter is the message body
-* optional, the 2nd parameter is a `Map` of the headers
-* optional, the 3rd parameter is a `Map` of the exchange properties
+* optional, the second parameter is a `Map` of the headers
+* optional, the third parameter is a `Map` of the exchange properties

 This convention is best explained with some examples.

-In the method below, we have only 2 parameters, so the 1st parameter is
-the body of the `oldExchange`, and the 2nd is paired to the body of the
+In the method below, we have only two parameters, so the first parameter is
+the body of the `oldExchange`, and the second is paired to the body of the
 `newExchange`:

 [source,java]
@@ -470,10 +469,10 @@ public String append(String existing, String next) {
 }
 ----

-In the method below, we have only 4 parameters, so the 1st parameter is
-the body of the `oldExchange`, and the 2nd is the `Map` of the
-`oldExchange` headers, and the 3rd is paired to the body of the `newExchange`,
-and the 4th parameter is the `Map` of the `newExchange` headers:
+In the method below, we have only four parameters, so the first parameter is
+the body of the `oldExchange`, and the second is the `Map` of the
+`oldExchange` headers, and the third is paired to the body of the `newExchange`,
+and the fourth parameter is the `Map` of the `newExchange` headers:

 [source,java]
 ----
@@ -482,7 +481,7 @@ public String append(String existing, Map existingHeaders, String next, Map next
 }
 ----

-And finally if we have 6 parameters, that includes the exchange properties:
+And finally, if we have six parameters, that includes the exchange properties:

 [source,java]
 ----
@@ -492,7 +491,7 @@ public String append(String existing, Map existingHeaders, Map existingPropertie
 }
 ----

-To use this with the aggregate EIP we can use a bean with the aggregate logic as follows:
+To use this with the aggregate EIP, we can use a bean with the aggregate logic as follows:

 [source,java]
 ----
@@ -533,7 +532,7 @@ public void configure() throws Exception {
 }
 ----

-And if the bean has only one method we do not need to specify the name
+And if the bean has only one method, we do not need to specify the name
 of the method:

 [source,java]
@@ -559,7 +558,7 @@ public class MyBodyAppender {
 }
 ----

-If you are using XML DSL then we need to declare a `<bean>` with the bean:
+If you are using XML DSL, then we need to declare a `<bean>` with the bean:

 [source,xml]
 ----
@@ -585,7 +584,7 @@ to call:
 </camelContext>
 ----

-When using XML DSL you can also specify the bean class directly in `aggregationStrategy`
+When using XML DSL, you can also specify the bean class directly in `aggregationStrategy`
 using the `#class:` syntax as shown:

 [source,xml]
@@ -601,8 +600,8 @@ using the `#class:` syntax as shown:
 </route>
 ----

-You can use this in XML DSL when you are not using the classic Spring XML files;
-where you use XML only for Camel routes.
+You can use this in XML DSL when you are not using the classic Spring XML files (
+where you use XML only for Camel routes).

 === Aggregating when no data

@@ -611,7 +610,7 @@ When using bean as `AggregationStrategy`, then the method is
 is not `null`. In cases where you want to have the method invoked, even when there are no data (message body is `null`),
 then set the `strategyMethodAllowNull` to `true`.

-When using beans this can be configured a bit easier using the `beanAllowNull` method
+When using beans, this can be configured a bit easier using the `beanAllowNull` method
 from `AggregationStrategies` as shown:

 [source,java]
@@ -646,8 +645,8 @@ EIP using `pollEnrich`. The `newExchange` will be `null` in the
 situation we could not get any data from the "seda:foo" endpoint, and
 a timeout was hit after 1 second.
-So if we need to do special merge logic we would need to set `setAllowNullNewExchange=true`.
-If we don't do this then on timeout the append method would normally not be
+So if we need to do special merge logic, we would need to set `setAllowNullNewExchange=true`.
+If we didn't do this, then on timeout the append method would normally not be
 invoked, meaning the xref:content-enricher.adoc[Content Enricher] did
 not merge/change the message.

@@ -674,9 +673,9 @@ set it to `true` as shown below:

 === Aggregating with different body types

-When for example using `strategyMethodAllowNull` as `true`, then the
-parameter types of the message bodies does not have to be the same. For
-example suppose we want to aggregate from a `com.foo.User` type to a
+When, for example, using `strategyMethodAllowNull` as `true`, then the
+parameter type of the message bodies does not have to be the same.
+For example suppose we want to aggregate from a `com.foo.User` type to a
 `List<String>` that contains the name of the user. We could code a bean as follows:

 [source,java]
@@ -694,4 +693,4 @@ public final class MyUserAppender {
 ----

 Notice that the return type is a `List` which we want to contain the name of the users.
-The first parameter is the `List` of names, and the 2nd parameter is the incoming `com.foo.User` type.
+The first parameter is the `List` of names, and the second parameter is the incoming `com.foo.User` type.
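
For readers of this commit mail who want to see the paired-parameter bean convention from the last hunks in isolation, here is a minimal sketch. It is not taken from the commit: the class `MyUserAppenderSketch`, the method `addUsers`, and the nested `User` type (a stand-in for `com.foo.User`) are hypothetical names chosen for illustration. The loop in `main` only imitates how an aggregator might invoke the bean method with a `null` first parameter for the first message; it does not use Camel itself.

```java
import java.util.ArrayList;
import java.util.List;

public final class MyUserAppenderSketch {

    // Hypothetical stand-in for com.foo.User; only the name matters here.
    public static final class User {
        private final String name;

        public User(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    // Bean method following the convention described in the patched page:
    // the first parameter pairs with the old exchange body (the List of names),
    // the second with the new exchange body (the incoming user).
    public static List<String> addUsers(List<String> names, User user) {
        if (names == null) {
            // assumed case: first message of a group, nothing aggregated yet
            names = new ArrayList<>();
        }
        if (user != null) {
            names.add(user.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        // Imitates successive aggregation calls without any Camel API.
        List<String> names = null;
        for (String n : new String[] {"alice", "bob"}) {
            names = addUsers(names, new User(n));
        }
        System.out.println(names); // prints [alice, bob]
    }
}
```

Because the method has no dependency on the Camel API, it can be exercised in a plain unit test, which is the stated motivation for the bean approach in the patched text.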