This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 55857fc4ed7 Camel 20158 fix throttle tests (#12233)
55857fc4ed7 is described below
commit 55857fc4ed7191b8ba85e09967593b7157931399
Author: Jono Morris <[email protected]>
AuthorDate: Wed Nov 29 02:51:55 2023 +1300
Camel 20158 fix throttle tests (#12233)
* CAMEL-20158 use semaphore to count requests
* CAMEL-20158 reenable throttle tests
---
.../java/org/apache/camel/processor/Throttler.java | 2 +-
.../org/apache/camel/processor/ThrottlerTest.java | 48 +++++++++-------------
2 files changed, 21 insertions(+), 29 deletions(-)
diff --git
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
index b74b145914e..a521808b218 100644
---
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
+++
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/Throttler.java
@@ -424,7 +424,7 @@ public class Throttler extends AsyncProcessorSupport
implements Traceable, IdAwa
}
/**
- * Sets the maximum number of requests per time period expression
+ * Sets the maximum number of concurrent requests.
*/
public void setMaximumConcurrentRequestsExpression(Expression
maxConcurrentRequestsExpression) {
this.maxConcurrentRequestsExpression = maxConcurrentRequestsExpression;
diff --git
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
index fc2b64f62f7..bfe2b1c5a14 100644
---
a/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
+++
b/core/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Semaphore;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -39,8 +39,7 @@ public class ThrottlerTest extends ContextTestSupport {
private static final int INTERVAL = 500;
private static final int MESSAGE_COUNT = 9;
private static final int CONCURRENT_REQUESTS = 2;
- private volatile int curr;
- private volatile int max;
+ private Semaphore semaphore;
@Test
public void testSendLotsOfMessagesWithRejectExecution() throws Exception {
@@ -61,22 +60,20 @@ public class ThrottlerTest extends ContextTestSupport {
}
}
- @Disabled("Disabled due to CAMEL-20158")
@Test
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough()
throws Exception {
+ semaphore = new Semaphore(CONCURRENT_REQUESTS);
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:a", MESSAGE_COUNT,
resultEndpoint);
- assertTrue(max <= CONCURRENT_REQUESTS);
}
@Test
public void testConfigurationWithConstantExpression() throws Exception {
+ semaphore = new Semaphore(CONCURRENT_REQUESTS);
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
sendMessagesAndAwaitDelivery(MESSAGE_COUNT,
"direct:expressionConstant", MESSAGE_COUNT, resultEndpoint);
- assertTrue(max <= CONCURRENT_REQUESTS);
}
- @Disabled("Disabled due to CAMEL-20158")
@Test
public void testConfigurationWithHeaderExpression() throws Exception {
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
@@ -84,37 +81,36 @@ public class ThrottlerTest extends ContextTestSupport {
ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
try {
- sendMessagesWithHeaderExpression(executor, resultEndpoint,
CONCURRENT_REQUESTS, INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint,
CONCURRENT_REQUESTS, MESSAGE_COUNT);
} finally {
executor.shutdownNow();
}
}
- @Disabled("Disabled due to CAMEL-20158")
@Test
public void testConfigurationWithChangingHeaderExpression() throws
Exception {
ExecutorService executor = Executors.newFixedThreadPool(5);
try {
MockEndpoint resultEndpoint =
resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 2,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 2,
MESSAGE_COUNT);
Thread.sleep(INTERVAL); // sleep here to ensure the
// first throttle rate does not
// influence the next one.
resultEndpoint.reset();
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 4,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 4,
MESSAGE_COUNT);
Thread.sleep(INTERVAL); // sleep here to ensure the
// first throttle rate does not
// influence the next one.
resultEndpoint.reset();
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 2,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 2,
MESSAGE_COUNT);
Thread.sleep(INTERVAL); // sleep here to ensure the
// first throttle rate does not
// influence the next one.
resultEndpoint.reset();
- sendMessagesWithHeaderExpression(executor, resultEndpoint, 4,
INTERVAL, MESSAGE_COUNT);
+ sendMessagesWithHeaderExpression(executor, resultEndpoint, 4,
MESSAGE_COUNT);
} finally {
executor.shutdownNow();
}
@@ -162,13 +158,11 @@ public class ThrottlerTest extends ContextTestSupport {
}
private void sendMessagesWithHeaderExpression(
- final ExecutorService executor, final MockEndpoint resultEndpoint,
final int throttle, final int intervalMs,
- final int messageCount)
+ final ExecutorService executor, final MockEndpoint resultEndpoint,
final int throttle, final int messageCount)
throws InterruptedException {
resultEndpoint.expectedMessageCount(messageCount);
+ semaphore = new Semaphore(throttle);
- max = 0;
- long start = System.nanoTime();
for (int i = 0; i < messageCount; i++) {
executor.execute(new Runnable() {
public void run() {
@@ -180,8 +174,6 @@ public class ThrottlerTest extends ContextTestSupport {
// let's wait for the exchanges to arrive
resultEndpoint.assertIsSatisfied();
- long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
start);
- assertTrue(max <= throttle);
}
private void sendBody(String endpoint) {
@@ -198,31 +190,31 @@ public class ThrottlerTest extends ContextTestSupport {
from("direct:a").throttle(CONCURRENT_REQUESTS)
.process(exchange -> {
- curr++;
+ assertTrue(semaphore.tryAcquire(), "'direct:a' too
many requests");
})
- .delay(INTERVAL)
+ .delay(100)
.process(exchange -> {
- max = Math.max(max, curr--);
+ semaphore.release();
})
.to("log:result", "mock:result");
from("direct:expressionConstant").throttle(constant(CONCURRENT_REQUESTS))
.process(exchange -> {
- curr++;
+ assertTrue(semaphore.tryAcquire(),
"'direct:expressionConstant' too many requests");
})
- .delay(INTERVAL)
+ .delay(100)
.process(exchange -> {
- max = Math.max(max, curr--);
+ semaphore.release();
})
.to("log:result", "mock:result");
from("direct:expressionHeader").throttle(header("throttleValue"))
.process(exchange -> {
- curr++;
+ assertTrue(semaphore.tryAcquire(),
"'direct:expressionHeader' too many requests");
})
- .delay(INTERVAL)
+ .delay(100)
.process(exchange -> {
- max = Math.max(max, curr--);
+ semaphore.release();
})
.to("log:result", "mock:result");