Author: gertv Date: Mon Jan 5 00:14:27 2009 New Revision: 731488 URL: http://svn.apache.org/viewvc?rev=731488&view=rev Log: CAMEL-1199: Throttler appears to throttle per thread
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=731488&r1=731487&r2=731488&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Mon Jan 5 00:14:27 2009 @@ -16,8 +16,6 @@ */ package org.apache.camel.processor; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -35,8 +33,7 @@ public class Throttler extends DelayProcessorSupport { private long maximumRequestsPerPeriod; private long timePeriodMillis; - private AtomicLong startTimeMillis = new AtomicLong(0); - private AtomicLong requestCount = new AtomicLong(0); + private TimeSlot slot; public Throttler(Processor processor, long maximumRequestsPerPeriod) { this(processor, maximumRequestsPerPeriod, 1000); @@ -78,43 +75,64 @@ this.timePeriodMillis = timePeriodMillis; } - /** - * The number of requests which have taken place so far within this time - * period - */ - public long getRequestCount() { - return requestCount.get(); + // Implementation methods + // ----------------------------------------------------------------------- + protected void delay(Exchange exchange) throws Exception { + TimeSlot slot = nextSlot(); + if (!slot.isActive()) { + waitUntil(slot.startTime, exchange); + } } - - /** - * The start time when this current period began + + /* + * Determine what the next available time slot is for handling an Exchange */ - public long getStartTimeMillis() { - return startTimeMillis.get(); + protected synchronized TimeSlot nextSlot() { + if (slot == null) { + slot = new TimeSlot(); + } + if (slot.isFull()) { + slot = slot.next(); + } + slot.assign(); + return slot; } - @Override - public void process(Exchange exchange) throws Exception { - super.process(exchange); + /* + * A time slot is capable of handling a number of exchanges within a certain period of time. + */ + protected class TimeSlot { - } + private long capacity = Throttler.this.maximumRequestsPerPeriod; + private final long duration = Throttler.this.timePeriodMillis; + private final long startTime; - // Implementation methods - // ----------------------------------------------------------------------- - protected void delay(Exchange exchange) throws Exception { - long now = currentSystemTime(); - startTimeMillis.compareAndSet(0, now); - if (now - startTimeMillis.get() > timePeriodMillis) { - // we're at the start of a new time period - // so lets reset things - requestCount.set(0); - startTimeMillis.set(now); - } else { - if (requestCount.incrementAndGet() > maximumRequestsPerPeriod) { - // lets sleep until the start of the next time period - long time = startTimeMillis.get() + timePeriodMillis; - waitUntil(time, exchange); - } + protected TimeSlot() { + this(System.currentTimeMillis()); } + + protected TimeSlot(long startTime) { + this.startTime = startTime; + } + + protected void assign() { + capacity--; + } + + /* + * Start the next time slot either now or in the future + * (no time slots are being created in the past) + */ + protected TimeSlot next() { + return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration)); + } + + protected boolean isActive() { + return startTime <= System.currentTimeMillis(); + } + + protected boolean isFull() { + return capacity <= 0; + } } } Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=731488&r1=731487&r2=731488&view=diff ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Mon Jan 5 00:14:27 2009 @@ -22,11 +22,13 @@ import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.Throttler.TimeSlot; /** * @version $Revision$ */ public class ThrottlerTest extends ContextTestSupport { + private static final int INTERVAL = 500; protected int messageCount = 6; public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception { @@ -44,9 +46,9 @@ } public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception { + long start = System.currentTimeMillis(); MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class); - resultEndpoint.expectedMessageCount(3); - resultEndpoint.setResultWaitTime(1000); + resultEndpoint.expectedMessageCount(messageCount); ExecutorService executor = Executors.newFixedThreadPool(messageCount); for (int i = 0; i < messageCount; i++) { @@ -56,10 +58,28 @@ } }); } - - // lets pause to give the requests time to be processed - // to check that the throttle really does kick in + + // let's wait for the exchanges to arrive resultEndpoint.assertIsSatisfied(); + + // now assert that they have actually been throttled + long minimumTime = (messageCount - 1) * INTERVAL; + assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime); + } + + public void testTimeSlotCalculus() throws Exception { + Throttler throttler = new Throttler(null, 2, 1000); + TimeSlot slot = throttler.nextSlot(); + // start a new time slot + assertNotNull(slot); + // make sure the same slot is used (2 exchanges per slot) + assertSame(slot, throttler.nextSlot()); + assertTrue(slot.isFull()); + + TimeSlot next = throttler.nextSlot(); + // now we should have a new slot that starts somewhere in the future + assertNotSame(slot, next); + assertFalse(next.isActive()); } protected RouteBuilder createRouteBuilder() { @@ -68,7 +88,8 @@ // START SNIPPET: ex from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result"); // END SNIPPET: ex - from("direct:a").throttle(3).timePeriodMillis(10000).to("mock:result"); + + from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result"); } }; } Modified: activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml?rev=731488&r1=731487&r2=731488&view=diff ============================================================================== --- activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml (original) +++ activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml Mon Jan 5 00:14:27 2009 @@ -35,7 +35,7 @@ <route> <from uri="direct:a" /> - <throttle maximumRequestsPerPeriod="3" timePeriodMillis="10000"> + <throttle maximumRequestsPerPeriod="1" timePeriodMillis="500"> <to uri="mock:result" /> </throttle> </route>