Author: davsclaus
Date: Thu Oct 29 09:22:46 2009
New Revision: 830872
URL: http://svn.apache.org/viewvc?rev=830872&view=rev
Log:
CAMEL-1048: Added first cut of RoutePolicy, which allows routes to be controlled
dynamically at runtime. Added a throttling policy out of the box that is based on
throttling by the number of concurrent inflight exchanges.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
(with props)
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
(with props)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=830872&r1=830871&r2=830872&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
Thu Oct 29 09:22:46 2009
@@ -145,7 +145,7 @@
Processor unitOfWorkProcessor = new UnitOfWorkProcessor(processor);
Processor target = unitOfWorkProcessor;
- // and then optionally and route policy processor
+ // and then optionally add route policy processor if a custom
policy is set
RoutePolicyProcessor routePolicyProcessor = null;
if (getRoutePolicy() != null) {
routePolicyProcessor = new
RoutePolicyProcessor(unitOfWorkProcessor, getRoutePolicy());
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java?rev=830872&r1=830871&r2=830872&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ThrottlingRoutePolicy.java
Thu Oct 29 09:22:46 2009
@@ -72,7 +72,7 @@
}
// reload size in case a race condition with too many at once being
invoked
- // so we need to ensure that we read the most current side and start
the consumer if we hit to low
+ // so we need to ensure that we read the most current size and start
the consumer if we are already to low
size = getSize(consumer, exchange);
if (size <= resumeInflightExchanges) {
try {
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java?rev=830872&r1=830871&r2=830872&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/RouteContext.java
Thu Oct 29 09:22:46 2009
@@ -181,8 +181,18 @@
*/
DataFormatDefinition getDataFormat(String ref);
+ /**
+ * Gets the route policy
+ *
+ * @return the route policy if any
+ */
RoutePolicy getRoutePolicy();
+ /**
+ * Sets a custom route policy
+ *
+ * @param routePolicy the custom route policy
+ */
void setRoutePolicy(RoutePolicy routePolicy);
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java?rev=830872&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
Thu Oct 29 09:22:46 2009
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import junit.framework.TestCase;
+
+/**
+ * @version $Revision$
+ */
+public class SuspendableServiceTest extends TestCase {
+
+ private class MyService implements SuspendableService {
+
+ private boolean suspended;
+
+ public void start() throws Exception {
+ }
+
+ public void stop() throws Exception {
+ }
+
+ public void suspend() {
+ suspended = true;
+ }
+
+ public void resume() {
+ suspended = false;
+ }
+
+ public boolean isSuspended() {
+ return suspended;
+ }
+ }
+
+ public void testSuspendable() {
+ MyService my = new MyService();
+ assertEquals(false, my.isSuspended());
+
+ my.suspend();
+ assertEquals(true, my.isSuspended());
+
+ my.resume();
+ assertEquals(false, my.isSuspended());
+ }
+
+}
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/SuspendableServiceTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java?rev=830872&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
Thu Oct 29 09:22:46 2009
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Route;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.RoutePolicySupport;
+
+/**
+ * @version $Revision$
+ */
+public class CustomRoutePolicyTest extends ContextTestSupport {
+
+ private final MyCustomRoutePolicy policy = new MyCustomRoutePolicy();
+
+ private class MyCustomRoutePolicy extends RoutePolicySupport {
+
+ @Override
+ public void onExchangeDone(Route route, Exchange exchange) {
+ String body = exchange.getIn().getBody(String.class);
+ if ("stop".equals(body)) {
+ try {
+ stopConsumer(route.getConsumer());
+ } catch (Exception e) {
+ handleException(e);
+ }
+ }
+ }
+ }
+
+ public void testCustomPolicy() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+
+ template.sendBody("seda:foo", "Hello World");
+
+ assertMockEndpointsSatisfied();
+
+ mock.reset();
+ mock.expectedBodiesReceived("stop");
+
+ // we send stop command so we should only get 1 message
+ template.sendBody("seda:foo", "stop");
+
+ Thread.sleep(500);
+
+ template.sendBody("seda:foo", "Bye World");
+
+ assertMockEndpointsSatisfied();
+
+ // we reset and prepare for the last message to arrive
+ mock.reset();
+ mock.expectedBodiesReceived("Bye World");
+
+ // start the route consumer again
+ context.getRoutes().get(0).getConsumer().start();
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:foo").routePolicy(policy).to("mock:result");
+ }
+ };
+ }
+}
+
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/CustomRoutePolicyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date