Author: supun
Date: Fri May 14 10:56:06 2010
New Revision: 944203
URL: http://svn.apache.org/viewvc?rev=944203&view=rev
Log:
adding weighted load balance algorithm
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/LoadbalanceEndpointFactory.java
Fri May 14 10:56:06 2010
@@ -94,6 +94,7 @@ public final class LoadbalanceEndpointFa
algorithm =
LoadbalanceAlgorithmFactory.
createLoadbalanceAlgorithm(loadbalanceElement,
endpoints);
+ algorithm.setLoadBalanceEndpoint(loadbalanceEndpoint);
} else if (loadbalanceElement.getFirstChildWithName(MEMBER) !=
null) {
if(loadbalanceElement.
getChildrenWithName((XMLConfigConstants.ENDPOINT_ELT)).hasNext()){
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/LoadbalanceEndpoint.java
Fri May 14 10:56:06 2010
@@ -22,7 +22,6 @@ package org.apache.synapse.endpoints;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.clustering.Member;
-import org.apache.http.protocol.HTTP;
import org.apache.synapse.*;
import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/LoadbalanceAlgorithm.java
Fri May 14 10:56:06 2010
@@ -46,6 +46,13 @@ public interface LoadbalanceAlgorithm {
void setEndpoints(List<Endpoint> endpoints);
/**
+ * Set the loadbalance endpoint
+ *
+ * @param endpoint the endpoint which uses this algorithm
+ */
+ void setLoadBalanceEndpoint(Endpoint endpoint);
+
+ /**
* This method returns the next node according to the algorithm
implementation.
*
* @param synapseMessageContext SynapseMessageContext of the current
message
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java?rev=944203&r1=944202&r2=944203&view=diff
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
(original)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/RoundRobin.java
Fri May 14 10:56:06 2010
@@ -58,6 +58,8 @@ public class RoundRobin implements Loadb
this.endpoints = endpoints;
}
+ public void setLoadBalanceEndpoint(Endpoint endpoint) {}
+
/**
* Choose an active endpoint using the round robin algorithm. If there are
no active endpoints
* available, returns null.
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
URL:
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java?rev=944203&view=auto
==============================================================================
---
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
(added)
+++
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRoundRobin.java
Fri May 14 10:56:06 2010
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.endpoints.algorithms;
+
+import org.apache.axis2.clustering.Member;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.PropertyInclude;
+import org.apache.synapse.mediators.MediatorProperty;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.List;
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * This algorithm sends messages based on the weights of the endpoints. For
example we may
+ * have 3 endpoints with following weights.</p>
+ * <ul>
+ * <li>Epr 1: 5</li>
+ * <li>Epr 2: 3</li>
+ * <li>Epr 3: 2</li>
+ * </ul>
+ * <p> This algorithm will send the first 5 messages through Epr1, next 3
messages through
+ * Epr2 and next 2 messages with Epr3. Then algorithm moves again to the first
endpoint
+ * and cycle continues.</p>
+ */
+public class WeightedRoundRobin implements LoadbalanceAlgorithm,
ManagedLifecycle {
+ private static final Log log = LogFactory.getLog(WeightedRoundRobin.class);
+
+ /** We keep a sorted array of endpoint states, first state will point to
the
+ * endpoint with the highest weight */
+ private EndpointState[] endpointStates = null;
+
+ /** Endpoint list */
+ private List<Endpoint> endpoints;
+
+ private Endpoint loadBalanceEndpoint;
+
+ /** Keep track of the current poistion we are operating on the
endpointStates array */
+ private int endpointCursor = 0;
+
+ /** If a weight is not specified by the user, we use the default as 1 */
+ private static final int DEFAULT_WEIGHT = 1;
+
+ /** Configuration key used by the endpoints for indicating their weight */
+ private static final String LOADBALANCE_WEIGHT = "loadbalance.weight";
+
+ /** Configuration key used by the endpoints for indicating their weight */
+ private static final String LOADBALANCE_ThEADLOCAL =
"loadbalance.threadLocal";
+
+ private boolean isThreadLocal = false;
+
+ private AlgorithmThreadLocal threadedAlgorithm = null;
+
+ /** we are not supporting members */
+ public void setApplicationMembers(List<Member> members) {
+ throw new UnsupportedOperationException("This algorithm doesn't
operate on Members");
+ }
+
+ public void setEndpoints(List<Endpoint> endpoints) {
+ this.endpoints = endpoints;
+ }
+
+ public void setLoadBalanceEndpoint(Endpoint endpoint) {
+ this.loadBalanceEndpoint = endpoint;
+ }
+
+ public Endpoint getNextEndpoint(MessageContext synapseMessageContext,
+ AlgorithmContext algorithmContext) {
+ if (!isThreadLocal) {
+ synchronized (this) {
+ EndpointState state = endpointStates[endpointCursor];
+ if (state.getCurrentWeight() == 0) {
+ // reset the current state
+ state.reset();
+
+ // go to the next enpoint
+ if (endpointCursor == endpointStates.length - 1) {
+ endpointCursor = 0;
+ } else {
+ ++endpointCursor;
+ }
+
+ state = endpointStates[endpointCursor];
+ }
+
+ // we are about to use this endpoint, so decrement its current
count
+ state.decrementCurrentWeight();
+
+ // return the endpoint corresponfing to the current poistion
+ return endpoints.get(state.getEndpointPosition());
+ }
+ } else {
+ if (threadedAlgorithm != null) {
+ Algorithm algo = threadedAlgorithm.get();
+
+ int position = algo.getNextEndpoint();
+
+ return endpoints.get(position);
+ } else {
+ String msg = "Algorithm: WeightedRoundRobin algorithm not
initialized properly";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+ }
+ }
+
+ public Member getNextApplicationMember(AlgorithmContext algorithmContext) {
+ throw new UnsupportedOperationException("This algorithm doesn't
operate on Members");
+ }
+
+ public void reset(AlgorithmContext algorithmContext) {
+ for (EndpointState state : endpointStates) {
+ state.reset();
+ }
+
+ endpointCursor = 0;
+ }
+
+ public String getName() {
+ return WeightedRoundRobin.class.getName();
+ }
+
+ public void init(SynapseEnvironment se) {
+ if (endpoints == null) {
+ String msg = "Endpoints are not set, cannot initialize the
algorithm";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+ endpointStates = new EndpointState[endpoints.size()];
+
+ for (int i = 0; i < endpoints.size(); i++) {
+ Endpoint endpoint = endpoints.get(i);
+ if (!(endpoint instanceof PropertyInclude)) {
+ EndpointState state = new EndpointState(i, DEFAULT_WEIGHT);
+ endpointStates[i] = state;
+ } else {
+ MediatorProperty property =
+
((PropertyInclude)endpoint).getProperty(LOADBALANCE_WEIGHT);
+ EndpointState state;
+ if (property != null) {
+ int weight = Integer.parseInt(property.getValue());
+
+ if (weight <= 0) {
+ String msg = "Weight must be greater than zero";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+ state = new EndpointState(i, weight);
+ } else {
+ state = new EndpointState(i, DEFAULT_WEIGHT);
+ }
+
+ endpointStates[i] = state;
+ }
+ }
+
+ // now we are going to sort
+ Arrays.sort(endpointStates, new Comparator<EndpointState>() {
+ public int compare(EndpointState o1, EndpointState o2) {
+ return o2.getWeight() - o1.getWeight();
+ }
+ });
+
+ if (loadBalanceEndpoint instanceof PropertyInclude) {
+ MediatorProperty threadLocalProperty = ((PropertyInclude)
loadBalanceEndpoint).
+ getProperty(LOADBALANCE_ThEADLOCAL);
+
+ if (threadLocalProperty != null &&
threadLocalProperty.getValue().equals("true")) {
+ isThreadLocal = true;
+ }
+ }
+ }
+
+ public void destroy() {}
+
+ private class AlgorithmThreadLocal extends ThreadLocal<Algorithm> {
+ @Override
+ protected Algorithm initialValue() {
+ return new Algorithm(endpointStates);
+ }
+ }
+
+ private class Algorithm {
+ /**
+ * We keep a sorted array of endpoint states, first state will point
to the
+ * endpoint with the highest weight
+ */
+ private EndpointState[] threadLocalEndpointStates = null;
+
+ /**
+ * Keep track of the current poistion we are operating on the
endpointStates array
+ */
+ private int threadLocalEndpointCursor = 0;
+
+ public Algorithm(EndpointState[] states) {
+ threadLocalEndpointStates = new EndpointState[states.length];
+ for (int i = 0; i < states.length; i++) {
+ threadLocalEndpointStates[i] = new
EndpointState(states[i].getEndpointPosition(),
+ states[i].getWeight());
+ }
+ }
+
+ public int getNextEndpoint() {
+ EndpointState state =
threadLocalEndpointStates[threadLocalEndpointCursor];
+ if (state.getCurrentWeight() == 0) {
+ // reset the current state
+ state.reset();
+
+ // go to the next enpoint
+ if (threadLocalEndpointCursor ==
threadLocalEndpointStates.length - 1) {
+ threadLocalEndpointCursor = 0;
+ } else {
+ ++threadLocalEndpointCursor;
+ }
+
+ state = threadLocalEndpointStates[threadLocalEndpointCursor];
+ }
+
+ // we are about to use this endpoint, so decrement its current
count
+ state.decrementCurrentWeight();
+
+ // return the endpoint corresponfing to the current poistion
+ return state.getEndpointPosition();
+ }
+ }
+
+
+ /**
+ * Simple class for holding the states about the endpoints.
+ */
+ private class EndpointState {
+ private int endpointPosition = 0;
+
+ private int weight = 0;
+
+ private int currentWeight = 0;
+
+ public EndpointState(int endpointPosition, int weight) {
+ this.endpointPosition = endpointPosition;
+ this.weight = weight;
+ this.currentWeight = weight;
+ }
+
+ public int getEndpointPosition() {
+ return endpointPosition;
+ }
+
+ public int getWeight() {
+ return weight;
+ }
+
+ public int getCurrentWeight() {
+ return currentWeight;
+ }
+
+ public void setCurrentWeight(int currentWeight) {
+ this.currentWeight = currentWeight;
+ }
+
+ public void decrementCurrentWeight() {
+ --currentWeight;
+ }
+
+ public void reset() {
+ currentWeight = weight;
+ }
+ }
+}