Author: gertv
Date: Tue Dec 30 05:59:16 2008
New Revision: 730132
URL: http://svn.apache.org/viewvc?rev=730132&view=rev
Log:
CAMEL-1199: Throttler appears to throttle per thread instead of over all threads
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=730132&r1=730131&r2=730132&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
Tue Dec 30 05:59:16 2008
@@ -16,8 +16,12 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A <a href="http://activemq.apache.org/camel/throttler.html">Throttler</a>
@@ -33,8 +37,8 @@
public class Throttler extends DelayProcessorSupport {
private long maximumRequestsPerPeriod;
private long timePeriodMillis;
- private long startTimeMillis;
- private long requestCount;
+ private AtomicLong startTimeMillis = new AtomicLong(0);
+ private AtomicLong requestCount = new AtomicLong(0);
public Throttler(Processor processor, long maximumRequestsPerPeriod) {
this(processor, maximumRequestsPerPeriod, 1000);
@@ -81,32 +85,36 @@
* period
*/
public long getRequestCount() {
- return requestCount;
+ return requestCount.get();
}
/**
* The start time when this current period began
*/
public long getStartTimeMillis() {
- return startTimeMillis;
+ return startTimeMillis.get();
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ super.process(exchange);
+
}
// Implementation methods
// -----------------------------------------------------------------------
protected void delay(Exchange exchange) throws Exception {
long now = currentSystemTime();
- if (startTimeMillis == 0) {
- startTimeMillis = now;
- }
- if (now - startTimeMillis > timePeriodMillis) {
+ startTimeMillis.compareAndSet(0, now);
+ if (now - startTimeMillis.get() > timePeriodMillis) {
// we're at the start of a new time period
// so lets reset things
- requestCount = 1;
- startTimeMillis = now;
+ requestCount.set(0);
+ startTimeMillis.set(0);
} else {
- if (++requestCount > maximumRequestsPerPeriod) {
+ if (requestCount.incrementAndGet() > maximumRequestsPerPeriod) {
// lets sleep until the start of the next time period
- long time = startTimeMillis + timePeriodMillis;
+ long time = startTimeMillis.get() + timePeriodMillis;
waitUntil(time, exchange);
}
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=730132&r1=730131&r2=730132&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
Tue Dec 30 05:59:16 2008
@@ -16,6 +16,9 @@
*/
package org.apache.camel.processor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -39,13 +42,33 @@
// to check that the throttle really does kick in
resultEndpoint.assertIsSatisfied();
}
+
+ public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough()
throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(3);
+ resultEndpoint.setResultWaitTime(1000);
+
+ ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+ for (int i = 0; i < messageCount; i++) {
+ executor.execute(new Runnable() {
+ public void run() {
+ template.sendBody("direct:a",
"<message>payload</message>");
+ }
+ });
+ }
+
+ // lets pause to give the requests time to be processed
+ // to check that the throttle really does kick in
+ resultEndpoint.assertIsSatisfied();
+ }
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
// START SNIPPET: ex
-
from("seda:a").throttle(3).timePeriodMillis(30000).to("mock:result");
+
from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
// END SNIPPET: ex
+
from("direct:a").throttle(3).timePeriodMillis(10000).to("mock:result");
}
};
}