Author: gertv
Date: Tue Dec 30 05:59:16 2008
New Revision: 730132

URL: http://svn.apache.org/viewvc?rev=730132&view=rev
Log:
CAMEL-1199: Throttler appears to throttle per thread instead of over all threads

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java?rev=730132&r1=730131&r2=730132&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
 Tue Dec 30 05:59:16 2008
@@ -16,8 +16,12 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A <a href="http://activemq.apache.org/camel/throttler.html";>Throttler</a>
@@ -33,8 +37,8 @@
 public class Throttler extends DelayProcessorSupport {
     private long maximumRequestsPerPeriod;
     private long timePeriodMillis;
-    private long startTimeMillis;
-    private long requestCount;
+    private AtomicLong startTimeMillis = new AtomicLong(0);
+    private AtomicLong requestCount = new AtomicLong(0);
 
     public Throttler(Processor processor, long maximumRequestsPerPeriod) {
         this(processor, maximumRequestsPerPeriod, 1000);
@@ -81,32 +85,36 @@
      * period
      */
     public long getRequestCount() {
-        return requestCount;
+        return requestCount.get();
     }
 
     /**
      * The start time when this current period began
      */
     public long getStartTimeMillis() {
-        return startTimeMillis;
+        return startTimeMillis.get();
+    }
+    
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        super.process(exchange);
+        
     }
 
     // Implementation methods
     // -----------------------------------------------------------------------
     protected void delay(Exchange exchange) throws Exception {
         long now = currentSystemTime();
-        if (startTimeMillis == 0) {
-            startTimeMillis = now;
-        }
-        if (now - startTimeMillis > timePeriodMillis) {
+        startTimeMillis.compareAndSet(0, now);
+        if (now - startTimeMillis.get() > timePeriodMillis) {
             // we're at the start of a new time period
             // so lets reset things
-            requestCount = 1;
-            startTimeMillis = now;
+            requestCount.set(0);
+            startTimeMillis.set(0);
         } else {
-            if (++requestCount > maximumRequestsPerPeriod) {
+            if (requestCount.incrementAndGet() > maximumRequestsPerPeriod) {
                 // lets sleep until the start of the next time period
-                long time = startTimeMillis + timePeriodMillis;
+                long time = startTimeMillis.get() + timePeriodMillis;
                 waitUntil(time, exchange);
             }
         }

Modified: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java?rev=730132&r1=730131&r2=730132&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThrottlerTest.java
 Tue Dec 30 05:59:16 2008
@@ -16,6 +16,9 @@
  */
 package org.apache.camel.processor;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
@@ -39,13 +42,33 @@
         // to check that the throttle really does kick in
         resultEndpoint.assertIsSatisfied();
     }
+    
+    public void testSendLotsOfMessagesSimultaneouslyButOnly3GetThrough() 
throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", 
MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(3);
+        resultEndpoint.setResultWaitTime(1000);
+
+        ExecutorService executor = Executors.newFixedThreadPool(messageCount);
+        for (int i = 0; i < messageCount; i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    template.sendBody("direct:a", 
"<message>payload</message>");
+                }                
+            });
+        }
+
+        // lets pause to give the requests time to be processed
+        // to check that the throttle really does kick in
+        resultEndpoint.assertIsSatisfied();
+    }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: ex
-                
from("seda:a").throttle(3).timePeriodMillis(30000).to("mock:result");
+                
from("seda:a").throttle(3).timePeriodMillis(10000).to("mock:result");
                 // END SNIPPET: ex
+                
from("direct:a").throttle(3).timePeriodMillis(10000).to("mock:result");
             }
         };
     }


Reply via email to