Author: davsclaus
Date: Thu Oct 29 09:22:46 2009
New Revision: 830872

URL: http://svn.apache.org/viewvc?rev=830872&view=rev
Log:
CAMEL-1048: Added first cut of RoutePolicy, which allows routes to be 
controlled dynamically at runtime. Added an out-of-the-box throttling policy 
that throttles based on the number of concurrent inflight exchanges.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
   (with props)
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
   (with props)
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=830872&r1=830871&r2=830872&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
 Thu Oct 29 09:22:46 2009
@@ -145,7 +145,7 @@
             Processor unitOfWorkProcessor = new UnitOfWorkProcessor(processor);
             Processor target = unitOfWorkProcessor;
 
-            // and then optionally and route policy processor
+            // and then optionally add route policy processor if a custom 
policy is set
             RoutePolicyProcessor routePolicyProcessor = null;
             if (getRoutePolicy() != null) {
                 routePolicyProcessor = new 
RoutePolicyProcessor(unitOfWorkProcessor, getRoutePolicy());

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java?rev=830872&r1=830871&r2=830872&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
 Thu Oct 29 09:22:46 2009
@@ -72,7 +72,7 @@
         }
 
         // reload size in case a race condition with too many at once being 
invoked
-        // so we need to ensure that we read the most current side and start 
the consumer if we hit to low
+        // so we need to ensure that we read the most current size and start 
the consumer if we are already to low
         size = getSize(consumer, exchange);
         if (size <= resumeInflightExchanges) {
             try {

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java?rev=830872&r1=830871&r2=830872&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java 
Thu Oct 29 09:22:46 2009
@@ -181,8 +181,18 @@
      */
     DataFormatDefinition getDataFormat(String ref);
 
+    /**
+     * Gets the route policy
+     *
+     * @return the route policy if any
+     */
     RoutePolicy getRoutePolicy();
 
+    /**
+     * Sets a custom route policy
+     *
+     * @param routePolicy the custom route policy
+     */
     void setRoutePolicy(RoutePolicy routePolicy);
 
 }

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java?rev=830872&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
 Thu Oct 29 09:22:46 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class SuspendableServiceTest extends TestCase {
+
+    private class MyService implements SuspendableService {
+
+        private boolean suspended;
+
+        public void start() throws Exception {
+        }
+
+        public void stop() throws Exception {
+        }
+
+        public void suspend() {
+            suspended = true;
+        }
+
+        public void resume() {
+            suspended = false;
+        }
+
+        public boolean isSuspended() {
+            return suspended;
+        }
+    }
+
+    public void testSuspendable() {
+        MyService my = new MyService();
+        assertEquals(false, my.isSuspended());
+
+        my.suspend();
+        assertEquals(true, my.isSuspended());
+
+        my.resume();
+        assertEquals(false, my.isSuspended());
+    }
+
+}

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java?rev=830872&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
 Thu Oct 29 09:22:46 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.RoutePolicySupport;
+
+/**
+ * @version $Revision$
+ */
+public class CustomRoutePolicyTest extends ContextTestSupport {
+
+    private final MyCustomRoutePolicy policy = new MyCustomRoutePolicy();
+
+    private class MyCustomRoutePolicy extends RoutePolicySupport {
+
+        @Override
+        public void onExchangeDone(Route route, Exchange exchange) {
+            String body = exchange.getIn().getBody(String.class);
+            if ("stop".equals(body)) {
+                try {
+                    stopConsumer(route.getConsumer());
+                } catch (Exception e) {
+                    handleException(e);
+                }
+            }
+        }
+    }
+
+    public void testCustomPolicy() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+
+        mock.reset();
+        mock.expectedBodiesReceived("stop");
+
+        // we send stop command so we should only get 1 message
+        template.sendBody("seda:foo", "stop");
+
+        Thread.sleep(500);
+
+        template.sendBody("seda:foo", "Bye World");
+
+        assertMockEndpointsSatisfied();
+
+        // we reset and prepare for the last message to arrive
+        mock.reset();
+        mock.expectedBodiesReceived("Bye World");
+
+        // start the route consumer again
+        context.getRoutes().get(0).getConsumer().start();
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo").routePolicy(policy).to("mock:result");
+            }
+        };
+    }
+}
+

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to