This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
commit 3bf54fbd4e1f7cb371b61b850c107f4c431e603d Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Oct 19 20:23:43 2021 +0200 CAMEL-16861: Cleanup and update EIP docs --- .../main/docs/modules/eips/pages/throttle-eip.adoc | 87 ++++++++++++++++++++-- .../org/apache/camel/model/ThrottleDefinition.java | 25 +++++++ .../camel/processor/ThrottlingGroupingTest.java | 2 +- 3 files changed, 107 insertions(+), 7 deletions(-) diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/throttle-eip.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/throttle-eip.adoc index 5afd86a..8cbc025 100644 --- a/core/camel-core-engine/src/main/docs/modules/eips/pages/throttle-eip.adoc +++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/throttle-eip.adoc @@ -69,20 +69,33 @@ And to throttle 50 message per second: === Dynamically changing maximum requests per period -TODO: +The Throttler uses an xref:latest@manual:ROOT:expression.adoc[Expression] +to configure the number of requests. In all the examples from above, we used +a xref:components:languages:constant-language.adoc[constant]. However, the expression can +be dynamic, such as determined from a message header from the current `Exchange`. -Since we use an Expression you can adjust this value at runtime, for example you can provide a header with the value. At runtime Camel evaluates the expression and converts the result to a `java.lang.Long` type. In the example below we use a header from the message to determine the maximum requests per period. If the header is absent, then the Throttler uses the old value. So that allows you to only provide a header if the value is to be changed: +At runtime Camel evaluates the expression and converts the result to a `java.lang.Long` type. +In the example below we use a header from the message to determine the maximum requests per period. +If the header is absent, then the Throttler uses the old value. 
This allows you to only provide a header if the value is to be changed: + +[source,java] +---- +from("seda:a") + .throttle(header("throttleValue")).timePeriodMillis(500) + .to("seda:b") +---- + +And in XML: [source,xml] ---- <route> - <from uri="direct:expressionHeader"/> + <from uri="seda:a"/> <throttle timePeriodMillis="500"> <!-- use a header to determine how many messages to throttle per 0.5 sec --> <header>throttleValue</header> </throttle> - <to uri="log:result"/> - <to uri="mock:result"/> + <to uri="seda:b"/> </route> ---- @@ -112,4 +125,66 @@ And in XML: </throttle> <to uri="seda:b"/> </route> ----- \ No newline at end of file +---- + +=== Rejecting processing if rate limit hit + +When a message is being _throttled_ because the maximum number of requests per period has been reached, then +the Throttler will by default wait until there is _free space_ before continuing to route the message. + +Instead of waiting you can also configure the Throttler to reject the message by throwing a `ThrottlerRejectedExecutionException` +exception. + +[source,java] +--------------------- +from("seda:a") + .throttle(100).rejectExecution(true) + .to("seda:b"); +--------------------- + +And in XML: + +[source,xml] +---- +<route> + <from uri="seda:a"/> + <throttle timePeriodMillis="100" rejectExecution="true"> + <constant>100</constant> + </throttle> + <to uri="seda:b"/> +</route> +---- + +=== Throttling per group + +The Throttler will by default throttle all messages in the same group. However, it is possible to use +a _correlation expression_ to divide the messages into multiple groups, where each group is throttled independently. + +For example, you can throttle by a xref:message.adoc[message] header as shown in the following example: + +[source,java] +--------------------- +from("seda:a") + .throttle(100).correlationExpression(header("region")) + .to("seda:b"); +--------------------- + +In the example above messages are throttled by the header with name `region`. 
+So suppose there are regions for US, EMEA, and ASIA, then we have three different groups, that each +are throttled by 100 messages per second. + +And in XML: + +[source,xml] +---- +<route> + <from uri="seda:a"/> + <throttle> + <constant>100</constant> + <correlationExpression> + <header>region</header> + </correlationExpression> + </throttle> + <to uri="seda:b"/> +</route> +---- diff --git a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java index de5fe21..8864570 100644 --- a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java +++ b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java @@ -145,6 +145,31 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic } /** + * To use a correlation expression that can throttle by the given key instead of overall throttling + * + * @param correlationExpression is a correlation key as a long number that can throttle by the given key instead of + * overall throttling + * @return the builder + */ + public ThrottleDefinition correlationExpression(long correlationExpression) { + return correlationExpression(ExpressionBuilder.constantExpression(correlationExpression)); + } + + /** + * To use a correlation expression that can throttle by the given key instead of overall throttling + * + * @param correlationExpression is a correlation key as an expression that can throttle by the given key instead of + * overall throttling + * @return the builder + */ + public ThrottleDefinition correlationExpression(Expression correlationExpression) { + ExpressionSubElementDefinition cor = new ExpressionSubElementDefinition(); + cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression)); + setCorrelationExpression(cor); + return this; + } + + /** * Whether or not the caller should run the task when it was rejected 
by the thread pool. * <p/> * Is by default <tt>true</tt> diff --git a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java index a842a9f..0c10297 100644 --- a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java @@ -211,7 +211,7 @@ public class ThrottlingGroupingTest extends ContextTestSupport { from("seda:a").throttle(header("max"), 1).to("mock:result"); from("seda:b").throttle(header("max"), 2).to("mock:result2"); - from("seda:c").throttle(header("max"), header("key")).to("mock:resultdynamic"); + from("seda:c").throttle(header("max")).correlationExpression(header("key")).to("mock:resultdynamic"); from("seda:ga").throttle(constant(3), header("key")).timePeriodMillis(1000).to("log:gresult", "mock:gresult");