[
https://issues.apache.org/jira/browse/CAMEL-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16532377#comment-16532377
]
ASF GitHub Bot commented on CAMEL-6840:
---------------------------------------
onderson closed pull request #2376: CAMEL-6840 - add grouped throttling
feature, both XML and Java DSL should be fine now.
URL: https://github.com/apache/camel/pull/2376
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/camel-core/src/main/docs/eips/throttle-eip.adoc
b/camel-core/src/main/docs/eips/throttle-eip.adoc
index 71da9599589..aa0582b297e 100644
--- a/camel-core/src/main/docs/eips/throttle-eip.adoc
+++ b/camel-core/src/main/docs/eips/throttle-eip.adoc
@@ -6,11 +6,12 @@ The Throttler Pattern allows you to ensure that a specific
endpoint does not get
=== Options
// eip options: START
-The Throttle EIP supports 5 options which are listed below:
+The Throttle EIP supports 6 options which are listed below:
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
+| *correlationExpression* | The expression used to calculate the correlation
key to use for throttle grouping. The Exchange which has the same correlation
key is throttled together. | | NamespaceAware Expression
| *executorServiceRef* | To use a custom thread pool
(ScheduledExecutorService) by the throttler. | | String
| *timePeriodMillis* | Sets the time period during which the maximum request
count is valid for | 1000 | Long
| *asyncDelayed* | Enables asynchronous delay which means the thread will not
block while delaying. | false | Boolean
diff --git
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
index 673c13e88fe..6e993b0633f 100644
---
a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
+++
b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedThrottlerMBean.java
@@ -20,10 +20,10 @@
public interface ManagedThrottlerMBean extends ManagedProcessorMBean {
- @ManagedAttribute(description = "Maximum requires per period")
+ @ManagedAttribute(description = "Maximum requests per period")
long getMaximumRequestsPerPeriod();
- @ManagedAttribute(description = "Maximum requires per period")
+ @ManagedAttribute(description = "Maximum requests per period")
void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod);
@ManagedAttribute(description = "Time period in millis")
diff --git
a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
index d2ac38ec8d7..5a1d7b1d125 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ExpressionNode.java
@@ -21,6 +21,7 @@
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElementRef;
+import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
@@ -36,6 +37,7 @@
* @version
*/
@XmlAccessorType(XmlAccessType.FIELD)
+@XmlTransient
public abstract class ExpressionNode extends
ProcessorDefinition<ExpressionNode> {
@XmlElementRef
private ExpressionDefinition expression;
diff --git
a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index 005270e313b..9108d78371f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -2284,6 +2284,48 @@ public ThrottleDefinition throttle(Expression
maximumRequestCount) {
addOutput(answer);
return answer;
}
+
+ /**
+ * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+ * Creates a throttler allowing you to ensure that a specific endpoint
does not get overloaded,
+ * or that we don't exceed an agreed SLA with some external service.
+ * Here another parameter correlationExpressionKey is introduced for the
functionality which
+ * will throttle based on the key expression to group exchanges. This will
make key-based throttling
+ * instead of overall throttling.
+ * <p/>
+ * Will default use a time period of 1 second, so setting the
maximumRequestCount to eg 10
+ * will default ensure at most 10 messages per second.
+ *
+ * @param maximumRequestCount an expression to calculate the maximum
request count
+ * @param correlationExpressionKey is a correlation key that can throttle
by the given key instead of overall throttling
+ * @return the builder
+ */
+ public ThrottleDefinition throttle(Expression maximumRequestCount, long
correlationExpressionKey) {
+ ThrottleDefinition answer = new
ThrottleDefinition(maximumRequestCount,
ExpressionBuilder.constantExpression(correlationExpressionKey));
+ addOutput(answer);
+ return answer;
+ }
+
+ /**
+ * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
+ * Creates a throttler allowing you to ensure that a specific endpoint
does not get overloaded,
+ * or that we don't exceed an agreed SLA with some external service.
+ * Here another parameter correlationExpressionKey is introduced for the
functionality which
+ * will throttle based on the key expression to group exchanges. This will
make key-based throttling
+ * instead of overall throttling.
+ * <p/>
+ * Will default use a time period of 1 second, so setting the
maximumRequestCount to eg 10
+ * will default ensure at most 10 messages per second.
+ *
+ * @param maximumRequestCount an expression to calculate the maximum
request count
+ * @param correlationExpressionKey is a correlation key as an expression
that can throttle by the given key instead of overall throttling
+ * @return the builder
+ */
+ public ThrottleDefinition throttle(Expression maximumRequestCount,
Expression correlationExpressionKey) {
+ ThrottleDefinition answer = new
ThrottleDefinition(maximumRequestCount, correlationExpressionKey);
+ addOutput(answer);
+ return answer;
+ }
/**
* <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
diff --git
a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index 613d2b351c5..eeb1645f615 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -21,8 +21,10 @@
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
+import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
+import javax.xml.bind.annotation.XmlType;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
@@ -40,9 +42,12 @@
@Metadata(label = "eip,routing")
@XmlRootElement(name = "throttle")
@XmlAccessorType(XmlAccessType.FIELD)
+@XmlType(propOrder = {"expression", "correlationExpression", "outputs"})
public class ThrottleDefinition extends ExpressionNode implements
ExecutorServiceAwareDefinition<ThrottleDefinition> {
// TODO: Camel 3.0 Should not support outputs
+ @XmlElement(name = "correlationExpression")
+ private ExpressionSubElementDefinition correlationExpression;
@XmlTransient
private ExecutorService executorService;
@XmlAttribute
@@ -55,7 +60,7 @@
private Boolean callerRunsWhenRejected;
@XmlAttribute
private Boolean rejectExecution;
-
+
public ThrottleDefinition() {
}
@@ -63,6 +68,18 @@ public ThrottleDefinition(Expression
maximumRequestsPerPeriod) {
super(maximumRequestsPerPeriod);
}
+ public ThrottleDefinition(Expression maximumRequestsPerPeriod, Expression
correlationExpression) {
+
this(ExpressionNodeHelper.toExpressionDefinition(maximumRequestsPerPeriod),
correlationExpression);
+ }
+
+ private ThrottleDefinition(ExpressionDefinition maximumRequestsPerPeriod,
Expression correlationExpression) {
+ super(maximumRequestsPerPeriod);
+
+ ExpressionSubElementDefinition cor = new
ExpressionSubElementDefinition();
+
cor.setExpressionType(ExpressionNodeHelper.toExpressionDefinition(correlationExpression));
+ setCorrelationExpression(cor);
+ }
+
@Override
public String toString() {
return "Throttle[" + description() + " -> " + getOutputs() + "]";
@@ -93,9 +110,14 @@ public Processor createProcessor(RouteContext routeContext)
throws Exception {
if (maxRequestsExpression == null) {
throw new IllegalArgumentException("MaxRequestsPerPeriod
expression must be provided on " + this);
}
+
+ Expression correlation = null;
+ if (correlationExpression != null) {
+ correlation = correlationExpression.createExpression(routeContext);
+ }
boolean reject = getRejectExecution() != null && getRejectExecution();
- Throttler answer = new Throttler(routeContext.getCamelContext(),
childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool,
reject);
+ Throttler answer = new Throttler(routeContext.getCamelContext(),
childProcessor, maxRequestsExpression, period, threadPool, shutdownThreadPool,
reject, correlation);
answer.setAsyncDelayed(async);
if (getCallerRunsWhenRejected() == null) {
@@ -104,6 +126,7 @@ public Processor createProcessor(RouteContext routeContext)
throws Exception {
} else {
answer.setCallerRunsWhenRejected(getCallerRunsWhenRejected());
}
+
return answer;
}
@@ -256,4 +279,16 @@ public Boolean getRejectExecution() {
public void setRejectExecution(Boolean rejectExecution) {
this.rejectExecution = rejectExecution;
}
+
+ /**
+ * The expression used to calculate the correlation key to use for
throttle grouping.
+ * The Exchange which has the same correlation key is throttled together.
+ */
+ public void setCorrelationExpression(ExpressionSubElementDefinition
correlationExpression) {
+ this.correlationExpression = correlationExpression;
+ }
+
+ public ExpressionSubElementDefinition getCorrelationExpression() {
+ return correlationExpression;
+ }
}
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 543ec9a9cb0..a1b10c859b8 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,8 +16,13 @@
*/
package org.apache.camel.processor;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -31,7 +36,11 @@
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.LRUCacheFactory;
import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,20 +75,24 @@
private final Logger log = LoggerFactory.getLogger(Throttler.class);
private final CamelContext camelContext;
- private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
private final ExecutorService asyncExecutor;
private final boolean shutdownAsyncExecutor;
private volatile long timePeriodMillis;
- private volatile int throttleRate;
private String id;
+ private volatile Integer throttleRate = new Integer(0);
private Expression maxRequestsPerPeriodExpression;
private boolean rejectExecution;
private boolean asyncDelayed;
private boolean callerRunsWhenRejected = true;
+ private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
+ // below 3 fields added for (throttling grouping)
+ private Expression correlationExpression;
+ private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
+ private Map<Integer, Integer> throttleRatesMap = new HashMap<>();
public Throttler(final CamelContext camelContext, final Processor
processor, final Expression maxRequestsPerPeriodExpression, final long
timePeriodMillis,
- final ExecutorService asyncExecutor, final boolean
shutdownAsyncExecutor, final boolean rejectExecution) {
+ final ExecutorService asyncExecutor, final boolean
shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
super(processor);
this.camelContext = camelContext;
this.rejectExecution = rejectExecution;
@@ -93,6 +106,7 @@ public Throttler(final CamelContext camelContext, final
Processor processor, fin
}
this.timePeriodMillis = timePeriodMillis;
this.asyncExecutor = asyncExecutor;
+ this.correlationExpression = correlation;
}
@Override
@@ -111,13 +125,22 @@ public boolean process(final Exchange exchange, final
AsyncCallback callback) {
throw new RejectedExecutionException("Run is not allowed");
}
- calculateAndSetMaxRequestsPerPeriod(exchange);
- ThrottlePermit permit = delayQueue.poll();
+ DelayQueue<ThrottlePermit> delayQ = null;
+ Integer key = null;
+ if (correlationExpression != null) {
+ key = correlationExpression.evaluate(exchange, Integer.class);
+ delayQ = locateDelayQueue(key, doneSync);
+ } else {
+ delayQ = delayQueue;
+ }
+
+ calculateAndSetMaxRequestsPerPeriod(delayQ, exchange, key);
+ ThrottlePermit permit = delayQ.poll();
if (permit == null) {
if (isRejectExecution()) {
throw new ThrottlerRejectedExecutionException("Exceeded
the max throttle rate of "
- + throttleRate + " within " + timePeriodMillis +
"ms");
+ + ((correlationExpression != null) ?
throttleRatesMap.get(key) : throttleRate) + " within " + timePeriodMillis +
"ms");
} else {
// delegate to async pool
if (isAsyncDelayed() && !exchange.isTransacted() && state
== State.SYNC) {
@@ -131,11 +154,11 @@ public boolean process(final Exchange exchange, final
AsyncCallback callback) {
if (log.isTraceEnabled()) {
start = System.currentTimeMillis();
}
- permit = delayQueue.take();
+ permit = delayQ.take();
if (log.isTraceEnabled()) {
elapsed = System.currentTimeMillis() - start;
}
- enqueuePermit(permit, exchange);
+ enqueuePermit(permit, exchange, delayQ);
if (state == State.ASYNC) {
if (log.isTraceEnabled()) {
@@ -147,7 +170,7 @@ public boolean process(final Exchange exchange, final
AsyncCallback callback) {
}
}
} else {
- enqueuePermit(permit, exchange);
+ enqueuePermit(permit, exchange, delayQ);
if (state == State.ASYNC) {
if (log.isTraceEnabled()) {
@@ -192,6 +215,41 @@ public boolean process(final Exchange exchange, final
AsyncCallback callback) {
}
}
+ /**
+ *
+ * Finds the right Delay Queue to put a permit into with the exchanges
time arrival timestamp + timePeriodInMillis
+ * In case of asynchronous routing there may be cases where we create new
group whose correlationExpression
+ * might first hit after long series of exchanges with a different
correlationExpression and are to be on hold in
+ * their delayQueue so we need to find delay queue to add new ones while
we create a new empty delay
+ * queue for the new group hit for the first time. that's why locating
delay queues for those frequently
+ * hitting exchanges for the group during asynchronous routing would be
better be asynchronous with asyncExecutor
+ *
+ * @param key is evaluated value of correlationExpression
+ * @param doneSync is a flag indicating if the exchange is routed
asynchronously or not
+ * @return DelayQueue in which the exchange with permit expiry to be put
into
+ */
+ private DelayQueue<ThrottlePermit> locateDelayQueue(final Integer key,
final boolean doneSync) throws InterruptedException, ExecutionException {
+ CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new
CompletableFuture<>();
+
+ if (!doneSync) {
+ asyncExecutor.submit(() -> {
+ futureDelayQueue.complete(findDelayQueue(key));
+ });
+ }
+ DelayQueue<ThrottlePermit> currentQueue = (!doneSync) ?
futureDelayQueue.get() : findDelayQueue(key);
+ return currentQueue;
+ }
+
+ private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
+ DelayQueue<ThrottlePermit> currentDelayQueue =
delayQueueCache.get(key);
+ if (currentDelayQueue == null) {
+ currentDelayQueue = new DelayQueue<>();
+ throttleRatesMap.put(key, 0);
+ delayQueueCache.put(key, currentDelayQueue);
+ }
+ return currentDelayQueue;
+ }
+
/**
* Delegate blocking on the DelayQueue to an asyncExecutor. Except if the
executor rejects the submission
* and isCallerRunsWhenRejected() is enabled, then this method will
delegate back to process(), but not
@@ -222,8 +280,10 @@ public void run() {
/**
* Returns a permit to the DelayQueue, first resetting it's delay to be
relative to now.
+ * @throws ExecutionException
+ * @throws InterruptedException
*/
- protected void enqueuePermit(final ThrottlePermit permit, final Exchange
exchange) {
+ protected void enqueuePermit(final ThrottlePermit permit, final Exchange
exchange, DelayQueue<ThrottlePermit> delayQueue) throws InterruptedException,
ExecutionException {
permit.setDelayMs(getTimePeriodMillis());
delayQueue.put(permit);
// try and incur the least amount of overhead while releasing permits
back to the queue
@@ -235,23 +295,31 @@ protected void enqueuePermit(final ThrottlePermit permit,
final Exchange exchang
/**
* Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle
rate up or down.
*/
- protected void calculateAndSetMaxRequestsPerPeriod(final Exchange
exchange) throws Exception {
+ protected void
calculateAndSetMaxRequestsPerPeriod(DelayQueue<ThrottlePermit> delayQueue,
final Exchange exchange, final Integer key) throws Exception {
Integer newThrottle =
maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
if (newThrottle != null && newThrottle < 0) {
throw new IllegalStateException("The maximumRequestsPerPeriod must
be a positive number, was: " + newThrottle);
}
- synchronized (this) {
- if (newThrottle == null && throttleRate == 0) {
+ Object lockOnSync = this;
+ Integer currentThrottleRate = throttleRate;
+ if (correlationExpression != null) {
+ currentThrottleRate = throttleRatesMap.get(key);
+ lockOnSync = key;
+ }
+
+ synchronized (lockOnSync) {
+ if (newThrottle == null && currentThrottleRate == 0) {
throw new RuntimeExchangeException("The
maxRequestsPerPeriodExpression was evaluated as null: " +
maxRequestsPerPeriodExpression, exchange);
}
if (newThrottle != null) {
- if (newThrottle != throttleRate) {
+ if (newThrottle != currentThrottleRate) {
+ // get the queue from the cache
// decrease
- if (throttleRate > newThrottle) {
- int delta = throttleRate - newThrottle;
+ if (currentThrottleRate > newThrottle) {
+ int delta = currentThrottleRate - newThrottle;
// discard any permits that are needed to decrease
throttling
while (delta > 0) {
@@ -259,39 +327,75 @@ protected void calculateAndSetMaxRequestsPerPeriod(final
Exchange exchange) thro
delta--;
log.trace("Permit discarded due to throttling rate
decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
}
- log.debug("Throttle rate decreased from {} to {},
triggered by ExchangeId: {}", throttleRate, newThrottle,
exchange.getExchangeId());
+ log.debug("Throttle rate decreased from {} to {},
triggered by ExchangeId: {}", currentThrottleRate, newThrottle,
exchange.getExchangeId());
// increase
- } else if (newThrottle > throttleRate) {
- int delta = newThrottle - throttleRate;
+ } else if (newThrottle > currentThrottleRate) {
+ int delta = newThrottle - currentThrottleRate;
for (int i = 0; i < delta; i++) {
delayQueue.put(new ThrottlePermit(-1));
}
- if (throttleRate == 0) {
+ if (currentThrottleRate == 0) {
log.debug("Initial throttle rate set to {},
triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
} else {
- log.debug("Throttle rate increase from {} to {},
triggered by ExchangeId: {}", throttleRate, newThrottle,
exchange.getExchangeId());
+ log.debug("Throttle rate increase from {} to {},
triggered by ExchangeId: {}", currentThrottleRate, newThrottle,
exchange.getExchangeId());
}
}
- throttleRate = newThrottle;
+ if (correlationExpression != null) {
+ throttleRatesMap.put(key, newThrottle);
+ } else {
+ throttleRate = newThrottle;
+ }
}
}
}
}
+ @SuppressWarnings("unchecked")
@Override
protected void doStart() throws Exception {
if (isAsyncDelayed()) {
ObjectHelper.notNull(asyncExecutor, "executorService", this);
}
+ if (correlationExpression != null) {
+ if (camelContext != null) {
+ int maxSize =
CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
+ if (maxSize > 0) {
+ delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize,
false);
+ log.debug("DelayQueues cache size: {}", maxSize);
+ } else {
+ delayQueueCache = LRUCacheFactory.newLRUCache(100);
+ log.debug("Defaulting DelayQueues cache size: {}", 100);
+ }
+ }
+ if (delayQueueCache != null) {
+ ServiceHelper.startService(delayQueueCache);
+ }
+ }
super.doStart();
}
+ @SuppressWarnings("rawtypes")
@Override
protected void doShutdown() throws Exception {
if (shutdownAsyncExecutor && asyncExecutor != null) {
camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
}
+ if (correlationExpression != null) {
+ if (delayQueueCache != null) {
+ ServiceHelper.stopService(delayQueueCache);
+ if (log.isDebugEnabled()) {
+ if (delayQueueCache instanceof LRUCache) {
+ log.debug("Clearing delay queues cache[size={},
hits={}, misses={}, evicted={}]",
+ delayQueueCache.size(), ((LRUCache)
delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(),
((LRUCache) delayQueueCache).getEvicted());
+ }
+ }
+ delayQueueCache.clear();
+ }
+ if (throttleRatesMap != null && throttleRatesMap.size() > 0) {
+ throttleRatesMap.clear();
+ }
+ }
super.doShutdown();
}
@@ -365,9 +469,14 @@ public Expression getMaximumRequestsPerPeriodExpression() {
/**
* Gets the current maximum request per period value.
+ * If grouped throttling is applied with a correlationExpression,
+ * then the highest maximum per period among the groups is returned.
*/
public int getCurrentMaximumRequestsPerPeriod() {
- return throttleRate;
+ if (correlationExpression == null) {
+ return throttleRate;
+ }
+ return Collections.max(throttleRatesMap.entrySet(), (entry1, entry2)
-> entry1.getValue() - entry2.getValue()).getValue();
}
/**
diff --git
a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
new file mode 100644
index 00000000000..09f1160148e
--- /dev/null
+++
b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version
+ */
+public class ThrottlingGroupingTest extends ContextTestSupport {
+
+ public void testGroupingWithSingleConstant() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World",
"Bye World");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom");
+
+ template.sendBodyAndHeader("seda:a", "Kaboom", "max", null);
+ template.sendBodyAndHeader("seda:a", "Hello World", "max", 2);
+ template.sendBodyAndHeader("seda:a", "Bye World", "max", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testGroupingWithDynamicHeaderExpression() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:result2").expectedBodiesReceived("Bye World");
+ getMockEndpoint("mock:dead").expectedBodiesReceived("Kaboom",
"Saloon");
+ getMockEndpoint("mock:resultdynamic").expectedBodiesReceived("Hello
Dynamic World", "Bye Dynamic World");
+
+ Map<String, Object> headers = new HashMap<String, Object>();
+
+ template.sendBodyAndHeaders("seda:a", "Kaboom", headers);
+ template.sendBodyAndHeaders("seda:a", "Saloon", headers);
+
+ headers.put("max", "2");
+ template.sendBodyAndHeaders("seda:a", "Hello World", headers);
+ template.sendBodyAndHeaders("seda:b", "Bye World", headers);
+ headers.put("max", "2");
+ headers.put("key", "1");
+ template.sendBodyAndHeaders("seda:c", "Hello Dynamic World", headers);
+ headers.put("key", "2");
+ template.sendBodyAndHeaders("seda:c", "Bye Dynamic World", headers);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ errorHandler(deadLetterChannel("mock:dead"));
+
+ from("seda:a").throttle(header("max"), 1).to("mock:result");
+ from("seda:b").throttle(header("max"), 2).to("mock:result2");
+ from("seda:c").throttle(header("max"),
header("key")).to("mock:resultdynamic");
+ }
+ };
+ }
+}
diff --git
a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
new file mode 100644
index 00000000000..e321838cdb3
--- /dev/null
+++
b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spring.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.processor.ThrottlingGroupingTest;
+import org.junit.Ignore;
+
+import static
org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
+
+public class SpringThrottlerGroupingTest extends ThrottlingGroupingTest {
+
+ protected CamelContext createCamelContext() throws Exception {
+ return createSpringCamelContext(this,
+ "org/apache/camel/spring/processor/ThrottlerGroupingTest.xml");
+ }
+}
diff --git
a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
new file mode 100644
index 00000000000..c3019cda3a9
--- /dev/null
+++
b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd
+ ">
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
+ <errorHandler id="dlc" deadLetterUri="mock:dead" type="DeadLetterChannel"/>
+ <route errorHandlerRef="dlc">
+ <from uri="seda:a"/>
+ <throttle timePeriodMillis="1000">
+ <header>max</header>
+ <correlationExpression>
+ <constant>1</constant>
+ </correlationExpression>
+ <to uri="log:result"/>
+ <to uri="mock:result"/>
+ </throttle>
+ </route>
+
+ <route errorHandlerRef="dlc">
+ <from uri="seda:b"/>
+ <throttle timePeriodMillis="1000">
+ <header>max</header>
+ <correlationExpression>
+ <constant>2</constant>
+ </correlationExpression>
+ <to uri="log:result"/>
+ <to uri="mock:result2"/>
+ </throttle>
+ </route>
+
+ <route errorHandlerRef="dlc">
+ <from uri="seda:c"/>
+ <throttle timePeriodMillis="1000">
+ <header>max</header>
+ <correlationExpression>
+ <header>key</header>
+ </correlationExpression>
+ <to uri="log:result"/>
+ <to uri="mock:resultdynamic"/>
+ </throttle>
+ </route>
+
+ </camelContext>
+
+</beans>
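
For readers who want the grouping idea from the diff above without the Camel plumbing, here is a minimal stand-alone sketch. It is not the Throttler code from this PR. The class name KeyedThrottleSketch, its Permit class, and the tryAcquire method are hypothetical names chosen for illustration. It assumes a fixed maximum per period (the PR evaluates an expression per exchange), rejects instead of delaying when a group has no permit, and uses an access-ordered LinkedHashMap as a stand-in for Camel's LRUCache.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of per-key throttling; not Camel's Throttler. */
class KeyedThrottleSketch {

    /** A permit that can be taken again once its delay has elapsed. */
    static final class Permit implements Delayed {
        private volatile long availableAtNanos;

        Permit(long delayMs) {
            setDelayMs(delayMs);
        }

        void setDelayMs(long delayMs) {
            availableAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(availableAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final int maxPerPeriod;
    private final long periodMs;
    // One queue of permits per correlation key, bounded like an LRU cache.
    private final Map<Object, DelayQueue<Permit>> queues;

    KeyedThrottleSketch(int maxPerPeriod, long periodMs, final int maxKeys) {
        this.maxPerPeriod = maxPerPeriod;
        this.periodMs = periodMs;
        // Access-ordered map that evicts the least recently used key (assumption:
        // this only approximates what Camel's LRUCache does).
        this.queues = new LinkedHashMap<Object, DelayQueue<Permit>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Object, DelayQueue<Permit>> eldest) {
                return size() > maxKeys;
            }
        };
    }

    /** Returns true if the group identified by key still has a permit in this period. */
    synchronized boolean tryAcquire(Object key) {
        DelayQueue<Permit> queue = queues.get(key);
        if (queue == null) {
            // First hit for this group: create its queue with already-expired permits.
            queue = new DelayQueue<>();
            for (int i = 0; i < maxPerPeriod; i++) {
                queue.put(new Permit(-1));
            }
            queues.put(key, queue);
        }
        Permit permit = queue.poll(); // null when no permit has expired yet
        if (permit == null) {
            return false;
        }
        // Return the permit to the queue, delayed by one full period from now.
        permit.setDelayMs(periodMs);
        queue.put(permit);
        return true;
    }
}
```

With a limit of 2 per period, two calls to tryAcquire("a") succeed and a third within the same period is refused, while tryAcquire("b") still succeeds because each key has its own queue. The single synchronized method keeps the sketch simple; it is a design shortcut, not a statement about how the merged code locks.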
> Allow Throttler EIP to specify SLA per client/correlated-group
> --------------------------------------------------------------
>
> Key: CAMEL-6840
> URL: https://issues.apache.org/jira/browse/CAMEL-6840
> Project: Camel
> Issue Type: New Feature
> Components: camel-core, eip
> Reporter: Christian Posta
> Assignee: Önder Sezgin
> Priority: Major
> Fix For: 2.23.0
>
>
> Basic idea is to allow throttler to have a predicate to determine whether or
> not to apply throttling to that exchange.
> From this Mailing List discussion:
> http://camel.465427.n5.nabble.com/Throttling-by-client-ID-td5741032.html