This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 61d3552429a Camel 16099: throttler EIP - throttle on concurrent
requests (#12138)
61d3552429a is described below
commit 61d3552429af0cb3688504cdec793b4436f33d4c
Author: Jono Morris <[email protected]>
AuthorDate: Sat Nov 25 19:49:34 2023 +1300
Camel 16099: throttler EIP - throttle on concurrent requests (#12138)
* CAMEL-16099 throttle on number of concurrent requests
* CAMEL-16099 update unit tests
* CAMEL-16099 generated changes
* CAMEL-16099 update throttle-group tests
* CAMEL-16099 add note to 4_3 upgrade guide
---
.../org/apache/camel/catalog/models/throttle.json | 13 +-
.../apache/camel/catalog/schemas/camel-spring.xsd | 9 -
.../resources/org/apache/camel/model/throttle.json | 13 +-
.../apache/camel/model/ProcessorDefinition.java | 57 ++++---
.../org/apache/camel/model/ThrottleDefinition.java | 72 ++------
.../java/org/apache/camel/processor/Throttler.java | 187 +++++++++------------
.../org/apache/camel/reifier/ThrottleReifier.java | 5 +-
.../ThrottlerAsyncDelayedCallerRunsTest.java | 2 +-
.../camel/processor/ThrottlerAsyncDelayedTest.java | 4 +-
.../apache/camel/processor/ThrottlerDslTest.java | 4 +-
.../camel/processor/ThrottlerMethodCallTest.java | 2 +-
.../org/apache/camel/processor/ThrottlerTest.java | 170 ++++++++++---------
.../camel/processor/ThrottlingGroupingTest.java | 150 +++++++----------
.../management/mbean/ManagedThrottlerMBean.java | 14 +-
.../camel/management/mbean/ManagedThrottler.java | 18 +-
.../camel/management/ManagedThrottlerTest.java | 30 ++--
.../java/org/apache/camel/xml/in/ModelParser.java | 1 -
.../java/org/apache/camel/xml/out/ModelWriter.java | 1 -
.../org/apache/camel/yaml/out/ModelWriter.java | 1 -
.../ROOT/pages/camel-4x-upgrade-guide-4_3.adoc | 10 +-
.../dsl/yaml/deserializers/ModelDeserializers.java | 8 +-
.../generated/resources/schema/camelYamlDsl.json | 6 -
etc/eclipse/camel_xml_templates.xml | 2 +-
23 files changed, 327 insertions(+), 452 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
index c9eaae7ee68..eda460dde65 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/models/throttle.json
@@ -15,12 +15,11 @@
"expression": { "index": 0, "kind": "expression", "displayName":
"Expression", "required": true, "type": "object", "javaType":
"org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant",
"csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser",
"java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl",
"python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize"
], "deprecated": false, "autowired": false, "sec [...]
"correlationExpression": { "index": 1, "kind": "expression",
"displayName": "Correlation Expression", "required": false, "type": "object",
"javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [
"constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header",
"hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method",
"mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath",
"xquery", "xtokenize" ], "deprecated": false, [...]
"executorService": { "index": 2, "kind": "attribute", "displayName":
"Executor Service", "label": "advanced", "required": false, "type": "object",
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false,
"autowired": false, "secret": false, "description": "To use a custom thread
pool (ScheduledExecutorService) by the throttler." },
- "timePeriodMillis": { "index": 3, "kind": "attribute", "displayName":
"Time Period Millis", "required": false, "type": "duration", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "1000", "description": "Sets the time period during which the
maximum request count is valid for" },
- "asyncDelayed": { "index": 4, "kind": "attribute", "displayName": "Async
Delayed", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Enables asynchronous
delay which means the thread will not block while delaying." },
- "callerRunsWhenRejected": { "index": 5, "kind": "attribute",
"displayName": "Caller Runs When Rejected", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether or not the caller should run the task when it was rejected by the
thread pool. Is by default true" },
- "rejectExecution": { "index": 6, "kind": "attribute", "displayName":
"Reject Execution", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether or not
throttler throws the ThrottlerRejectedExecutionException when the exchange
exceeds the request limit Is by default false" },
- "disabled": { "index": 7, "kind": "attribute", "displayName": "Disabled",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether to disable this EIP from the
route during build time. Once an EIP has been disabled then it cannot be
enabled later at runtime." },
- "id": { "index": 8, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the id of this node"
},
- "description": { "index": 9, "kind": "element", "displayName":
"Description", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the description of this node" }
+ "asyncDelayed": { "index": 3, "kind": "attribute", "displayName": "Async
Delayed", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Enables asynchronous
delay which means the thread will not block while delaying." },
+ "callerRunsWhenRejected": { "index": 4, "kind": "attribute",
"displayName": "Caller Runs When Rejected", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether or not the caller should run the task when it was rejected by the
thread pool. Is by default true" },
+ "rejectExecution": { "index": 5, "kind": "attribute", "displayName":
"Reject Execution", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether or not
throttler throws the ThrottlerRejectedExecutionException when the exchange
exceeds the request limit Is by default false" },
+ "disabled": { "index": 6, "kind": "attribute", "displayName": "Disabled",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether to disable this EIP from the
route during build time. Once an EIP has been disabled then it cannot be
enabled later at runtime." },
+ "id": { "index": 7, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the id of this node"
},
+ "description": { "index": 8, "kind": "element", "displayName":
"Description", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the description of this node" }
}
}
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
index ecdc5ffb115..cb3f2bc85ea 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/schemas/camel-spring.xsd
@@ -13086,15 +13086,6 @@ correlation key is throttled together.
<xs:documentation xml:lang="en">
<![CDATA[
To use a custom thread pool (ScheduledExecutorService) by the throttler.
-]]>
- </xs:documentation>
- </xs:annotation>
- </xs:attribute>
- <xs:attribute name="timePeriodMillis" type="xs:string">
- <xs:annotation>
- <xs:documentation xml:lang="en">
-<![CDATA[
-Sets the time period during which the maximum request count is valid for.
Default value: 1000
]]>
</xs:documentation>
</xs:annotation>
diff --git
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
index c9eaae7ee68..eda460dde65 100644
---
a/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
+++
b/core/camel-core-model/src/generated/resources/org/apache/camel/model/throttle.json
@@ -15,12 +15,11 @@
"expression": { "index": 0, "kind": "expression", "displayName":
"Expression", "required": true, "type": "object", "javaType":
"org.apache.camel.model.language.ExpressionDefinition", "oneOf": [ "constant",
"csimple", "datasonnet", "exchangeProperty", "groovy", "header", "hl7terser",
"java", "joor", "jq", "js", "jsonpath", "language", "method", "mvel", "ognl",
"python", "ref", "simple", "spel", "tokenize", "xpath", "xquery", "xtokenize"
], "deprecated": false, "autowired": false, "sec [...]
"correlationExpression": { "index": 1, "kind": "expression",
"displayName": "Correlation Expression", "required": false, "type": "object",
"javaType": "org.apache.camel.model.ExpressionSubElementDefinition", "oneOf": [
"constant", "csimple", "datasonnet", "exchangeProperty", "groovy", "header",
"hl7terser", "java", "joor", "jq", "js", "jsonpath", "language", "method",
"mvel", "ognl", "python", "ref", "simple", "spel", "tokenize", "xpath",
"xquery", "xtokenize" ], "deprecated": false, [...]
"executorService": { "index": 2, "kind": "attribute", "displayName":
"Executor Service", "label": "advanced", "required": false, "type": "object",
"javaType": "java.util.concurrent.ExecutorService", "deprecated": false,
"autowired": false, "secret": false, "description": "To use a custom thread
pool (ScheduledExecutorService) by the throttler." },
- "timePeriodMillis": { "index": 3, "kind": "attribute", "displayName":
"Time Period Millis", "required": false, "type": "duration", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": "1000", "description": "Sets the time period during which the
maximum request count is valid for" },
- "asyncDelayed": { "index": 4, "kind": "attribute", "displayName": "Async
Delayed", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Enables asynchronous
delay which means the thread will not block while delaying." },
- "callerRunsWhenRejected": { "index": 5, "kind": "attribute",
"displayName": "Caller Runs When Rejected", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether or not the caller should run the task when it was rejected by the
thread pool. Is by default true" },
- "rejectExecution": { "index": 6, "kind": "attribute", "displayName":
"Reject Execution", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether or not
throttler throws the ThrottlerRejectedExecutionException when the exchange
exceeds the request limit Is by default false" },
- "disabled": { "index": 7, "kind": "attribute", "displayName": "Disabled",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether to disable this EIP from the
route during build time. Once an EIP has been disabled then it cannot be
enabled later at runtime." },
- "id": { "index": 8, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the id of this node"
},
- "description": { "index": 9, "kind": "element", "displayName":
"Description", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the description of this node" }
+ "asyncDelayed": { "index": 3, "kind": "attribute", "displayName": "Async
Delayed", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Enables asynchronous
delay which means the thread will not block while delaying." },
+ "callerRunsWhenRejected": { "index": 4, "kind": "attribute",
"displayName": "Caller Runs When Rejected", "label": "advanced", "required":
false, "type": "boolean", "javaType": "java.lang.Boolean", "deprecated": false,
"autowired": false, "secret": false, "defaultValue": true, "description":
"Whether or not the caller should run the task when it was rejected by the
thread pool. Is by default true" },
+ "rejectExecution": { "index": 5, "kind": "attribute", "displayName":
"Reject Execution", "label": "advanced", "required": false, "type": "boolean",
"javaType": "java.lang.Boolean", "deprecated": false, "autowired": false,
"secret": false, "defaultValue": false, "description": "Whether or not
throttler throws the ThrottlerRejectedExecutionException when the exchange
exceeds the request limit Is by default false" },
+ "disabled": { "index": 6, "kind": "attribute", "displayName": "Disabled",
"label": "advanced", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "autowired": false, "secret": false,
"defaultValue": false, "description": "Whether to disable this EIP from the
route during build time. Once an EIP has been disabled then it cannot be
enabled later at runtime." },
+ "id": { "index": 7, "kind": "attribute", "displayName": "Id", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"autowired": false, "secret": false, "description": "Sets the id of this node"
},
+ "description": { "index": 8, "kind": "element", "displayName":
"Description", "required": false, "type": "string", "javaType":
"java.lang.String", "deprecated": false, "autowired": false, "secret": false,
"description": "Sets the description of this node" }
}
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 8f4e1920f4f..55b5a9c09b8 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -1765,14 +1765,14 @@ public abstract class ProcessorDefinition<Type extends
ProcessorDefinition<Type>
* that a specific endpoint does not get overloaded, or that we don't
exceed an agreed SLA with some external
* service.
* <p/>
- * Will default use a time period of 1 second, so setting the
maximumRequestCount to eg 10 will default ensure at
- * most 10 messages per second.
+ * Setting the maximumConcurrentRequests will ensure that no more than the
specified number of messages will flow to
+ * the endpoint at any given time.
*
- * @param maximumRequestCount the maximum messages
- * @return the builder
+ * @param maximumConcurrentRequests the maximum number of concurrent
messages
+ * @return the builder
*/
- public ThrottleDefinition throttle(long maximumRequestCount) {
- return
throttle(ExpressionBuilder.constantExpression(maximumRequestCount));
+ public ThrottleDefinition throttle(long maximumConcurrentRequests) {
+ return
throttle(ExpressionBuilder.constantExpression(maximumConcurrentRequests));
}
/**
@@ -1780,14 +1780,14 @@ public abstract class ProcessorDefinition<Type extends
ProcessorDefinition<Type>
* that a specific endpoint does not get overloaded, or that we don't
exceed an agreed SLA with some external
* service.
* <p/>
- * Will default use a time period of 1 second, so setting the
maximumRequestCount to eg 10 will default ensure at
- * most 10 messages per second.
+ * Setting the maximumConcurrentRequests will ensure that no more than the
specified number of messages will flow to
+ * the endpoint at any given time.
*
- * @param maximumRequestCount an expression to calculate the maximum
request count
- * @return the builder
+ * @param maximumConcurrentRequests an expression to calculate the
maximum concurrent request count
+ * @return the builder
*/
- public ThrottleDefinition throttle(Expression maximumRequestCount) {
- ThrottleDefinition answer = new
ThrottleDefinition(maximumRequestCount);
+ public ThrottleDefinition throttle(Expression maximumConcurrentRequests) {
+ ThrottleDefinition answer = new
ThrottleDefinition(maximumConcurrentRequests);
addOutput(answer);
return answer;
}
@@ -1799,17 +1799,18 @@ public abstract class ProcessorDefinition<Type extends
ProcessorDefinition<Type>
* based on the key expression to group exchanges. This will make
key-based throttling instead of overall
* throttling.
* <p/>
- * Will default use a time period of 1 second, so setting the
maximumRequestCount to eg 10 will default ensure at
- * most 10 messages per second.
+ * Setting the maximumConcurrentRequests will ensure that no more than the
specified number of messages will flow to
+ * the endpoint at any given time.
*
- * @param maximumRequestCount an expression to calculate the maximum
request count
- * @param correlationExpressionKey is a correlation key that can throttle
by the given key instead of overall
- * throttling
- * @return the builder
+ * @param maximumConcurrentRequests an expression to calculate the
maximum concurrent request count
+ * @param correlationExpressionKey is a correlation key that can
throttle by the given key instead of overall
+ * throttling
+ * @return the builder
*/
- public ThrottleDefinition throttle(Expression maximumRequestCount, long
correlationExpressionKey) {
+ public ThrottleDefinition throttle(Expression maximumConcurrentRequests,
long correlationExpressionKey) {
ThrottleDefinition answer
- = new ThrottleDefinition(maximumRequestCount,
ExpressionBuilder.constantExpression(correlationExpressionKey));
+ = new ThrottleDefinition(
+ maximumConcurrentRequests,
ExpressionBuilder.constantExpression(correlationExpressionKey));
addOutput(answer);
return answer;
}
@@ -1821,16 +1822,16 @@ public abstract class ProcessorDefinition<Type extends
ProcessorDefinition<Type>
* based on the key expression to group exchanges. This will make
key-based throttling instead of overall
* throttling.
* <p/>
- * Will default use a time period of 1 second, so setting the
maximumRequestCount to eg 10 will default ensure at
- * most 10 messages per second.
+ * Setting the maximumConcurrentRequests will ensure that no more than the
specified number of messages will flow to
+ * the endpoint at any given time.
*
- * @param maximumRequestCount an expression to calculate the maximum
request count
- * @param correlationExpressionKey is a correlation key as an expression
that can throttle by the given key instead
- * of overall throttling
- * @return the builder
+ * @param maximumConcurrentRequests an expression to calculate the
maximum concurrent request count
+ * @param correlationExpressionKey is a correlation key as an expression
that can throttle by the given key
+ * instead of overall throttling
+ * @return the builder
*/
- public ThrottleDefinition throttle(Expression maximumRequestCount,
Expression correlationExpressionKey) {
- ThrottleDefinition answer = new
ThrottleDefinition(maximumRequestCount, correlationExpressionKey);
+ public ThrottleDefinition throttle(Expression maximumConcurrentRequests,
Expression correlationExpressionKey) {
+ ThrottleDefinition answer = new
ThrottleDefinition(maximumConcurrentRequests, correlationExpressionKey);
addOutput(answer);
return answer;
}
diff --git
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 8c0b78b3e78..2a7611997b4 100644
---
a/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++
b/core/camel-core-model/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -49,9 +49,6 @@ public class ThrottleDefinition extends ExpressionNode
implements ExecutorServic
@Metadata(label = "advanced", javaType =
"java.util.concurrent.ExecutorService")
private String executorService;
@XmlAttribute
- @Metadata(defaultValue = "1000", javaType = "java.time.Duration")
- private String timePeriodMillis;
- @XmlAttribute
@Metadata(label = "advanced", javaType = "java.lang.Boolean")
private String asyncDelayed;
@XmlAttribute
@@ -64,16 +61,16 @@ public class ThrottleDefinition extends ExpressionNode
implements ExecutorServic
public ThrottleDefinition() {
}
- public ThrottleDefinition(Expression maximumRequestsPerPeriod) {
- super(maximumRequestsPerPeriod);
+ public ThrottleDefinition(Expression maximumConcurrentRequests) {
+ super(maximumConcurrentRequests);
}
- public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression
correlationExpression) {
-
this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod),
correlationExpression);
+ public ThrottleDefinition(Expression maximumConcurrentRequests, Expression
correlationExpression) {
+
this(ExpressionNodeHelper.toExpressionDefinition(maximumConcurrentRequests),
correlationExpression);
}
- private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod,
Expression correlationExpression) {
- super(maximumRequestsPerPeriod);
+ private ThrottleDefinition(ExpressionDefinition maximumConcurrentRequests,
Expression correlationExpression) {
+ super(maximumConcurrentRequests);
ExpressionSubElementDefinition cor = new
ExpressionSubElementDefinition();
cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
@@ -82,11 +79,7 @@ public class ThrottleDefinition extends ExpressionNode
implements ExecutorServic
@Override
public String toString() {
- return "Throttle[" + description() + "]";
- }
-
- protected String description() {
- return getExpression() + " request per " + getTimePeriodMillis() + "
millis";
+ return "Throttle[" + getExpression() + "]";
}
@Override
@@ -96,53 +89,32 @@ public class ThrottleDefinition extends ExpressionNode
implements ExecutorServic
@Override
public String getLabel() {
- return "throttle[" + description() + "]";
+ return "throttle[" + getExpression() + "]";
}
// Fluent API
//
-------------------------------------------------------------------------
/**
- * Sets the time period during which the maximum request count is valid for
- *
- * @param timePeriodMillis period in millis
- * @return the builder
- */
- public ThrottleDefinition timePeriodMillis(long timePeriodMillis) {
- return timePeriodMillis(Long.toString(timePeriodMillis));
- }
-
- /**
- * Sets the time period during which the maximum request count is valid for
- *
- * @param timePeriodMillis period in millis
- * @return the builder
- */
- public ThrottleDefinition timePeriodMillis(String timePeriodMillis) {
- setTimePeriodMillis(timePeriodMillis);
- return this;
- }
-
- /**
- * Sets the time period during which the maximum request count per period
+ * Sets the maximum number of concurrent requests
*
- * @param maximumRequestsPerPeriod the maximum request count number per
time period
- * @return the builder
+ * @param maximumConcurrentRequests the maximum number of concurrent
requests
+ * @return the builder
*/
- public ThrottleDefinition maximumRequestsPerPeriod(long
maximumRequestsPerPeriod) {
+ public ThrottleDefinition maximumConcurrentRequests(long
maximumConcurrentRequests) {
setExpression(
-
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumRequestsPerPeriod)));
+
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.constantExpression(maximumConcurrentRequests)));
return this;
}
/**
- * Sets the time period during which the maximum request count per period
+ * Sets the maximum number of concurrent requests
*
- * @param maximumRequestsPerPeriod the maximum request count number per
time period
- * @return the builder
+ * @param maximumConcurrentRequests the maximum number of concurrent
requests
+ * @return the builder
*/
- public ThrottleDefinition maximumRequestsPerPeriod(String
maximumRequestsPerPeriod) {
+ public ThrottleDefinition maximumConcurrentRequests(String
maximumConcurrentRequests) {
setExpression(
-
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.simpleExpression(maximumRequestsPerPeriod)));
+
ExpressionNodeHelper.toExpressionDefinition(ExpressionBuilder.simpleExpression(maximumConcurrentRequests)));
return this;
}
@@ -297,14 +269,6 @@ public class ThrottleDefinition extends ExpressionNode
implements ExecutorServic
super.setExpression(expression);
}
- public String getTimePeriodMillis() {
- return timePeriodMillis;
- }
-
- public void setTimePeriodMillis(String timePeriodMillis) {
- this.timePeriodMillis = timePeriodMillis;
- }
-
public String getAsyncDelayed() {
return asyncDelayed;
}
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
index a7854805cde..b74b145914e 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
@@ -18,11 +18,10 @@ package org.apache.camel.processor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -34,6 +33,7 @@ import org.apache.camel.RuntimeExchangeException;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.RouteIdAware;
+import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
@@ -41,20 +41,17 @@ import org.slf4j.LoggerFactory;
/**
* A <a href="http://camel.apache.org/throttler.html">Throttler</a> will set a
limit on the maximum number of message
- * exchanges which can be sent to a processor within a specific time period.
+ * exchanges which can be sent to a processor concurrently.
* <p/>
- * This pattern can be extremely useful if you have some external system which
meters access; such as only allowing 100
- * requests per second; or if huge load can cause a particular system to
malfunction or to reduce its throughput you
+ * This pattern can be extremely useful if you have some external system which
meters access; such as only allowing 10
+ * concurrent requests; or if huge load can cause a particular system to
malfunction or to reduce its throughput you
* might want to introduce some throttling.
*
* This throttle implementation is thread-safe and is therefore safe to be
used by multiple concurrent threads in a
* single route.
*
- * The throttling mechanism is a DelayQueue with maxRequestsPerPeriod permits
on it. Each permit is set to be delayed by
- * timePeriodMillis (except when the throttler is initialized or the throttle
rate increased, then there is no delay for
- * those permits). Callers trying to acquire a permit from the DelayQueue will
block if necessary. The end result is a
- * rolling window of time. Where from the callers point of view in the last
timePeriodMillis no more than
- * maxRequestsPerPeriod have been allowed to be acquired.
+ * The throttling mechanism is a Semaphore with maxConcurrentRequests permits
on it. Callers trying to acquire a permit
+ * will block if necessary when maxConcurrentRequests permits have been
acquired.
*/
public class Throttler extends AsyncProcessorSupport implements Traceable,
IdAware, RouteIdAware {
@@ -64,6 +61,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP =
"CamelThrottlerExchangeQueuedTimestamp";
private static final String PROPERTY_EXCHANGE_STATE =
"CamelThrottlerExchangeState";
+ private static final long CLEAN_PERIOD = 1000L * 10;
private enum State {
SYNC,
@@ -74,34 +72,24 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
private final CamelContext camelContext;
private final ScheduledExecutorService asyncExecutor;
private final boolean shutdownAsyncExecutor;
-
- private volatile long timePeriodMillis;
- private final long cleanPeriodMillis;
private String id;
private String routeId;
- private Expression maxRequestsPerPeriodExpression;
+ private Expression maxConcurrentRequestsExpression;
private boolean rejectExecution;
private boolean asyncDelayed;
private boolean callerRunsWhenRejected = true;
private final Expression correlationExpression;
private final Map<String, ThrottlingState> states = new
ConcurrentHashMap<>();
- public Throttler(final CamelContext camelContext, final Expression
maxRequestsPerPeriodExpression,
- final long timePeriodMillis,
+ public Throttler(final CamelContext camelContext, final Expression
maxConcurrentRequestsExpression,
final ScheduledExecutorService asyncExecutor, final
boolean shutdownAsyncExecutor,
final boolean rejectExecution, Expression correlation) {
this.camelContext = camelContext;
this.rejectExecution = rejectExecution;
this.shutdownAsyncExecutor = shutdownAsyncExecutor;
- ObjectHelper.notNull(maxRequestsPerPeriodExpression,
"maxRequestsPerPeriodExpression");
- this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
-
- if (timePeriodMillis <= 0) {
- throw new IllegalArgumentException("TimePeriodMillis should be a
positive number, was: " + timePeriodMillis);
- }
- this.timePeriodMillis = timePeriodMillis;
- this.cleanPeriodMillis = timePeriodMillis * 10;
+ ObjectHelper.notNull(maxConcurrentRequestsExpression,
"maxConcurrentRequestsExpression");
+ this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression;
this.asyncExecutor = asyncExecutor;
this.correlationExpression = correlation;
}
@@ -127,16 +115,12 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
key = correlationExpression.evaluate(exchange, String.class);
}
ThrottlingState throttlingState = states.computeIfAbsent(key,
ThrottlingState::new);
- throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange);
-
- ThrottlePermit permit = throttlingState.poll();
+
throttlingState.calculateAndSetMaxConcurrentRequestsExpression(exchange);
- if (permit == null) {
+ if (!throttlingState.tryAcquire(exchange)) {
if (isRejectExecution()) {
throw new ThrottlerRejectedExecutionException(
- "Exceeded the max throttle rate of "
- +
throttlingState.getThrottleRate() + " within "
- +
timePeriodMillis + "ms");
+ "Exceeded the max throttle rate of " +
throttlingState.getThrottleRate());
} else {
// delegate to async pool
if (isAsyncDelayed() && !exchange.isTransacted() && state
== State.SYNC) {
@@ -154,12 +138,10 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
if (LOG.isTraceEnabled()) {
start = System.currentTimeMillis();
}
- permit = throttlingState.take();
+ throttlingState.acquire(exchange);
if (LOG.isTraceEnabled()) {
elapsed = System.currentTimeMillis() - start;
}
- throttlingState.enqueue(permit, exchange);
-
if (state == State.ASYNC) {
if (LOG.isTraceEnabled()) {
long queuedTime = start - queuedStart;
@@ -175,8 +157,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
}
}
} else {
- throttlingState.enqueue(permit, exchange);
-
+ // permit acquired
if (state == State.ASYNC) {
if (LOG.isTraceEnabled()) {
long queuedTime = System.currentTimeMillis() -
queuedStart;
@@ -214,7 +195,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
}
/**
- * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the
executor rejects the submission and
+ * Delegate blocking to an asyncExecutor. Except if the executor rejects
the submission and
* isCallerRunsWhenRejected() is enabled, then this method will delegate
back to process(), but not before changing
* the exchange state to stop any recursion.
*/
@@ -225,8 +206,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP,
System.currentTimeMillis());
}
exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
- long delay = throttlingState.peek().getDelay(TimeUnit.NANOSECONDS);
- asyncExecutor.schedule(() -> process(exchange, callback), delay,
TimeUnit.NANOSECONDS);
+ asyncExecutor.submit(() -> process(exchange, callback));
return false;
} catch (final RejectedExecutionException e) {
if (isCallerRunsWhenRejected()) {
@@ -259,68 +239,82 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
private class ThrottlingState {
private final String key;
- private final DelayQueue<ThrottlePermit> delayQueue = new
DelayQueue<>();
private final AtomicReference<ScheduledFuture<?>> cleanFuture = new
AtomicReference<>();
private volatile int throttleRate;
+ private WrappedSemaphore semaphore;
ThrottlingState(String key) {
this.key = key;
+ semaphore = new WrappedSemaphore();
}
public int getThrottleRate() {
return throttleRate;
}
- public ThrottlePermit poll() {
- return delayQueue.poll();
+ public void clean() {
+ states.remove(key);
}
- public ThrottlePermit peek() {
- return delayQueue.peek();
+ public boolean tryAcquire(Exchange exchange) {
+ boolean acquired = semaphore.tryAcquire();
+ if (acquired) {
+ addSynchronization(exchange);
+ }
+ return acquired;
}
- public ThrottlePermit take() throws InterruptedException {
- return delayQueue.take();
+ public void acquire(Exchange exchange) throws InterruptedException {
+ semaphore.acquire();
+ addSynchronization(exchange);
}
- public void clean() {
- states.remove(key);
+ private void addSynchronization(final Exchange exchange) {
+ exchange.getExchangeExtension().addOnCompletion(new
Synchronization() {
+ @Override
+ public void onComplete(Exchange exchange) {
+ release(exchange);
+ }
+
+ @Override
+ public void onFailure(Exchange exchange) {
+ release(exchange);
+ }
+ });
}
/**
- * Returns a permit to the DelayQueue, first resetting it's delay to
be relative to now.
+ * Returns a permit.
*/
- public void enqueue(final ThrottlePermit permit, final Exchange
exchange) {
- permit.setDelayMs(getTimePeriodMillis());
- delayQueue.put(permit);
+ public void release(final Exchange exchange) {
+ semaphore.release();
try {
- ScheduledFuture<?> next = asyncExecutor.schedule(this::clean,
cleanPeriodMillis, TimeUnit.MILLISECONDS);
+ ScheduledFuture<?> next = asyncExecutor.schedule(this::clean,
CLEAN_PERIOD, TimeUnit.MILLISECONDS);
ScheduledFuture<?> prev = cleanFuture.getAndSet(next);
if (prev != null) {
prev.cancel(false);
}
- // try and incur the least amount of overhead while releasing
permits back to the queue
if (LOG.isTraceEnabled()) {
LOG.trace("Permit released, for exchangeId: {}",
exchange.getExchangeId());
}
} catch (RejectedExecutionException e) {
- LOG.debug("Throttling queue cleaning rejected", e);
+ LOG.debug("Throttle cleaning rejected", e);
}
}
/**
- * Evaluates the maxRequestsPerPeriodExpression and adjusts the
throttle rate up or down.
+ * Evaluates the maxConcurrentRequestsExpression and adjusts the
throttle rate up or down.
*/
- public synchronized void calculateAndSetMaxRequestsPerPeriod(final
Exchange exchange) throws Exception {
- Integer newThrottle =
maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
+ public synchronized void
calculateAndSetMaxConcurrentRequestsExpression(final Exchange exchange) throws
Exception {
+ Integer newThrottle =
maxConcurrentRequestsExpression.evaluate(exchange, Integer.class);
if (newThrottle != null && newThrottle < 0) {
- throw new IllegalStateException("The maximumRequestsPerPeriod
must be a positive number, was: " + newThrottle);
+ throw new IllegalStateException("The maximumConcurrentRequests
must be a positive number, was: " + newThrottle);
}
if (newThrottle == null && throttleRate == 0) {
throw new RuntimeExchangeException(
- "The maxRequestsPerPeriodExpression was evaluated as
null: " + maxRequestsPerPeriodExpression,
+ "The maxConcurrentRequestsExpression was evaluated as
null: " + maxConcurrentRequestsExpression,
exchange);
}
@@ -331,14 +325,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
int delta = throttleRate - newThrottle;
// discard any permits that are needed to decrease
throttling
- while (delta > 0) {
- delayQueue.take();
- delta--;
- if (LOG.isTraceEnabled()) {
- LOG.trace("Permit discarded due to throttling
rate decrease, triggered by ExchangeId: {}",
- exchange.getExchangeId());
- }
- }
+ semaphore.reducePermits(delta);
if (LOG.isDebugEnabled()) {
LOG.debug("Throttle rate decreased from {} to {},
triggered by ExchangeId: {}", throttleRate,
newThrottle, exchange.getExchangeId());
@@ -347,9 +334,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
// increase
} else if (newThrottle > throttleRate) {
int delta = newThrottle - throttleRate;
- for (int i = 0; i < delta; i++) {
- delayQueue.put(new ThrottlePermit(-1));
- }
+ semaphore.increasePermits(delta);
if (throttleRate == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Initial throttle rate set to {},
triggered by ExchangeId: {}", newThrottle,
@@ -368,28 +353,29 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
}
}
- /**
- * Permit that implements the Delayed interface needed by DelayQueue.
- */
- private static class ThrottlePermit implements Delayed {
- private volatile long scheduledTime;
-
- ThrottlePermit(final long delayMs) {
- setDelayMs(delayMs);
+ // extend Semaphore so we can reduce permits if required
+ private class WrappedSemaphore extends Semaphore {
+ public WrappedSemaphore() {
+ super(0, true);
}
- public void setDelayMs(final long delayMs) {
- this.scheduledTime = System.currentTimeMillis() + delayMs;
+ public boolean tryAcquire() {
+ try {
+ // honours fairness setting
+ return super.tryAcquire(0L, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ return false;
+ }
}
- @Override
- public long getDelay(final TimeUnit unit) {
- return unit.convert(scheduledTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
+ // decrease throttling
+ public void reducePermits(int n) {
+ super.reducePermits(n);
}
- @Override
- public int compareTo(final Delayed o) {
- return Long.compare(getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
+ // increase throttling
+ public void increasePermits(int n) {
+ super.release(n);
}
}
@@ -440,36 +426,25 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
/**
* Sets the maximum number of requests per time period expression
*/
- public void setMaximumRequestsPerPeriodExpression(Expression
maxRequestsPerPeriodExpression) {
- this.maxRequestsPerPeriodExpression = maxRequestsPerPeriodExpression;
+ public void setMaximumConcurrentRequestsExpression(Expression
maxConcurrentRequestsExpression) {
+ this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression;
}
- public Expression getMaximumRequestsPerPeriodExpression() {
- return maxRequestsPerPeriodExpression;
+ public Expression getMaximumConcurrentRequests() {
+ return maxConcurrentRequestsExpression;
}
/**
- * Gets the current maximum request per period value. If it is grouped
throttling applied with correlationExpression
- * than the max per period within the group will return
+ * Gets the current maximum request. If it is grouped throttling applied
with correlationExpression then the max
+ * within the group will return
*/
- public int getCurrentMaximumRequestsPerPeriod() {
+ public int getCurrentMaximumConcurrentRequests() {
return
states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0);
}
- /**
- * Sets the time period during which the maximum number of requests apply
- */
- public void setTimePeriodMillis(final long timePeriodMillis) {
- this.timePeriodMillis = timePeriodMillis;
- }
-
- public long getTimePeriodMillis() {
- return timePeriodMillis;
- }
-
@Override
public String getTraceLabel() {
- return "throttle[" + maxRequestsPerPeriodExpression + " per: " +
timePeriodMillis + "]";
+ return "throttle[" + maxConcurrentRequestsExpression + "]";
}
@Override
diff --git
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
index 20dc0bec3ef..a046d0bf1c4 100644
---
a/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
+++
b/core/camel-core-reifier/src/main/java/org/apache/camel/reifier/ThrottleReifier.java
@@ -37,9 +37,6 @@ public class ThrottleReifier extends
ExpressionReifier<ThrottleDefinition> {
boolean shutdownThreadPool = willCreateNewThreadPool(definition, true);
ScheduledExecutorService threadPool =
getConfiguredScheduledExecutorService("Throttle", definition, true);
- // should be default 1000 millis
- long period = parseDuration(definition.getTimePeriodMillis(), 1000L);
-
// max requests per period is mandatory
Expression maxRequestsExpression =
createMaxRequestsPerPeriodExpression();
if (maxRequestsExpression == null) {
@@ -53,7 +50,7 @@ public class ThrottleReifier extends
ExpressionReifier<ThrottleDefinition> {
boolean reject = parseBoolean(definition.getRejectExecution(), false);
Throttler answer = new Throttler(
- camelContext, maxRequestsExpression, period, threadPool,
shutdownThreadPool, reject, correlation);
+ camelContext, maxRequestsExpression, threadPool,
shutdownThreadPool, reject, correlation);
answer.setAsyncDelayed(async);
// should be true by default
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
index 56efee27f12..1af2943081c 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedCallerRunsTest.java
@@ -50,7 +50,7 @@ public class ThrottlerAsyncDelayedCallerRunsTest extends
ContextTestSupport {
builder.maxQueueSize(2);
context.getExecutorServiceManager().registerThreadPoolProfile(builder.build());
-
from("seda:start").throttle(1).timePeriodMillis(100).asyncDelayed().executorService("myThrottler")
+
from("seda:start").throttle(1).delay(100).asyncDelayed().executorService("myThrottler")
.callerRunsWhenRejected(true).to("mock:result");
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
index ef14ff4b3e8..71adf7ea555 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerAsyncDelayedTest.java
@@ -67,10 +67,10 @@ public class ThrottlerAsyncDelayedTest extends
ContextTestSupport {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
-
from("seda:a").throttle(3).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result",
"mock:result");
+
from("seda:a").throttle(3).delay(INTERVAL).asyncDelayed().to("log:result",
"mock:result");
// END SNIPPET: ex
-
from("direct:a").throttle(3).timePeriodMillis(INTERVAL).asyncDelayed().to("log:result",
"mock:result");
+
from("direct:a").throttle(3).delay(INTERVAL).asyncDelayed().to("log:result",
"mock:result");
}
};
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
index 2d09d32c026..bb34216469a 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerDslTest.java
@@ -49,7 +49,7 @@ public class ThrottlerDslTest extends ContextTestSupport {
resultEndpoint.assertIsSatisfied();
// now assert that they have actually been throttled
- long minimumTime = (messageCount - 1) * INTERVAL;
+ long minimumTime = messageCount * INTERVAL;
// add a little slack
long delta = System.currentTimeMillis() - start + 200;
assertTrue(delta >= minimumTime, "Should take at least " + minimumTime
+ "ms, was: " + delta);
@@ -61,7 +61,7 @@ public class ThrottlerDslTest extends ContextTestSupport {
return new RouteBuilder() {
public void configure() {
from("direct:start").throttle().message(m ->
m.getHeader("ThrottleCount", Integer.class))
- .timePeriodMillis(INTERVAL).to("log:result",
"mock:result");
+ .delay(INTERVAL).to("log:result", "mock:result");
}
};
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
index 7255f31da86..79289495426 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerMethodCallTest.java
@@ -75,7 +75,7 @@ public class ThrottlerMethodCallTest extends
ContextTestSupport {
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
- from("direct:expressionMethod").throttle(method("myBean",
"getMessagesPerInterval")).timePeriodMillis(INTERVAL)
+ from("direct:expressionMethod").throttle(method("myBean",
"getMessagesPerInterval")).delay(INTERVAL)
.to("log:result", "mock:result");
}
};
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index b25f77941f1..37b92903927 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.processor;
+import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -35,23 +36,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com",
disabledReason = "Flaky on Github CI")
public class ThrottlerTest extends ContextTestSupport {
private static final int INTERVAL = 500;
- private static final int TOLERANCE = 50;
private static final int MESSAGE_COUNT = 9;
-
- @Test
- public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds()
throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
- resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setResultWaitTime(2000);
-
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- template.sendBody("seda:a", "<message>" + i + "</message>");
- }
-
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
- resultEndpoint.assertIsSatisfied();
- }
+ private static final int CONCURRENT_REQUESTS = 2;
+ private volatile int curr;
+ private volatile int max;
@Test
public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
@@ -61,27 +49,29 @@ public class ThrottlerTest extends ContextTestSupport {
MockEndpoint errorEndpoint = resolveMandatoryEndpoint("mock:error",
MockEndpoint.class);
errorEndpoint.expectedMessageCount(4);
- for (int i = 0; i < 6; i++) {
- template.sendBody("direct:start", "<message>" + i + "</message>");
+ ExecutorService executor = Executors.newFixedThreadPool(6);
+ try {
+ for (int i = 0; i < 6; i++) {
+ executor.execute(() -> template.sendBody("direct:start",
"<message>payload</message>"));
+ }
+ assertMockEndpointsSatisfied();
+ } finally {
+ executor.shutdownNow();
}
-
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
- assertMockEndpointsSatisfied();
}
@Test
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough()
throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
- long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a",
MESSAGE_COUNT, resultEndpoint);
- assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+ sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT,
resultEndpoint);
+ assertTrue(max <= CONCURRENT_REQUESTS);
}
@Test
public void testConfigurationWithConstantExpression() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
- long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT,
"direct:expressionConstant", MESSAGE_COUNT, resultEndpoint);
- assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+ sendMessagesAndAwaitDelivery(MESSAGE_COUNT,
"direct:expressionConstant", MESSAGE_COUNT, resultEndpoint);
+ assertTrue(max <= CONCURRENT_REQUESTS);
}
@Test
@@ -91,7 +81,7 @@ public class ThrottlerTest extends ContextTestSupport {
ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
try {
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 5,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint,
CONCURRENT_REQUESTS, INTERVAL, MESSAGE_COUNT);
} finally {
executor.shutdownNow();
}
@@ -102,47 +92,46 @@ public class ThrottlerTest extends ContextTestSupport {
ExecutorService executor = Executors.newFixedThreadPool(5);
try {
MockEndpoint resultEndpoint =
resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 5,
INTERVAL, MESSAGE_COUNT);
- Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the
- // first throttle rate does not
- // influence the next one.
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 2,
INTERVAL, MESSAGE_COUNT);
+ Thread.sleep(INTERVAL); // sleep here to ensure the
+ // first throttle rate does not
+ // influence the next one.
resultEndpoint.reset();
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 10,
INTERVAL, MESSAGE_COUNT);
- Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the
- // first throttle rate does not
- // influence the next one.
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 4,
INTERVAL, MESSAGE_COUNT);
+ Thread.sleep(INTERVAL); // sleep here to ensure the
+ // first throttle rate does not
+ // influence the next one.
resultEndpoint.reset();
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 5,
INTERVAL, MESSAGE_COUNT);
- Thread.sleep(INTERVAL + TOLERANCE); // sleep here to ensure the
- // first throttle rate does not
- // influence the next one.
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 2,
INTERVAL, MESSAGE_COUNT);
+ Thread.sleep(INTERVAL); // sleep here to ensure the
+ // first throttle rate does not
+ // influence the next one.
resultEndpoint.reset();
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 10,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 4,
INTERVAL, MESSAGE_COUNT);
} finally {
executor.shutdownNow();
}
}
- private void assertThrottlerTiming(
- final long elapsedTimeMs, final int throttle, final int
intervalMs, final int messageCount) {
- // now assert that they have actually been throttled (use +/- 50 as
- // slack)
- long minimum = calculateMinimum(intervalMs, throttle, messageCount) -
50;
- long maximum = calculateMaximum(intervalMs, throttle, messageCount) +
50;
- // add 500 in case running on slow CI boxes
- maximum += 500;
- log.info("Sent {} exchanges in {}ms, with throttle rate of {} per
{}ms. Calculated min {}ms and max {}ms", messageCount,
- elapsedTimeMs, throttle, intervalMs, minimum,
- maximum);
-
- assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum
+ "ms, was: " + elapsedTimeMs);
- assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most
" + maximum + "ms, was: " + elapsedTimeMs);
+ @Test
+ public void testFifo() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("A", "B", "C",
"D", "E", "F", "G", "H");
+ sendBody("direct:fifo");
+ assertMockEndpointsSatisfied();
}
- private long sendMessagesAndAwaitDelivery(
+ @Test
+ public void testPermitReleaseOnException() throws Exception {
+ // verify that failed processing releases throttle permit
+ getMockEndpoint("mock:error").expectedBodiesReceived("A", "B", "C",
"D", "E", "F", "G", "H");
+ sendBody("direct:release");
+ assertMockEndpointsSatisfied();
+ }
+
+ private void sendMessagesAndAwaitDelivery(
final int messageCount, final String endpointUri, final int
threadPoolSize, final MockEndpoint receivingEndpoint)
throws InterruptedException {
ExecutorService executor =
Executors.newFixedThreadPool(threadPoolSize);
@@ -151,7 +140,6 @@ public class ThrottlerTest extends ContextTestSupport {
receivingEndpoint.expectedMessageCount(messageCount);
}
- long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
executor.execute(new Runnable() {
public void run() {
@@ -164,7 +152,6 @@ public class ThrottlerTest extends ContextTestSupport {
if (receivingEndpoint != null) {
receivingEndpoint.assertIsSatisfied();
}
- return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
} finally {
executor.shutdownNow();
}
@@ -176,6 +163,7 @@ public class ThrottlerTest extends ContextTestSupport {
throws InterruptedException {
resultEndpoint.expectedMessageCount(messageCount);
+ max = 0;
long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
executor.execute(new Runnable() {
@@ -189,19 +177,12 @@ public class ThrottlerTest extends ContextTestSupport {
// let's wait for the exchanges to arrive
resultEndpoint.assertIsSatisfied();
long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
- assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
- }
-
- private long calculateMinimum(final long periodMs, final long
throttleRate, final long messageCount) {
- if (messageCount % throttleRate > 0) {
- return (long) Math.floor((double) messageCount / (double)
throttleRate) * periodMs;
- } else {
- return (long) (Math.floor((double) messageCount / (double)
throttleRate) * periodMs) - periodMs;
- }
+ assertTrue(max <= throttle);
}
- private long calculateMaximum(final long periodMs, final long
throttleRate, final long messageCount) {
- return ((long) Math.ceil((double) messageCount / (double)
throttleRate)) * periodMs;
+ private void sendBody(String endpoint) {
+ Arrays.stream(new String[] { "A", "B", "C", "D", "E", "F", "G", "H" })
+ .forEach(b -> template.sendBody(endpoint, b));
}
@Override
@@ -211,21 +192,44 @@ public class ThrottlerTest extends ContextTestSupport {
onException(ThrottlerRejectedExecutionException.class).handled(true).to("mock:error");
- // START SNIPPET: ex
-
from("seda:a").throttle(3).timePeriodMillis(1000).to("log:result",
"mock:result");
- // END SNIPPET: ex
-
-
from("direct:a").throttle(5).timePeriodMillis(INTERVAL).to("log:result",
"mock:result");
-
-
from("direct:expressionConstant").throttle(constant(5)).timePeriodMillis(INTERVAL).to("log:result",
- "mock:result");
-
-
from("direct:expressionHeader").throttle(header("throttleValue")).timePeriodMillis(INTERVAL).to("log:result",
- "mock:result");
-
-
from("direct:start").throttle(2).timePeriodMillis(1000).rejectExecution(true).to("log:result",
"mock:result");
-
-
from("direct:highThrottleRate").throttle(10000).timePeriodMillis(INTERVAL).to("mock:result");
+ from("direct:a").throttle(CONCURRENT_REQUESTS)
+ .process(exchange -> {
+ curr++;
+ })
+ .delay(INTERVAL)
+ .process(exchange -> {
+ max = Math.max(max, curr--);
+ })
+ .to("log:result", "mock:result");
+
+
from("direct:expressionConstant").throttle(constant(CONCURRENT_REQUESTS))
+ .process(exchange -> {
+ curr++;
+ })
+ .delay(INTERVAL)
+ .process(exchange -> {
+ max = Math.max(max, curr--);
+ })
+ .to("log:result", "mock:result");
+
+
from("direct:expressionHeader").throttle(header("throttleValue"))
+ .process(exchange -> {
+ curr++;
+ })
+ .delay(INTERVAL)
+ .process(exchange -> {
+ max = Math.max(max, curr--);
+ })
+ .to("log:result", "mock:result");
+
+
from("direct:start").throttle(2).rejectExecution(true).delay(1000).to("log:result",
"mock:result");
+
+ from("direct:fifo").throttle(1).delay(100).to("mock:result");
+
+
from("direct:release").errorHandler(deadLetterChannel("mock:error")).throttle(1).delay(100)
+ .process(exchange -> {
+ throw new RuntimeException();
+ }).to("mock:result");
}
};
}
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
index d030cffb74e..3922f069748 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -18,9 +18,10 @@ package org.apache.camel.processor;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -33,8 +34,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Isolated
public class ThrottlingGroupingTest extends ContextTestSupport {
private static final int INTERVAL = 500;
- private static final int MESSAGE_COUNT = 9;
- private static final int TOLERANCE = 50;
+ private static final int MESSAGE_COUNT = 20;
+ private static final int CONCURRENT_REQUESTS = 2;
+ private Map<String, AtomicInteger> curr;
+ private static int max;
@Test
public void testGroupingWithSingleConstant() throws Exception {
@@ -73,64 +76,35 @@ public class ThrottlingGroupingTest extends
ContextTestSupport {
}
@Test
- public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds()
throws Exception {
-
+ public void
testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue()
throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult",
MockEndpoint.class);
- resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setResultWaitTime(2000);
-
- Map<String, Object> headers = new HashMap<>();
- for (int i = 0; i < 9; i++) {
- if (i % 2 == 0) {
- headers.put("key", "1");
- } else {
- headers.put("key", "2");
- }
- template.sendBodyAndHeaders("seda:ga", "<message>" + i +
"</message>", headers);
- }
-
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
- resultEndpoint.assertIsSatisfied();
+ sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:ga",
CONCURRENT_REQUESTS, resultEndpoint);
}
- private void assertThrottlerTiming(
- final long elapsedTimeMs, final int throttle, final int
intervalMs, final int messageCount) {
- // now assert that they have actually been throttled (use +/- 50 as
- // slack)
- long minimum = calculateMinimum(intervalMs, throttle, messageCount) -
50;
- long maximum = calculateMaximum(intervalMs, throttle, messageCount) +
50;
- // add 500 in case running on slow CI boxes
- maximum += 500;
- log.info("Sent {} exchanges in {}ms, with throttle rate of {} per
{}ms. Calculated min {}ms and max {}ms", messageCount,
- elapsedTimeMs, throttle, intervalMs, minimum,
- maximum);
-
- assertTrue(elapsedTimeMs >= minimum, "Should take at least " + minimum
+ "ms, was: " + elapsedTimeMs);
- assertTrue(elapsedTimeMs <= maximum + TOLERANCE, "Should take at most
" + maximum + "ms, was: " + elapsedTimeMs);
- }
-
- private long sendMessagesAndAwaitDelivery(
- final int messageCount, final String endpointUri, final int
threadPoolSize, final MockEndpoint receivingEndpoint)
+ private void sendMessagesAndAwaitDelivery(
+ final int messageCount, final String endpointUri, final int
throttle, final MockEndpoint receivingEndpoint)
throws InterruptedException {
- ExecutorService executor =
Executors.newFixedThreadPool(threadPoolSize);
+ max = 0;
+ curr = new ConcurrentHashMap<>();
+ // two throttle groups
+ curr.putIfAbsent("1", new AtomicInteger(0));
+ curr.putIfAbsent("2", new AtomicInteger(0));
+ ExecutorService executor = Executors.newFixedThreadPool(messageCount);
try {
if (receivingEndpoint != null) {
receivingEndpoint.expectedMessageCount(messageCount);
}
- long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- Map<String, Object> headers = new HashMap<>();
- if (messageCount % 2 == 0) {
- headers.put("key", "1");
- } else {
- headers.put("key", "2");
- }
- template.sendBodyAndHeaders(endpointUri,
"<message>payload</message>", headers);
+ int finalI = i;
+ executor.execute(() -> {
+ Map<String, Object> headers = new HashMap<>();
+ if (finalI % 2 == 0) {
+ headers.put("key", "1");
+ } else {
+ headers.put("key", "2");
}
+ template.sendBodyAndHeaders(endpointUri,
"<message>payload</message>", headers);
});
}
@@ -138,17 +112,10 @@ public class ThrottlingGroupingTest extends
ContextTestSupport {
if (receivingEndpoint != null) {
receivingEndpoint.assertIsSatisfied();
}
- return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
} finally {
executor.shutdownNow();
}
- }
-
- @Test
- public void
testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue()
throws Exception {
- MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult",
MockEndpoint.class);
- long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT,
"direct:ga", MESSAGE_COUNT, resultEndpoint);
- assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+ assertTrue(max <= throttle);
}
@Test
@@ -158,50 +125,39 @@ public class ThrottlingGroupingTest extends
ContextTestSupport {
ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
try {
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 5,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint,
CONCURRENT_REQUESTS, MESSAGE_COUNT);
} finally {
executor.shutdownNow();
}
}
- private long calculateMinimum(final long periodMs, final long
throttleRate, final long messageCount) {
- if (messageCount % throttleRate > 0) {
- return (long) Math.floor((double) messageCount / (double)
throttleRate) * periodMs;
- } else {
- return (long) (Math.floor((double) messageCount / (double)
throttleRate) * periodMs) - periodMs;
- }
- }
-
- private long calculateMaximum(final long periodMs, final long
throttleRate, final long messageCount) {
- return ((long) Math.ceil((double) messageCount / (double)
throttleRate)) * periodMs;
- }
-
private void sendMessagesWithHeaderExpression(
- final ExecutorService executor, final MockEndpoint resultEndpoint,
final int throttle, final int intervalMs,
- final int messageCount)
+ final ExecutorService executor, final MockEndpoint resultEndpoint,
final int throttle, final int messageCount)
throws InterruptedException {
resultEndpoint.expectedMessageCount(messageCount);
- long start = System.nanoTime();
+ max = 0;
+ curr = new ConcurrentHashMap<>();
+ // two throttle groups
+ curr.putIfAbsent("1", new AtomicInteger(0));
+ curr.putIfAbsent("2", new AtomicInteger(0));
for (int i = 0; i < messageCount; i++) {
- executor.execute(new Runnable() {
- public void run() {
- Map<String, Object> headers = new HashMap<>();
- headers.put("throttleValue", throttle);
- if (messageCount % 2 == 0) {
- headers.put("key", "1");
- } else {
- headers.put("key", "2");
- }
- template.sendBodyAndHeaders("direct:gexpressionHeader",
"<message>payload</message>", headers);
+ int finalI = i;
+ executor.execute(() -> {
+ Map<String, Object> headers = new HashMap<>();
+ headers.put("throttleValue", throttle);
+ if (finalI % 2 == 0) {
+ headers.put("key", "1");
+ } else {
+ headers.put("key", "2");
}
+ template.sendBodyAndHeaders("direct:gexpressionHeader",
"<message>payload</message>", headers);
});
}
// let's wait for the exchanges to arrive
resultEndpoint.assertIsSatisfied();
- long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
- assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
+ assertTrue(max <= throttle);
}
@Override
@@ -215,12 +171,24 @@ public class ThrottlingGroupingTest extends
ContextTestSupport {
from("seda:b").throttle(header("max"), 2).to("mock:result2");
from("seda:c").throttle(header("max")).correlationExpression(header("key")).to("mock:resultdynamic");
- from("seda:ga").throttle(constant(3),
header("key")).timePeriodMillis(1000).to("log:gresult", "mock:gresult");
-
- from("direct:ga").throttle(constant(5),
header("key")).timePeriodMillis(INTERVAL).to("log:gresult",
- "mock:gresult");
+ from("direct:ga").throttle(constant(CONCURRENT_REQUESTS),
header("key"))
+ .process(exchange -> {
+
curr.get(exchange.getMessage().getHeader("key")).getAndIncrement();
+ })
+ .delay(INTERVAL)
+ .process(exchange -> {
+ max = Math.max(max,
curr.get(exchange.getMessage().getHeader("key")).getAndDecrement());
+ })
+ .to("log:gresult", "mock:gresult");
-
from("direct:gexpressionHeader").throttle(header("throttleValue"),
header("key")).timePeriodMillis(INTERVAL)
+
from("direct:gexpressionHeader").throttle(header("throttleValue"),
header("key"))
+ .process(exchange -> {
+
curr.get(exchange.getMessage().getHeader("key")).getAndIncrement();
+ })
+ .delay(INTERVAL)
+ .process(exchange -> {
+ max = Math.max(max,
curr.get(exchange.getMessage().getHeader("key")).getAndDecrement());
+ })
.to("log:gresult", "mock:gresult");
}
};
diff --git
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index 9acf6cc149e..95d5452693f 100644
---
a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++
b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -20,17 +20,11 @@ import org.apache.camel.api.management.ManagedAttribute;
public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
- @ManagedAttribute(description = "Maximum requests per period")
- long getMaximumRequestsPerPeriod();
+ @ManagedAttribute(description = "Maximum concurrent requests")
+ long getMaximumConcurrentRequests();
- @ManagedAttribute(description = "Maximum requests per period")
- void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
-
- @ManagedAttribute(description = "Time period in millis")
- long getTimePeriodMillis();
-
- @ManagedAttribute(description = "Time period in millis")
- void setTimePeriodMillis(long timePeriodMillis);
+ @ManagedAttribute(description = "Maximum concurrent requests")
+ void setMaximumConcurrentRequests(long maximumConcurrentRequests);
@ManagedAttribute(description = "Enables asynchronous delay which means
the thread will not block while delaying")
Boolean isAsyncDelayed();
diff --git
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
index 977d33cfdac..2abb6a14f16 100644
---
a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
+++
b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedThrottler.java
@@ -38,23 +38,13 @@ public class ManagedThrottler extends ManagedProcessor
implements ManagedThrottl
}
@Override
- public long getMaximumRequestsPerPeriod() {
- return throttler.getCurrentMaximumRequestsPerPeriod();
+ public long getMaximumConcurrentRequests() {
+ return throttler.getCurrentMaximumConcurrentRequests();
}
@Override
- public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
-
throttler.setMaximumRequestsPerPeriodExpression(constant(maximumRequestsPerPeriod));
- }
-
- @Override
- public long getTimePeriodMillis() {
- return throttler.getTimePeriodMillis();
- }
-
- @Override
- public void setTimePeriodMillis(long timePeriodMillis) {
- throttler.setTimePeriodMillis(timePeriodMillis);
+ public void setMaximumConcurrentRequests(long maximumConcurrentRequests) {
+
throttler.setMaximumConcurrentRequestsExpression(constant(maximumConcurrentRequests));
}
@Override
diff --git
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 7e754f0b915..d1be4aeaeb0 100644
---
a/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++
b/core/camel-management/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -81,15 +81,13 @@ public class ManagedThrottlerTest extends
ManagementTestSupport {
Long completed = (Long) mbeanServer.getAttribute(routeName,
"ExchangesCompleted");
assertEquals(10, completed.longValue());
- Long timePeriod = (Long) mbeanServer.getAttribute(throttlerName,
"TimePeriodMillis");
- assertEquals(250, timePeriod.longValue());
-
Long total = (Long) mbeanServer.getAttribute(routeName,
"TotalProcessingTime");
- assertTrue(total < 1000, "Should take at most 1.0 sec: was " + total);
+ // 10 * delay (100) + tolerance (200)
+ assertTrue(total < 1200, "Should take at most 1.2 sec: was " + total);
// change the throttler using JMX
- mbeanServer.setAttribute(throttlerName, new
Attribute("MaximumRequestsPerPeriod", (long) 2));
+ mbeanServer.setAttribute(throttlerName, new
Attribute("MaximumConcurrentRequests", (long) 2));
// reset the counters
mbeanServer.invoke(routeName, "reset", null, null);
@@ -99,15 +97,16 @@ public class ManagedThrottlerTest extends
ManagementTestSupport {
template.sendBody("direct:start", "Message " + i);
}
- Long period = (Long) mbeanServer.getAttribute(throttlerName,
"MaximumRequestsPerPeriod");
- assertNotNull(period);
- assertEquals(2, period.longValue());
+ Long requests = (Long) mbeanServer.getAttribute(throttlerName,
"MaximumConcurrentRequests");
+ assertNotNull(requests);
+ assertEquals(2, requests.longValue());
completed = (Long) mbeanServer.getAttribute(routeName,
"ExchangesCompleted");
assertEquals(10, completed.longValue());
total = (Long) mbeanServer.getAttribute(routeName,
"TotalProcessingTime");
- assertTrue(total > 1000, "Should be around 1 sec now: was " + total);
+ // 10 * delay (100) + tolerance (200)
+ assertTrue(total < 1200, "Should take at most 1.2 sec: was " + total);
}
@DisabledOnOs(OS.WINDOWS)
@@ -269,19 +268,20 @@ public class ManagedThrottlerTest extends
ManagementTestSupport {
public void configure() throws Exception {
from("direct:start").id("route1")
.to("log:foo")
- .throttle(10).timePeriodMillis(250).id("mythrottler")
+ .throttle(10).id("mythrottler")
+ .delay(100)
.to("mock:result");
from("seda:throttleCount").id("route2")
- .throttle(1).timePeriodMillis(250).id("mythrottler2")
+ .throttle(1).id("mythrottler2").delay(250)
.to("mock:end");
from("seda:throttleCountAsync").id("route3")
-
.throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler3")
+
.throttle(1).asyncDelayed().id("mythrottler3").delay(250)
.to("mock:endAsync");
from("seda:throttleCountAsyncException").id("route4")
-
.throttle(1).asyncDelayed().timePeriodMillis(250).id("mythrottler4")
+
.throttle(1).asyncDelayed().id("mythrottler4").delay(250)
.to("mock:endAsyncException")
.process(exchange -> {
throw new RuntimeException("Fail me");
@@ -289,21 +289,21 @@ public class ManagedThrottlerTest extends
ManagementTestSupport {
from("seda:throttleCountRejectExecutionCallerRuns").id("route5")
.onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
.throttle(1)
- .timePeriodMillis(250)
.asyncDelayed()
.executorService(badService)
.callerRunsWhenRejected(true)
.id("mythrottler5")
+ .delay(250)
.to("mock:endAsyncRejectCallerRuns");
from("seda:throttleCountRejectExecution").id("route6")
.onException(RejectedExecutionException.class).to("mock:rejectedExceptionEndpoint1").end()
.throttle(1)
- .timePeriodMillis(250)
.asyncDelayed()
.executorService(badService)
.callerRunsWhenRejected(false)
.id("mythrottler6")
+ .delay(250)
.to("mock:endAsyncReject");
}
};
diff --git
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
index c4557ab90b5..d5ea55476bd 100644
---
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
+++
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/in/ModelParser.java
@@ -1457,7 +1457,6 @@ public class ModelParser extends BaseParser {
case "callerRunsWhenRejected":
def.setCallerRunsWhenRejected(val); break;
case "executorService": def.setExecutorService(val); break;
case "rejectExecution": def.setRejectExecution(val); break;
- case "timePeriodMillis": def.setTimePeriodMillis(val); break;
default: return
processorDefinitionAttributeHandler().accept(def, key, val);
}
return true;
diff --git
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
index ab3770e1d5e..0b1ae97db63 100644
---
a/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
+++
b/core/camel-xml-io/src/generated/java/org/apache/camel/xml/out/ModelWriter.java
@@ -2335,7 +2335,6 @@ public class ModelWriter extends BaseWriter {
throws IOException {
startElement(name);
doWriteProcessorDefinitionAttributes(def);
- doWriteAttribute("timePeriodMillis", def.getTimePeriodMillis());
doWriteAttribute("rejectExecution", def.getRejectExecution());
doWriteAttribute("callerRunsWhenRejected",
def.getCallerRunsWhenRejected());
doWriteAttribute("executorService", def.getExecutorService());
diff --git
a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
index 06bd9313014..f9de1b056d8 100644
---
a/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
+++
b/core/camel-yaml-io/src/generated/java/org/apache/camel/yaml/out/ModelWriter.java
@@ -2335,7 +2335,6 @@ public class ModelWriter extends BaseWriter {
throws IOException {
startElement(name);
doWriteProcessorDefinitionAttributes(def);
- doWriteAttribute("timePeriodMillis", def.getTimePeriodMillis());
doWriteAttribute("rejectExecution", def.getRejectExecution());
doWriteAttribute("callerRunsWhenRejected",
def.getCallerRunsWhenRejected());
doWriteAttribute("executorService", def.getExecutorService());
diff --git
a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
index 931c33a16af..f97b3b25b7c 100644
--- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
+++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_3.adoc
@@ -18,4 +18,12 @@ When using the `SyncCommitManager` then the offset will be
committed so that the
the behavior described in the documentation.
When using the `AsyncCommitManager` then the offset will be committed so that
the payload is continually retried. This was
-the behavior described in the documentation.
\ No newline at end of file
+the behavior described in the documentation.
+
+=== throttle
+
+Throttle now uses the number of concurrent requests as the throttling measure instead of the number of requests
+per period.
+
+The `throttle` parameter now specifies the maximum number of concurrent requests,
+and there is no longer support for the `timePeriodMillis` option.
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
index e6bc1336d0d..8fec7a9903c 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl-deserializers/src/generated/java/org/apache/camel/dsl/yaml/deserializers/ModelDeserializers.java
@@ -17238,8 +17238,7 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
@YamlProperty(name = "expression", type =
"object:org.apache.camel.model.language.ExpressionDefinition", description =
"Expression to configure the maximum number of messages to throttle per
request", displayName = "Expression", oneOf = "expression"),
@YamlProperty(name = "id", type = "string", description =
"Sets the id of this node", displayName = "Id"),
@YamlProperty(name = "inherit-error-handler", type =
"boolean"),
- @YamlProperty(name = "reject-execution", type = "boolean",
description = "Whether or not throttler throws the
ThrottlerRejectedExecutionException when the exchange exceeds the request limit
Is by default false", displayName = "Reject Execution"),
- @YamlProperty(name = "time-period-millis", type =
"string", defaultValue = "1000", description = "Sets the time period during
which the maximum request count is valid for", displayName = "Time Period
Millis")
+ @YamlProperty(name = "reject-execution", type = "boolean",
description = "Whether or not throttler throws the
ThrottlerRejectedExecutionException when the exchange exceeds the request limit
Is by default false", displayName = "Reject Execution")
}
)
public static class ThrottleDefinitionDeserializer extends
YamlDeserializerBase<ThrottleDefinition> {
@@ -17296,11 +17295,6 @@ public final class ModelDeserializers extends
YamlDeserializerSupport {
target.setRejectExecution(val);
break;
}
- case "time-period-millis": {
- String val = asText(node);
- target.setTimePeriodMillis(val);
- break;
- }
case "id": {
String val = asText(node);
target.setId(val);
diff --git
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
index 82cfe6f5ee7..da067a148aa 100644
---
a/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
+++
b/dsl/camel-yaml-dsl/camel-yaml-dsl/src/generated/resources/schema/camelYamlDsl.json
@@ -6538,12 +6538,6 @@
"title" : "Reject Execution",
"description" : "Whether or not throttler throws the
ThrottlerRejectedExecutionException when the exchange exceeds the request limit
Is by default false"
},
- "timePeriodMillis" : {
- "type" : "string",
- "title" : "Time Period Millis",
- "description" : "Sets the time period during which the maximum
request count is valid for",
- "default" : "1000"
- },
"constant" : { },
"csimple" : { },
"datasonnet" : { },
diff --git a/etc/eclipse/camel_xml_templates.xml
b/etc/eclipse/camel_xml_templates.xml
index 6af5acce144..e283c357d9e 100644
--- a/etc/eclipse/camel_xml_templates.xml
+++ b/etc/eclipse/camel_xml_templates.xml
@@ -104,7 +104,7 @@
</route>
</template><template autoinsert="true" context="xml_all" deleted="false"
description="Creates a Throttler" enabled="true"
name="camel_throttler"><route>
<from uri="from_uri" />
- <throttle maximumRequestsPerPeriod="number_of_messages"
timePeriodMillis="milliseconds">
+ <throttle maximumConcurrentRequests="number_of_messages">
<to uri="to_uri" />
</throttle>
</route>