Author: ningjiang
Date: Tue Dec 9 05:04:14 2008
New Revision: 724681
URL: http://svn.apache.org/viewvc?rev=724681&view=rev
Log:
CAMEL-1161, CAMEL-1162: refactored the load balancers
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
Tue Dec 9 05:04:14 2008
@@ -23,6 +23,7 @@
import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.model.IdentifiedType;
@@ -131,4 +132,20 @@
loadBalancer.process(exchange);
}
+    /**
+     * Processes the exchange asynchronously by delegating to the configured
+     * load balancer.
+     * <p/>
+     * The caller's callback is handed to the load balancer unchanged. Wrapping
+     * it in a callback that ignores <tt>done(true)</tt> would mean that
+     * synchronous completion is never reported to the caller, because nothing
+     * else in this method invokes the callback.
+     *
+     * @param exchange the exchange to process
+     * @param callback notified when processing of the exchange has completed
+     * @return whatever the load balancer returns
+     *         (NOTE(review): presumably <tt>true</tt> for synchronous
+     *         completion, per the AsyncProcessor contract - confirm)
+     */
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        ObjectHelper.notNull(loadBalancer, "loadBalancer");
+        return loadBalancer.process(exchange, callback);
+    }
+
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
Tue Dec 9 05:04:14 2008
@@ -32,7 +32,7 @@
public class StickyLoadBalanceStrategy extends LoadBalancerType {
@XmlElement(required = true, name = "expression", type =
ExpressionType.class)
private ExpressionType expressionType;
- @XmlElement(required = false, name = "loadBalancer", type =
ExpressionType.class)
+ @XmlElement(required = false, name = "loadBalancer", type =
LoadBalancerType.class)
private LoadBalancerType loadBalancerType;
public StickyLoadBalanceStrategy() {
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
Tue Dec 9 05:04:14 2008
@@ -23,6 +23,7 @@
/**
* Represents an XML <topic/> element
+ * @deprecated use multicast instead to send the message to all destinations
*/
@XmlRootElement(name = "topic")
public class TopicLoadBalanceStrategy extends LoadBalancerType {
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
Tue Dec 9 05:04:14 2008
@@ -18,6 +18,7 @@
import java.util.List;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Processor;
/**
@@ -25,7 +26,7 @@
*
* @version $Revision$
*/
-public interface LoadBalancer extends Processor {
+public interface LoadBalancer extends AsyncProcessor {
/**
* Adds a new processor to the load balancer
*
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
Tue Dec 9 05:04:14 2008
@@ -18,6 +18,8 @@
import java.util.List;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -41,6 +43,41 @@
processor.process(exchange);
}
}
+
+    /**
+     * Asynchronously processes the exchange with the single processor picked
+     * by {@link #chooseProcessor(List, Exchange)}.
+     * <p/>
+     * If the chosen processor is an {@link AsyncProcessor}, the exchange and
+     * the caller's callback are handed over unchanged, so both synchronous and
+     * asynchronous completion reach the caller. Otherwise the processor is
+     * invoked on the calling thread, any exception it throws is stored on the
+     * exchange, and completion is reported as synchronous.
+     *
+     * @param exchange the exchange to process
+     * @param callback notified when processing of the exchange has completed
+     * @return the chosen async processor's result, or <tt>true</tt> when a
+     *         plain processor was run on the calling thread
+     * @throws IllegalStateException if there are no processors, or none could
+     *                               be chosen for the exchange
+     */
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+        List<Processor> list = getProcessors();
+        if (list.isEmpty()) {
+            throw new IllegalStateException("No processors available to process " + exchange);
+        }
+        Processor processor = chooseProcessor(list, exchange);
+        if (processor == null) {
+            throw new IllegalStateException("No processors could be chosen to process " + exchange);
+        }
+        if (processor instanceof AsyncProcessor) {
+            // Do not filter the callback: the caller must also hear about done(true)
+            return ((AsyncProcessor) processor).process(exchange, callback);
+        }
+        try {
+            processor.process(exchange);
+        } catch (Exception ex) {
+            // Report the failure through the exchange rather than throwing
+            exchange.setException(ex);
+        }
+        // The work finished on the calling thread, so this is a synchronous completion
+        callback.done(true);
+        return true;
+    }
protected abstract Processor chooseProcessor(List<Processor> processors,
Exchange exchange);
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
Tue Dec 9 05:04:14 2008
@@ -18,14 +18,18 @@
import java.util.List;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
/**
* A {@link LoadBalancer} implementations which sends to all
destinations
- * (rather like JMS Topics)
+ * (rather like JMS Topics).
+ * Deprecated as of Camel 2.0: use multicast instead
+ * if you want to send the message to all destinations.
*
* @version $Revision$
+ * @deprecated as of Camel 2.0, use multicast instead
*/
public class TopicLoadBalancer extends LoadBalancerSupport {
@@ -50,4 +54,18 @@
protected Exchange copyExchangeStrategy(Processor processor, Exchange
exchange) {
return exchange.copy();
}
+
+    /**
+     * Sends a copy of the exchange to every processor, one after another on
+     * the calling thread, then notifies the callback.
+     * <p/>
+     * A failure in one processor does not stop delivery to the remaining
+     * ones. Instead of being silently discarded, the exception is stored on
+     * the original exchange (a later failure replaces an earlier one).
+     * Because all work happens on the calling thread, completion is reported
+     * as synchronous.
+     *
+     * @param exchange the exchange to copy and send to all processors
+     * @param callback notified once every processor has been invoked
+     * @return <tt>true</tt>, as processing always completes synchronously
+     */
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        List<Processor> list = getProcessors();
+        for (Processor processor : list) {
+            Exchange copy = copyExchangeStrategy(processor, exchange);
+            try {
+                processor.process(copy);
+            } catch (Exception ex) {
+                // Keep delivering to the other processors, but do not lose the failure
+                exchange.setException(ex);
+            }
+        }
+        callback.done(true);
+        return true;
+    }
}