Author: ningjiang
Date: Tue Dec  9 05:04:14 2008
New Revision: 724681

URL: http://svn.apache.org/viewvc?rev=724681&view=rev
Log:
CAMEL-1161, CAMEL-1162 did refactoring work loadbalance

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerType.java
 Tue Dec  9 05:04:14 2008
@@ -23,6 +23,7 @@
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.model.IdentifiedType;
@@ -131,4 +132,20 @@
         loadBalancer.process(exchange);
     }
 
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        ObjectHelper.notNull(loadBalancer, "loadBalancer");
+        
+        return loadBalancer.process(exchange, new AsyncCallback() {
+            public void done(boolean doneSynchronously) {
+                // Only handle the async case...
+                if (doneSynchronously) {
+                    return;
+                } else {
+                    callback.done(doneSynchronously);
+                }
+            }
+        });                
+    
+    }
+
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/StickyLoadBalanceStrategy.java
 Tue Dec  9 05:04:14 2008
@@ -32,7 +32,7 @@
 public class StickyLoadBalanceStrategy extends LoadBalancerType {
     @XmlElement(required = true, name = "expression", type = 
ExpressionType.class)
     private ExpressionType expressionType;
-    @XmlElement(required = false, name = "loadBalancer", type = 
ExpressionType.class)
+    @XmlElement(required = false, name = "loadBalancer", type = 
LoadBalancerType.class)
     private LoadBalancerType loadBalancerType;
 
     public StickyLoadBalanceStrategy() {

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/TopicLoadBalanceStrategy.java
 Tue Dec  9 05:04:14 2008
@@ -23,6 +23,7 @@
 
 /**
  * Represents an XML <topic/> element
+ * @deprecated
  */
 @XmlRootElement(name = "topic")
 public class TopicLoadBalanceStrategy extends LoadBalancerType {

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
 Tue Dec  9 05:04:14 2008
@@ -18,6 +18,7 @@
 
 import java.util.List;
 
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Processor;
 
 /**
@@ -25,7 +26,7 @@
  *
  * @version $Revision$
  */
-public interface LoadBalancer extends Processor {
+public interface LoadBalancer extends AsyncProcessor {
     /**
      * Adds a new processor to the load balancer
      *

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
 Tue Dec  9 05:04:14 2008
@@ -18,6 +18,8 @@
 
 import java.util.List;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
@@ -41,6 +43,41 @@
             processor.process(exchange);
         }
     }
+    
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        boolean sync = false;
+        List<Processor> list = getProcessors();
+        if (list.isEmpty()) {
+            throw new IllegalStateException("No processors available to 
process " + exchange);
+        }
+        Processor processor = chooseProcessor(list, exchange);
+        if (processor == null) {
+            throw new IllegalStateException("No processors could be chosen to 
process " + exchange);
+        } else {
+            if (processor instanceof AsyncProcessor) {
+                AsyncProcessor asyncProcessor = (AsyncProcessor)processor;
+                sync = asyncProcessor.process(exchange, new AsyncCallback() {
+                    public void done(boolean sync) {
+                        // Only handle the async case...
+                        if (sync) {
+                            return;
+                        } else {
+                            callback.done(sync);                        
+                        }
+                    }
+                });                
+            } else {
+                try {
+                    processor.process(exchange);
+                } catch (Exception ex) {
+                    exchange.setException(ex);
+                }
+                callback.done(false);                
+            }            
+        }
+        return sync;
+        
+    }
 
     protected abstract Processor chooseProcessor(List<Processor> processors, 
Exchange exchange);
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?rev=724681&r1=724680&r2=724681&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
 Tue Dec  9 05:04:14 2008
@@ -18,14 +18,18 @@
 
 import java.util.List;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
 /**
  * A {@link LoadBalancer} implementations which sends to all 
destinations
- * (rather like JMS Topics)
+ * (rather like JMS Topics). 
+ *  It is deprecated in Camel 2.0, you need to move to use multicast, 
+ *  if you want to send the message to all destinations.
  * 
  * @version $Revision$
+ * @deprecated
  */
 public class TopicLoadBalancer extends LoadBalancerSupport {
 
@@ -50,4 +54,18 @@
     protected Exchange copyExchangeStrategy(Processor processor, Exchange 
exchange) {
         return exchange.copy();
     }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        List<Processor> list = getProcessors();
+        for (Processor processor : list) {
+            Exchange copy = copyExchangeStrategy(processor, exchange);
+            try {
+                processor.process(copy);
+            } catch (Exception ex) {
+                // We don't handle the exception here
+            }
+        }
+        callback.done(false);
+        return false;
+    }
 }


Reply via email to