Author: supun
Date: Mon Apr 18 19:45:14 2011
New Revision: 1094709
URL: http://svn.apache.org/viewvc?rev=1094709&view=rev
Log:
Adding JMX support for the weighted load balance algorithm
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java?rev=1094709&r1=1094708&r2=1094709&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
Mon Apr 18 19:45:14 2011
@@ -20,6 +20,7 @@
package org.apache.synapse.endpoints.algorithms;
import org.apache.axis2.clustering.Member;
+import org.apache.synapse.commons.jmx.MBeanRegistrar;
import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.MessageContext;
import org.apache.synapse.ManagedLifecycle;
@@ -33,6 +34,9 @@ import org.apache.commons.logging.LogFac
import java.util.List;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* This algorithm sends messages based on the weights of the endpoints. For
example we may
@@ -75,6 +79,10 @@ public class WeightedRoundRobin implemen
private AlgorithmThreadLocal threadedAlgorithm = null;
+ private ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private WeightedRoundRobinViewMBean view;
+
/** we are not supporting members */
public void setApplicationMembers(List<Member> members) {
throw new UnsupportedOperationException("This algorithm doesn't
operate on Members");
@@ -90,41 +98,48 @@ public class WeightedRoundRobin implemen
public Endpoint getNextEndpoint(MessageContext synapseMessageContext,
AlgorithmContext algorithmContext) {
- if (!isThreadLocal) {
- synchronized (this) {
- EndpointState state = endpointStates[endpointCursor];
- if (state.getCurrentWeight() == 0) {
- // reset the current state
- state.reset();
-
- // go to the next enpoint
- if (endpointCursor == endpointStates.length - 1) {
- endpointCursor = 0;
- } else {
- ++endpointCursor;
- }
- state = endpointStates[endpointCursor];
- }
-
- // we are about to use this endpoint, so decrement its current
count
- state.decrementCurrentWeight();
+ Lock readLock = lock.readLock();
+ readLock.lock();
+ try {
+ if (!isThreadLocal) {
+ synchronized (this) {
+ EndpointState state = endpointStates[endpointCursor];
+ if (state.getCurrentWeight() == 0) {
+ // reset the current state
+ state.reset();
+
+ // go to the next endpoint
+ if (endpointCursor == endpointStates.length - 1) {
+ endpointCursor = 0;
+ } else {
+ ++endpointCursor;
+ }
- // return the endpoint corresponfing to the current poistion
- return endpoints.get(state.getEndpointPosition());
- }
- } else {
- if (threadedAlgorithm != null) {
- Algorithm algo = threadedAlgorithm.get();
+ state = endpointStates[endpointCursor];
+ }
- int position = algo.getNextEndpoint();
+ // we are about to use this endpoint, so decrement its
current count
+ state.decrementCurrentWeight();
- return endpoints.get(position);
+ // return the endpoint corresponding to the current
position
+ return endpoints.get(state.getEndpointPosition());
+ }
} else {
- String msg = "Algorithm: WeightedRoundRobin algorithm not
initialized properly";
- log.error(msg);
- throw new SynapseException(msg);
+ if (threadedAlgorithm != null) {
+ Algorithm algo = threadedAlgorithm.get();
+
+ int position = algo.getNextEndpoint();
+
+ return endpoints.get(position);
+ } else {
+ String msg = "Algorithm: WeightedRoundRobin algorithm not
initialized properly";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
}
+ } finally {
+ readLock.unlock();
}
}
@@ -160,7 +175,7 @@ public class WeightedRoundRobin implemen
endpointStates[i] = state;
} else {
MediatorProperty property =
-
((PropertyInclude)endpoint).getProperty(LOADBALANCE_WEIGHT);
+ ((PropertyInclude)
endpoint).getProperty(LOADBALANCE_WEIGHT);
EndpointState state;
if (property != null) {
int weight = Integer.parseInt(property.getValue());
@@ -180,13 +195,6 @@ public class WeightedRoundRobin implemen
}
}
- // now we are going to sort
- Arrays.sort(endpointStates, new Comparator<EndpointState>() {
- public int compare(EndpointState o1, EndpointState o2) {
- return o2.getWeight() - o1.getWeight();
- }
- });
-
if (loadBalanceEndpoint instanceof PropertyInclude) {
MediatorProperty threadLocalProperty = ((PropertyInclude)
loadBalanceEndpoint).
getProperty(LOADBALANCE_ThEADLOCAL);
@@ -195,6 +203,11 @@ public class WeightedRoundRobin implemen
isThreadLocal = true;
}
}
+
+ view = new WeightedRoundRobinView(this);
+
+ MBeanRegistrar.getInstance().registerMBean(view, "LBAlgorithms",
+ loadBalanceEndpoint.getName() != null ?
loadBalanceEndpoint.getName() : "LBEpr");
}
public void destroy() {}
@@ -290,10 +303,6 @@ public class WeightedRoundRobin implemen
return currentWeight;
}
- public void setCurrentWeight(int currentWeight) {
- this.currentWeight = currentWeight;
- }
-
public void decrementCurrentWeight() {
--currentWeight;
}
@@ -302,4 +311,37 @@ public class WeightedRoundRobin implemen
currentWeight = weight;
}
}
+
+ private void calculate() {
+ // now we are going to sort
+ Arrays.sort(endpointStates, new Comparator<EndpointState>() {
+ public int compare(EndpointState o1, EndpointState o2) {
+ return o2.getWeight() - o1.getWeight();
+ }
+ });
+ }
+
+ public void changeWeight(int pos, int weight) {
+ Lock writeLock = lock.writeLock();
+ writeLock.lock();
+ try {
+ EndpointState state = null;
+ for (EndpointState s : endpointStates) {
+ if (s.getEndpointPosition() == pos) {
+ state = s;
+ }
+ }
+
+ if (state == null) {
+ throw new SynapseException("The specified endpoint position
cannot be found");
+ }
+
+ state.weight = weight;
+
+ calculate();
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
}