Author: gertv
Date: Mon Jan 5 00:14:27 2009
New Revision: 731488
URL: http://svn.apache.org/viewvc?rev=731488&view=rev
Log:
CAMEL-1199: Throttler appears to throttle per thread
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=731488&r1=731487&r2=731488&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
Mon Jan 5 00:14:27 2009
@@ -16,8 +16,6 @@
*/
package org.apache.camel.processor;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -35,8 +33,7 @@
public class Throttler extends DelayProcessorSupport {
private long maximumRequestsPerPeriod;
private long timePeriodMillis;
- private AtomicLong startTimeMillis = new AtomicLong(0);
- private AtomicLong requestCount = new AtomicLong(0);
+ private TimeSlot slot;
public Throttler(Processor processor, long maximumRequestsPerPeriod) {
this(processor, maximumRequestsPerPeriod, 1000);
@@ -78,43 +75,64 @@
this.timePeriodMillis = timePeriodMillis;
}
- /**
- * The number of requests which have taken place so far within this time
- * period
- */
- public long getRequestCount() {
- return requestCount.get();
+ // Implementation methods
+ // -----------------------------------------------------------------------
+ protected void delay(Exchange exchange) throws Exception {
+ TimeSlot slot = nextSlot();
+ if (!slot.isActive()) {
+ waitUntil(slot.startTime, exchange);
+ }
}
-
- /**
- * The start time when this current period began
+
+ /*
+ * Determine what the next available time slot is for handling an Exchange
*/
- public long getStartTimeMillis() {
- return startTimeMillis.get();
+ protected synchronized TimeSlot nextSlot() {
+ if (slot == null) {
+ slot = new TimeSlot();
+ }
+ if (slot.isFull()) {
+ slot = slot.next();
+ }
+ slot.assign();
+ return slot;
}
- @Override
- public void process(Exchange exchange) throws Exception {
- super.process(exchange);
+ /*
+ * A time slot is capable of handling a number of exchanges within a certain period of time.
+ */
+ protected class TimeSlot {
- }
+ private long capacity = Throttler.this.maximumRequestsPerPeriod;
+ private final long duration = Throttler.this.timePeriodMillis;
+ private final long startTime;
- // Implementation methods
- // -----------------------------------------------------------------------
- protected void delay(Exchange exchange) throws Exception {
- long now = currentSystemTime();
- startTimeMillis.compareAndSet(0, now);
- if (now - startTimeMillis.get() > timePeriodMillis) {
- // we're at the start of a new time period
- // so lets reset things
- requestCount.set(0);
- startTimeMillis.set(now);
- } else {
- if (requestCount.incrementAndGet() > maximumRequestsPerPeriod) {
- // lets sleep until the start of the next time period
- long time = startTimeMillis.get() + timePeriodMillis;
- waitUntil(time, exchange);
- }
+ protected TimeSlot() {
+ this(System.currentTimeMillis());
}
+
+ protected TimeSlot(long startTime) {
+ this.startTime = startTime;
+ }
+
+ protected void assign() {
+ capacity--;
+ }
+
+ /*
+ * Start the next time slot either now or in the future
+ * (no time slots are being created in the past)
+ */
+ protected TimeSlot next() {
+ return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
+ }
+
+ protected boolean isActive() {
+ return startTime <= System.currentTimeMillis();
+ }
+
+ protected boolean isFull() {
+ return capacity <= 0;
+ }
}
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=731488&r1=731487&r2=731488&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
Mon Jan 5 00:14:27 2009
@@ -22,11 +22,13 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.Throttler.TimeSlot;
/**
* @version $Revision$
*/
public class ThrottlerTest extends ContextTestSupport {
+ private static final int INTERVAL = 500;
protected int messageCount = 6;
public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
@@ -44,9 +46,9 @@
}
public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough()
throws Exception {
+ long start = System.currentTimeMillis();
MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
MockEndpoint.class);
- resultEndpoint.expectedMessageCount(3);
- resultEndpoint.setResultWaitTime(1000);
+ resultEndpoint.expectedMessageCount(messageCount);
ExecutorService executor = Executors.newFixedThreadPool(messageCount);
for (int i = 0; i < messageCount; i++) {
@@ -56,10 +58,28 @@
}
});
}
-
- // lets pause to give the requests time to be processed
- // to check that the throttle really does kick in
+
+ // let's wait for the exchanges to arrive
resultEndpoint.assertIsSatisfied();
+
+ // now assert that they have actually been throttled
+ long minimumTime = (messageCount - 1) * INTERVAL;
+ assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime);
+ }
+
+ public void testTimeSlotCalculus() throws Exception {
+ Throttler throttler = new Throttler(null, 2, 1000);
+ TimeSlot slot = throttler.nextSlot();
+ // start a new time slot
+ assertNotNull(slot);
+ // make sure the same slot is used (2 exchanges per slot)
+ assertSame(slot, throttler.nextSlot());
+ assertTrue(slot.isFull());
+
+ TimeSlot next = throttler.nextSlot();
+ // now we should have a new slot that starts somewhere in the future
+ assertNotSame(slot, next);
+ assertFalse(next.isActive());
}
protected RouteBuilder createRouteBuilder() {
@@ -68,7 +88,8 @@
// START SNIPPET: ex
from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
// END SNIPPET: ex
- from("direct:a").throttle(3).timePeriodMillis(10000).to("mock:result");
+
+ from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result");
}
};
}
Modified:
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml?rev=731488&r1=731487&r2=731488&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
(original)
+++
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
Mon Jan 5 00:14:27 2009
@@ -35,7 +35,7 @@
<route>
<from uri="direct:a" />
- <throttle maximumRequestsPerPeriod="3" timePeriodMillis="10000">
+ <throttle maximumRequestsPerPeriod="1" timePeriodMillis="500">
<to uri="mock:result" />
</throttle>
</route>