Author: jstrachan
Date: Thu Apr 26 01:24:06 2007
New Revision: 532654
URL: http://svn.apache.org/viewvc?view=rev&rev=532654
Log:
added some load balancer helper classes together with the basics of a processor
component for turning a Processor into an Endpoint (so to turn say a
transformer into an endpoint)
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
(with props)
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
(with props)
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?view=diff&rev=532654&r1=532653&r2=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
Thu Apr 26 01:24:06 2007
@@ -129,4 +129,24 @@
* Returns the injector used to instantiate objects by type
*/
Injector getInjector();
+
+ /**
+ * Adds the endpoint to the context using the given URI.
+ *
+ * @param uri the URI to be used to resolve this endpoint
+ * @param endpoint the endpoint to be added to the context
+ * @return the old endpoint that was previously registered to the context
+ * if there was already an endpoint for that URI
+ * @throws Exception if the new endpoint could not be started or the old
+ * endpoint could not be stopped
+ */
+ Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception;
+
+ /**
+ * Removes the endpoint with the given URI.
+ *
+ * @param uri the URI of the endpoint to be removed
+ * @return the endpoint that was removed or null if there is no endpoint
+ * for this URI
+ * @throws Exception if the endpoint could not be stopped
+ */
+ Endpoint removeEndpoint(String uri) throws Exception;
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,76 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Consumer;
+import org.apache.camel.Producer;
+import org.apache.camel.Component;
+import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.DefaultExchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.processor.loadbalancer.LoadBalancer;
+
+/**
+ * A base class for creating {@link Endpoint} implementations from a {@link Processor}.
+ * <p>
+ * Every exchange sent through a producer of this endpoint is handed to the wrapped
+ * processor first and is then passed on to the configured {@link LoadBalancer}.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ProcessorEndpoint extends DefaultEndpoint<Exchange> {
+    private final Processor<Exchange> processor;
+    private final LoadBalancer<Exchange> loadBalancer;
+
+    /**
+     * @param endpointUri the URI of this endpoint, handed to the superclass
+     * @param component the component handed to the superclass
+     * @param processor the processor invoked first for each exchange
+     * @param loadBalancer the load balancer each exchange is forwarded to afterwards
+     */
+    protected ProcessorEndpoint(String endpointUri, Component component, Processor<Exchange> processor, LoadBalancer<Exchange> loadBalancer) {
+        super(endpointUri, component);
+        this.loadBalancer = loadBalancer;
+        this.processor = processor;
+    }
+
+    /**
+     * @return the processor this endpoint wraps
+     */
+    public Processor<Exchange> getProcessor() {
+        return this.processor;
+    }
+
+    /**
+     * @return the load balancer that receives each exchange after the processor has run
+     */
+    public LoadBalancer<Exchange> getLoadBalancer() {
+        return this.loadBalancer;
+    }
+
+    /**
+     * Creates a new {@link DefaultExchange} bound to this endpoint's context.
+     */
+    public Exchange createExchange() {
+        return new DefaultExchange(getContext());
+    }
+
+    /**
+     * Creates and starts a producer that routes every exchange through
+     * {@link #onExchange(Exchange)}.
+     *
+     * @throws Exception if the producer could not be started
+     */
+    public Producer<Exchange> createProducer() throws Exception {
+        return startService(new DefaultProducer<Exchange>(this) {
+            public void process(Exchange exchange) {
+                onExchange(exchange);
+            }
+        });
+    }
+
+    /**
+     * Creates and starts a {@link ProcessorEndpointConsumer} for the given processor.
+     *
+     * @param processor the processor that should consume exchanges from this endpoint
+     * @throws Exception if the consumer could not be started
+     */
+    public Consumer<Exchange> createConsumer(Processor<Exchange> processor) throws Exception {
+        return startService(new ProcessorEndpointConsumer(this, processor));
+    }
+
+    /**
+     * Handles an exchange sent to this endpoint: the wrapped processor runs first,
+     * then the same exchange is given to the load balancer.
+     *
+     * @param exchange the exchange to handle
+     */
+    protected void onExchange(Exchange exchange) {
+        this.processor.process(exchange);
+
+        // the processor is done with the exchange, so hand it on to the load balancer
+        this.loadBalancer.process(exchange);
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpoint.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.DefaultConsumer;
+
+/**
+ * A consumer for a {@link ProcessorEndpoint} that keeps its processor registered
+ * with the endpoint's load balancer for as long as the consumer is started.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ProcessorEndpointConsumer extends DefaultConsumer<Exchange> {
+    private final ProcessorEndpoint endpoint;
+
+    /**
+     * @param endpoint the endpoint whose load balancer this consumer registers with
+     * @param processor the processor that receives the exchanges
+     */
+    public ProcessorEndpointConsumer(ProcessorEndpoint endpoint, Processor<Exchange> processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Starts the consumer, then adds its processor to the endpoint's load balancer.
+     */
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        Processor<Exchange> target = getProcessor();
+        this.endpoint.getLoadBalancer().addProcessor(target);
+    }
+
+    /**
+     * Removes the processor from the endpoint's load balancer, then stops the consumer.
+     */
+    @Override
+    protected void doStop() throws Exception {
+        Processor<Exchange> target = getProcessor();
+        this.endpoint.getLoadBalancer().removeProcessor(target);
+        super.doStop();
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/ProcessorEndpointConsumer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,26 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+A component to make it easy to turn a <a href="http://activemq.apache.org/camel/processor.html">Processor</a> into a fully fledged
+<a href="http://activemq.apache.org/camel/endpoint.html">Endpoint</a>.
+
+</body>
+</html>
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/processor/package.html
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?view=diff&rev=532654&r1=532653&r2=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
Thu Apr 26 01:24:06 2007
@@ -17,6 +17,8 @@
*/
package org.apache.camel.impl;
+import static org.apache.camel.util.ServiceHelper.stopServices;
+import static org.apache.camel.util.ServiceHelper.startServices;
import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.converter.DefaultTypeConverter;
@@ -78,7 +80,7 @@
if( isStarted() ) {
// If the component is looked
up after the context is started,
// lets start it up.
-
ServiceHelper.startServices(component);
+ startServices(component);
}
} catch (Exception e) {
throw new RuntimeCamelException("Could
not auto create component: "+name, e);
@@ -137,6 +139,26 @@
}
}
+    /**
+     * Registers the endpoint under the given URI, starting it first and stopping
+     * the endpoint it displaces, if any.
+     *
+     * @param uri the URI the endpoint is registered under
+     * @param endpoint the endpoint to start and register
+     * @return the endpoint previously registered under the URI, or null if there was none
+     * @throws Exception if the new endpoint could not be started or the displaced
+     *         endpoint could not be stopped
+     */
+    public Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception {
+        synchronized (endpoints) {
+            // start before touching the map so a failed start leaves the old registration in place
+            startServices(endpoint);
+            Endpoint oldEndpoint = endpoints.remove(uri);
+            endpoints.put(uri, endpoint);
+            // re-registering the same instance must not stop the endpoint that was just started
+            if (oldEndpoint != null && oldEndpoint != endpoint) {
+                stopServices(oldEndpoint);
+            }
+            return oldEndpoint;
+        }
+    }
+
+    /**
+     * Unregisters the endpoint held under the given URI and stops it.
+     *
+     * @param uri the URI whose endpoint should be removed
+     * @return whatever the endpoint map held for the URI before removal
+     * @throws Exception if stopping the removed endpoint fails
+     */
+    public Endpoint removeEndpoint(String uri) throws Exception {
+        synchronized (endpoints) {
+            Endpoint removed = endpoints.remove(uri);
+            stopServices(removed);
+            return removed;
+        }
+    }
+
/**
* Resolves the given URI to an endpoint
*/
@@ -164,7 +186,7 @@
// HC: What's the idea behind starting an endpoint?
// I don't think we have any endpoints that are services
do we?
if (answer != null) {
- ServiceHelper.startServices(answer);
+ startServices(answer);
endpoints.put(uri, answer);
}
}
@@ -280,7 +302,7 @@
}
protected void doStop() throws Exception {
- ServiceHelper.stopServices(servicesToClose);
+ stopServices(servicesToClose);
}
/**
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,43 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Processor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * A strategy for load balancing across a number of {@link Processor} instances.
+ * A load balancer is itself a {@link Processor}, so exchanges are handed to it
+ * in the same way as to any other processor.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface LoadBalancer<E extends Exchange> extends Processor<E> {
+ /**
+ * Adds a new processor to the load balancer
+ *
+ * @param processor the processor to be added to the load balancer
+ */
+ void addProcessor(Processor<E> processor);
+
+ /**
+ * Removes the given processor from the load balancer
+ *
+ * @param processor the processor to be removed from the load balancer
+ */
+ void removeProcessor(Processor<E> processor);
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * A default base class for a {@link LoadBalancer} implementation which keeps the
+ * registered processors in a {@link CopyOnWriteArrayList}.
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class LoadBalancerSupport<E extends Exchange> implements LoadBalancer<E> {
+    private final List<Processor<E>> processors = new CopyOnWriteArrayList<Processor<E>>();
+
+    /**
+     * Returns the current processors available to this load balancer.
+     * The list returned is the internal list itself, not a copy.
+     *
+     * @return the processors available
+     */
+    public List<Processor<E>> getProcessors() {
+        return this.processors;
+    }
+
+    /**
+     * Appends the processor to the list of available processors.
+     *
+     * @param processor the processor to register
+     */
+    public void addProcessor(Processor<E> processor) {
+        this.processors.add(processor);
+    }
+
+    /**
+     * Takes the processor out of the list of available processors.
+     *
+     * @param processor the processor to unregister
+     */
+    public void removeProcessor(Processor<E> processor) {
+        this.processors.remove(processor);
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancerSupport.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+
+/**
+ * A base class for {@link LoadBalancer} implementations which choose a single
+ * destination for each exchange (rather like JMS Queues).
+ *
+ * @version $Revision: 1.1 $
+ */
+public abstract class QueueLoadBalancer<E extends Exchange> extends LoadBalancerSupport<E> {
+
+    /**
+     * Picks one of the registered processors and lets it process the exchange.
+     *
+     * @param exchange the exchange to dispatch
+     * @throws IllegalStateException if no processor is registered or none could be chosen
+     */
+    public void process(E exchange) {
+        List<Processor<E>> candidates = getProcessors();
+        if (candidates.isEmpty()) {
+            throw new IllegalStateException("No processors available to process " + exchange);
+        }
+
+        Processor<E> chosen = chooseProcessor(candidates, exchange);
+        if (chosen == null) {
+            throw new IllegalStateException("No processors could be chosen to process " + exchange);
+        }
+        chosen.process(exchange);
+    }
+
+    /**
+     * Selects the processor that should handle the exchange.
+     *
+     * @param processors the processors currently available
+     * @param exchange the exchange about to be dispatched
+     * @return the selected processor; {@link #process} treats null as a failure
+     */
+    protected abstract Processor<E> chooseProcessor(List<Processor<E>> processors, E exchange);
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+
+/**
+ * Implements the random load balancing policy: each exchange goes to one of the
+ * available processors, picked uniformly at random.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class RandomLoadBalancer<E extends Exchange> extends QueueLoadBalancer<E> {
+
+    /**
+     * Picks a processor uniformly at random.
+     *
+     * @param processors the processors currently available
+     * @param exchange the exchange about to be dispatched (not used for the choice)
+     * @return a randomly selected processor, or null if the list is empty
+     */
+    protected synchronized Processor<E> chooseProcessor(List<Processor<E>> processors, E exchange) {
+        int size = processors.size();
+        if (size == 0) {
+            // nothing to pick from; QueueLoadBalancer.process reports a null choice as an error
+            return null;
+        }
+        // Math.random() lies in [0, 1), so truncating (rather than rounding) gives every
+        // index in [0, size) the same probability and can never produce size itself
+        int index = (int) (Math.random() * size);
+        return processors.get(index);
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RandomLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.List;
+
+/**
+ * Implements the round robin load balancing policy: processors are used in list
+ * order, wrapping around to the first one after the last.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class RoundRobinLoadBalancer<E extends Exchange> extends QueueLoadBalancer<E> {
+    // index of the processor handed out last; -1 until the first call
+    private int counter = -1;
+
+    /**
+     * Advances to the next position in the list and returns the processor found there.
+     *
+     * @param processors the processors currently available
+     * @param exchange the exchange about to be dispatched (not used for the choice)
+     * @return the next processor in rotation
+     */
+    protected synchronized Processor<E> chooseProcessor(List<Processor<E>> processors, E exchange) {
+        final int size = processors.size();
+        final int next = counter + 1;
+        // wrap to the start once the end of the list is passed
+        counter = (next < size) ? next : 0;
+        return processors.get(counter);
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/RoundRobinLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,113 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Iterator;
+
+/**
+ * Implements a sticky load balancer using an {@link Expression} to calculate
+ * a correlation key to perform the sticky load balancing; rather like jsessionid
+ * in the web or JMSXGroupID in JMS.
+ * <p>
+ * The first exchange seen for a key is assigned a processor by the delegate
+ * load balancer; later exchanges with the same key reuse that processor.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class StickyLoadBalancer<E extends Exchange> extends QueueLoadBalancer<E> {
+    private final Expression<E> correlationExpression;
+    private final QueueLoadBalancer loadBalancer;
+    private int numberOfHashGroups = 64 * 1024;
+    private final Map<Object, Processor<E>> stickyMap = new HashMap<Object, Processor<E>>();
+
+    /**
+     * Creates a sticky load balancer that delegates first-time choices to a
+     * {@link RoundRobinLoadBalancer}.
+     *
+     * @param correlationExpression the expression yielding the correlation value of an exchange
+     */
+    public StickyLoadBalancer(Expression<E> correlationExpression) {
+        this(correlationExpression, new RoundRobinLoadBalancer());
+    }
+
+    /**
+     * @param correlationExpression the expression yielding the correlation value of an exchange
+     * @param loadBalancer the load balancer asked to choose a processor for a key not seen before
+     */
+    public StickyLoadBalancer(Expression<E> correlationExpression, QueueLoadBalancer loadBalancer) {
+        this.correlationExpression = correlationExpression;
+        this.loadBalancer = loadBalancer;
+    }
+
+    /**
+     * Returns the processor remembered for the exchange's sticky key, asking the
+     * delegate load balancer to choose one (and remembering it) when the key is new.
+     *
+     * @param processors the processors currently available
+     * @param exchange the exchange about to be dispatched
+     * @return the processor bound to the exchange's sticky key
+     */
+    protected synchronized Processor<E> chooseProcessor(List<Processor<E>> processors, E exchange) {
+        Object key = getStickyKey(correlationExpression.evaluate(exchange));
+
+        synchronized (stickyMap) {
+            Processor<E> sticky = stickyMap.get(key);
+            if (sticky == null) {
+                sticky = loadBalancer.chooseProcessor(processors, exchange);
+                stickyMap.put(key, sticky);
+            }
+            return sticky;
+        }
+    }
+
+    /**
+     * Forgets every sticky assignment that points at the processor, then removes
+     * the processor from the list of available processors.
+     *
+     * @param processor the processor to remove
+     */
+    @Override
+    public void removeProcessor(Processor<E> processor) {
+        synchronized (stickyMap) {
+            Iterator<Processor<E>> assigned = stickyMap.values().iterator();
+            while (assigned.hasNext()) {
+                if (processor.equals(assigned.next())) {
+                    assigned.remove();
+                }
+            }
+        }
+        super.removeProcessor(processor);
+    }
+
+
+    // Properties
+    //-------------------------------------------------------------------------
+
+    /**
+     * @return the number of hash groups the correlation hash codes are folded into
+     */
+    public int getNumberOfHashGroups() {
+        return numberOfHashGroups;
+    }
+
+    /**
+     * @param numberOfHashGroups the number of hash groups; a value that is not
+     *        positive disables the folding in {@link #getStickyKey(Object)}
+     */
+    public void setNumberOfHashGroups(int numberOfHashGroups) {
+        this.numberOfHashGroups = numberOfHashGroups;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    /**
+     * A strategy to create the key for the sticky load balancing map.
+     * The default implementation uses the hash code of the value (37 for a null
+     * value), then modulos by the numberOfHashGroups to avoid the sticky map
+     * getting too big.
+     *
+     * @param value the correlation value
+     * @return the key to be used in the sticky map
+     */
+    protected Object getStickyKey(Object value) {
+        int hash = (value != null) ? value.hashCode() : 37;
+        if (numberOfHashGroups > 0) {
+            hash %= numberOfHashGroups;
+        }
+        return hash;
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/StickyLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,50 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.loadbalancer;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.Pipeline;
+
+import java.util.List;
+
+/**
+ * A {@link LoadBalancer} implementation which sends to all destinations
+ * (rather like JMS Topics).
+ *
+ * @version $Revision: 1.1 $
+ */
+public class TopicLoadBalancer<E extends Exchange> extends LoadBalancerSupport<E> {
+
+    /**
+     * Sends the exchange to every registered processor, each one receiving
+     * whatever {@link #copyExchangeStrategy} returns for it.
+     *
+     * @param exchange the exchange to broadcast
+     */
+    public void process(E exchange) {
+        for (Processor<E> target : getProcessors()) {
+            target.process(copyExchangeStrategy(target, exchange));
+        }
+    }
+
+    /**
+     * Strategy method to copy the exchange before sending to another endpoint.
+     * This implementation always returns a copy of the exchange; the original
+     * note here cites {@link Pipeline} as a case where the exchange is not
+     * cloned, which cannot be confirmed from this class.
+     *
+     * @param processor the processor that will send the exchange
+     * @param exchange the exchange about to be sent
+     * @return the exchange to hand to the processor, here a new copy
+     */
+    protected E copyExchangeStrategy(Processor<E> processor, E exchange) {
+        return (E) exchange.copy();
+    }
+}
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html?view=auto&rev=532654
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
Thu Apr 26 01:24:06 2007
@@ -0,0 +1,25 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+Various load balancer processors
+
+</body>
+</html>
Propchange:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/package.html
------------------------------------------------------------------------------
svn:eol-style = native