Author: supun
Date: Mon Apr 18 19:45:14 2011
New Revision: 1094709

URL: http://svn.apache.org/viewvc?rev=1094709&view=rev
Log:
adding jmx support for weighted load balance algorithm

Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java?rev=1094709&r1=1094708&r2=1094709&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java Mon Apr 18 19:45:14 2011
@@ -20,6 +20,7 @@
 package org.apache.synapse.endpoints.algorithms;
 
 import org.apache.axis2.clustering.Member;
+import org.apache.synapse.commons.jmx.MBeanRegistrar;
 import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.ManagedLifecycle;
@@ -33,6 +34,9 @@ import org.apache.commons.logging.LogFac
 import java.util.List;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This algorithm sends messages based on the weights of the endpoints. For example we may
@@ -75,6 +79,10 @@ public class WeightedRoundRobin implemen
 
     private AlgorithmThreadLocal threadedAlgorithm = null;
 
+    private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private WeightedRoundRobinViewMBean view;
+
     /** we are not supporting members */
     public void setApplicationMembers(List<Member> members) {
         throw new UnsupportedOperationException("This algorithm doesn't 
operate on Members");
@@ -90,41 +98,48 @@ public class WeightedRoundRobin implemen
 
     public Endpoint getNextEndpoint(MessageContext synapseMessageContext,
                                     AlgorithmContext algorithmContext) {
-        if (!isThreadLocal) {
-            synchronized (this) {
-                EndpointState state = endpointStates[endpointCursor];
-                if (state.getCurrentWeight() == 0) {
-                    // reset the current state
-                    state.reset();
-
-                    // go to the next enpoint
-                    if (endpointCursor == endpointStates.length - 1) {
-                        endpointCursor = 0;
-                    } else {
-                        ++endpointCursor;
-                    }
 
-                    state = endpointStates[endpointCursor];
-                }
-
-                // we are about to use this endpoint, so decrement its current count
-                state.decrementCurrentWeight();
+        Lock readLock = lock.readLock();
+        readLock.lock();
+        try {
+            if (!isThreadLocal) {
+                synchronized (this) {
+                    EndpointState state = endpointStates[endpointCursor];
+                    if (state.getCurrentWeight() == 0) {
+                        // reset the current state
+                        state.reset();
+
+                        // go to the next endpoint
+                        if (endpointCursor == endpointStates.length - 1) {
+                            endpointCursor = 0;
+                        } else {
+                            ++endpointCursor;
+                        }
 
-                // return the endpoint corresponfing to the current poistion
-                return endpoints.get(state.getEndpointPosition());
-            }
-        } else {
-            if (threadedAlgorithm != null) {
-                Algorithm algo = threadedAlgorithm.get();
+                        state = endpointStates[endpointCursor];
+                    }
 
-                int position = algo.getNextEndpoint();
+                    // we are about to use this endpoint, so decrement its current count
+                    state.decrementCurrentWeight();
 
-                return endpoints.get(position);
+                    // return the endpoint corresponding to the current position
+                    return endpoints.get(state.getEndpointPosition());
+                }
             } else {
-                String msg = "Algorithm: WeightedRoundRobin algorithm not 
initialized properly";
-                log.error(msg);
-                throw new SynapseException(msg);
+                if (threadedAlgorithm != null) {
+                    Algorithm algo = threadedAlgorithm.get();
+
+                    int position = algo.getNextEndpoint();
+
+                    return endpoints.get(position);
+                } else {
+                    String msg = "Algorithm: WeightedRoundRobin algorithm not 
initialized properly";
+                    log.error(msg);
+                    throw new SynapseException(msg);
+                }
             }
+        } finally {
+            readLock.unlock();
         }
     }        
 
@@ -160,7 +175,7 @@ public class WeightedRoundRobin implemen
                 endpointStates[i] = state;
             } else {
                 MediatorProperty property =
-                        
((PropertyInclude)endpoint).getProperty(LOADBALANCE_WEIGHT);
+                        ((PropertyInclude) 
endpoint).getProperty(LOADBALANCE_WEIGHT);
                 EndpointState state;
                 if (property != null) {
                     int weight = Integer.parseInt(property.getValue());
@@ -180,13 +195,6 @@ public class WeightedRoundRobin implemen
             }
         }
 
-        // now we are going to sort
-        Arrays.sort(endpointStates, new Comparator<EndpointState>() {
-            public int compare(EndpointState o1, EndpointState o2) {
-                return o2.getWeight() - o1.getWeight();
-            }
-        });
-
         if (loadBalanceEndpoint instanceof PropertyInclude) {
             MediatorProperty threadLocalProperty = ((PropertyInclude) 
loadBalanceEndpoint).
                     getProperty(LOADBALANCE_ThEADLOCAL);
@@ -195,6 +203,11 @@ public class WeightedRoundRobin implemen
                 isThreadLocal = true;
             }
         }
+
+        view = new WeightedRoundRobinView(this);
+
+        MBeanRegistrar.getInstance().registerMBean(view, "LBAlgorithms",
+                loadBalanceEndpoint.getName() != null ? 
loadBalanceEndpoint.getName() : "LBEpr");
     }
 
     public void destroy() {}
@@ -290,10 +303,6 @@ public class WeightedRoundRobin implemen
             return currentWeight;
         }
 
-        public void setCurrentWeight(int currentWeight) {
-            this.currentWeight = currentWeight;
-        }
-
         public void decrementCurrentWeight() {
             --currentWeight;
         }
@@ -302,4 +311,37 @@ public class WeightedRoundRobin implemen
             currentWeight = weight;
         }
     }
+
+    private void calculate() {
+        // now we are going to sort
+        Arrays.sort(endpointStates, new Comparator<EndpointState>() {
+            public int compare(EndpointState o1, EndpointState o2) {
+                return o2.getWeight() - o1.getWeight();
+            }
+        });
+    }
+
+    public void changeWeight(int pos, int weight) {
+        Lock writeLock = lock.writeLock();
+        writeLock.lock();
+        try {
+            EndpointState state = null;
+            for (EndpointState s : endpointStates) {
+                if (s.getEndpointPosition() == pos) {
+                    state = s;
+                }
+            }
+
+            if (state == null) {
+                throw new SynapseException("The specified endpoint position 
cannot be found");
+            }
+
+            state.weight = weight;
+
+            calculate();
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
 }


Reply via email to