Author: gertv
Date: Mon Jan  5 00:14:27 2009
New Revision: 731488

URL: http://svn.apache.org/viewvc?rev=731488&view=rev
Log:
CAMEL-1199: Throttler appears to throttle per thread

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
    
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=731488&r1=731487&r2=731488&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java Mon Jan  5 00:14:27 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.processor;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
@@ -35,8 +33,7 @@
 public class Throttler extends DelayProcessorSupport {
     private long maximumRequestsPerPeriod;
     private long timePeriodMillis;
-    private AtomicLong startTimeMillis = new AtomicLong(0);
-    private AtomicLong requestCount = new AtomicLong(0);
+    private TimeSlot slot;
 
     public Throttler(Processor processor, long maximumRequestsPerPeriod) {
         this(processor, maximumRequestsPerPeriod, 1000);
@@ -78,43 +75,64 @@
         this.timePeriodMillis = timePeriodMillis;
     }
 
-    /**
-     * The number of requests which have taken place so far within this time
-     * period
-     */
-    public long getRequestCount() {
-        return requestCount.get();
+    // Implementation methods
+    // -----------------------------------------------------------------------
+    protected void delay(Exchange exchange) throws Exception {
+        TimeSlot slot = nextSlot();
+        if (!slot.isActive()) {
+            waitUntil(slot.startTime, exchange);
+        }
     }
-
-    /**
-     * The start time when this current period began
+    
+    /*
+     * Determine what the next available time slot is for handling an Exchange
      */
-    public long getStartTimeMillis() {
-        return startTimeMillis.get();
+    protected synchronized TimeSlot nextSlot() {
+        if (slot == null) {
+            slot = new TimeSlot();
+        }
+        if (slot.isFull()) {
+            slot = slot.next();
+        }
+        slot.assign();
+        return slot;
     }
     
-    @Override
-    public void process(Exchange exchange) throws Exception {
-        super.process(exchange);
+    /*
+     * A time slot is capable of handling a number of exchanges within a certain period of time.
+     */
+    protected class TimeSlot {
         
-    }
+        private long capacity = Throttler.this.maximumRequestsPerPeriod;
+        private final long duration = Throttler.this.timePeriodMillis;
+        private final long startTime;
 
-    // Implementation methods
-    // -----------------------------------------------------------------------
-    protected void delay(Exchange exchange) throws Exception {
-        long now = currentSystemTime();
-        startTimeMillis.compareAndSet(0, now);
-        if (now - startTimeMillis.get() > timePeriodMillis) {
-            // we're at the start of a new time period
-            // so lets reset things
-            requestCount.set(0);
-            startTimeMillis.set(now);
-        } else {
-            if (requestCount.incrementAndGet() > maximumRequestsPerPeriod) {
-                // lets sleep until the start of the next time period
-                long time = startTimeMillis.get() + timePeriodMillis;
-                waitUntil(time, exchange);
-            }
+        protected TimeSlot() {
+            this(System.currentTimeMillis());
         }
+
+        protected TimeSlot(long startTime) {
+            this.startTime = startTime;
+        }
+        
+        protected void assign() {
+            capacity--;
+        }
+        
+        /*
+         * Start the next time slot either now or in the future
+         * (no time slots are being created in the past)
+         */
+        protected TimeSlot next() {
+            return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
+        }
+        
+        protected boolean isActive() {
+            return startTime <= System.currentTimeMillis();
+        }
+        
+        protected boolean isFull() {
+            return capacity <= 0;
+        }        
     }
 }

Modified: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=731488&r1=731487&r2=731488&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java Mon Jan  5 00:14:27 2009
@@ -22,11 +22,13 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.Throttler.TimeSlot;
 
 /**
  * @version $Revision$
  */
 public class ThrottlerTest extends ContextTestSupport {
+    private static final int INTERVAL = 500;
     protected int messageCount = 6;
 
     public void testSendLotsOfMessagesButOnly3GetThrough() throws Exception {
@@ -44,9 +46,9 @@
     }
     
     public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() throws Exception {
+        long start = System.currentTimeMillis();
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(3);
-        resultEndpoint.setResultWaitTime(1000);
+        resultEndpoint.expectedMessageCount(messageCount);
 
         ExecutorService executor = Executors.newFixedThreadPool(messageCount);
         for (int i = 0; i < messageCount; i++) {
@@ -56,10 +58,28 @@
                 }                
             });
         }
-
-        // lets pause to give the requests time to be processed
-        // to check that the throttle really does kick in
+        
+        // let's wait for the exchanges to arrive
         resultEndpoint.assertIsSatisfied();
+        
+        // now assert that they have actually been throttled
+        long minimumTime = (messageCount - 1) * INTERVAL;
+        assertTrue("Should take at least " + minimumTime + "ms", System.currentTimeMillis() - start >= minimumTime);
+    }
+    
+    public void testTimeSlotCalculus() throws Exception {
+        Throttler throttler = new Throttler(null, 2, 1000);
+        TimeSlot slot = throttler.nextSlot();
+        // start a new time slot
+        assertNotNull(slot);
+        // make sure the same slot is used (2 exchanges per slot)
+        assertSame(slot, throttler.nextSlot());
+        assertTrue(slot.isFull());
+        
+        TimeSlot next = throttler.nextSlot();
+        // now we should have a new slot that starts somewhere in the future
+        assertNotSame(slot, next);
+        assertFalse(next.isActive());
     }
 
     protected RouteBuilder createRouteBuilder() {
@@ -68,7 +88,8 @@
                 // START SNIPPET: ex
                 from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
                 // END SNIPPET: ex
-                from("direct:a").throttle(3).timePeriodMillis(10000).to("mock:result");
+                
+                from("direct:a").throttle(1).timePeriodMillis(INTERVAL).to("mock:result");
             }
         };
     }

Modified: 
activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml?rev=731488&r1=731487&r2=731488&view=diff
==============================================================================
--- activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml (original)
+++ activemq/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/throttler.xml Mon Jan  5 00:14:27 2009
@@ -35,7 +35,7 @@
 
     <route>
       <from uri="direct:a" />
-      <throttle maximumRequestsPerPeriod="3" timePeriodMillis="10000">
+      <throttle maximumRequestsPerPeriod="1" timePeriodMillis="500">
         <to uri="mock:result" />
       </throttle>
     </route>


Reply via email to