Author: supun
Date: Tue Jun  8 22:04:18 2010
New Revision: 952838

URL: http://svn.apache.org/viewvc?rev=952838&view=rev
Log:
adding a new load balancing algorithm that takes the active connections in to 
account

Added:
    
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRRLCAlgorithm.java

Added: 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRRLCAlgorithm.java
URL: 
http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRRLCAlgorithm.java?rev=952838&view=auto
==============================================================================
--- 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRRLCAlgorithm.java
 (added)
+++ 
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/algorithms/WeightedRRLCAlgorithm.java
 Tue Jun  8 22:04:18 2010
@@ -0,0 +1,413 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.endpoints.algorithms;
+
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.WSDLEndpoint;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.PropertyInclude;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.mediators.MediatorProperty;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.axis2.clustering.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.net.URL;
+import java.net.MalformedURLException;
+
+/**
+ * This is a Weighted Round Robin Least Connection algorithm.</p>
+ *
+ * <p> This algorithm is similar to {...@link WeightedRoundRobin 
WeightedRoundRobin} algorithm
+ * except it takes the active connections made by the endpoints in to account. 
Weights assinged
+ * to each endpoint and these are static weights. But depending on the active 
connections these
+ * weights are changed dynamically during the execution. </p>
+ *
+ * <p> Algorithm assumes that the endpoint connections to total connection 
ratio should be eqault
+ * to endpoint weight to total weights ratio. If the ratios are different it 
tries to align them
+ * by changing the weights dynamically.</p>
+ *
+ */
+public class WeightedRRLCAlgorithm implements LoadbalanceAlgorithm, 
ManagedLifecycle {
+    private static final Log log = 
LogFactory.getLog(WeightedRRLCAlgorithm.class);
+
+    /**
+     * Endpoints list for the round robin algorithm
+     */
+    private List<Endpoint> endpoints = null;
+    /** Load balance endpoint */
+    private Endpoint loadBalanceEndpoint = null;
+    /** We keep a sorted array of endpoint states, first state will point to 
the
+     * endpoint with the highest weight */
+    private WeightedState[] list;
+    /** Keep track of the current poistion we are operating on the 
endpointStates array */
+    private int endpointCursor = 0;
+    /** How many rounds should go before re-calculating the dynamic weights 
based
+     * on number of active connections */
+    private int roundsPerRecalculation = 1;
+    /** How many rounds we have gone throug */
+    private int currentRound = 0;
+    /** total weight of the endpoints */
+    private int totalWeight = 0;
+    /** current connection count */
+    private int totalConnections = 0;
+
+    public static final String LB_WEIGHTED_RRLC_ROUNDS_PER_RECAL =
+            "loadbalance.weightedRRLC.roundsPerRecal";
+    public static final String LB_WEIGHTED_RRLC_WEIGHT = "loadbalance.weight";
+    public static final String LB_WEIGHTED_RRLC_WEIGHT_MIN = 
LB_WEIGHTED_RRLC_WEIGHT + ".min";
+    public static final String LB_WEIGHTED_RRLC_WEIGHT_MAX = 
LB_WEIGHTED_RRLC_WEIGHT + ".max";
+    public static final int LB_WEIGHTED_RRLC_WEIGHT_SKEW = 2;    
+
+    public void setApplicationMembers(List<Member> members) {}
+
+    public void setEndpoints(List<Endpoint> endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    public void setLoadBalanceEndpoint(Endpoint endpoint) {
+        this.loadBalanceEndpoint = endpoint;        
+    }
+
+    public synchronized Endpoint getNextEndpoint(MessageContext messageContext,
+                                                 AlgorithmContext 
algorithmContext) {                
+        WeightedState s = list[endpointCursor];
+
+        // once we choose an endpoit we countinue to use that untile all
+        // the chances are over for this round
+        if (!s.isSendsAvailable()) {
+            // reset this state for this round
+            s.resetPerRound();
+            do {
+                if (++endpointCursor == list.length) {
+                    endpointCursor = 0;             
+                    // if we we have gone through enough cycles to recalculate 
the weights based
+                    // on the current connection count recalculate the current 
weights
+                    if (++currentRound == roundsPerRecalculation) {
+                        currentRound = 0;
+                        // we recalculate the current weights based on the 
connections and weights
+                        reCalcuateWeights(messageContext);
+                    }
+                }
+                s = list[endpointCursor];
+            } while (!s.isSendsAvailable());
+        }
+
+        s.chosenToSend();
+        // get the endpoint correspondint to the current poistion and return it
+        return endpoints.get(s.getEndpointPosition());
+    }
+
+    private void intialize() {
+        // get the global properties
+        if (loadBalanceEndpoint != null && loadBalanceEndpoint instanceof 
PropertyInclude) {
+            PropertyInclude include = (PropertyInclude) loadBalanceEndpoint;
+
+            MediatorProperty val = 
include.getProperty(LB_WEIGHTED_RRLC_ROUNDS_PER_RECAL);
+            if (val != null) {
+                roundsPerRecalculation = Integer.parseInt(val.getValue());
+            }
+        }
+
+        // initialize the states list, this runs only once
+        list = new WeightedState[endpoints.size()];
+
+        int totalWeight = 0;
+        for (Endpoint endpoint : endpoints) {
+            if (endpoint instanceof PropertyInclude) {
+                PropertyInclude include = (PropertyInclude) endpoint;
+                MediatorProperty val = 
include.getProperty(LB_WEIGHTED_RRLC_WEIGHT);
+
+                if (val == null) {
+                    String msg = "Parameter " +
+                            "loadbalance.weighted.weight should be specified 
for every " +
+                            "endpoint in the load balance group";
+                    log.error(msg);
+                    throw new SynapseException(msg);
+                }
+                totalWeight += Integer.parseInt(val.getValue());
+            }
+        }
+
+        this.totalWeight = totalWeight;
+
+        for (int i = 0; i < endpoints.size(); i++) {
+            Endpoint e = endpoints.get(i);
+            if (e instanceof PropertyInclude) {
+                PropertyInclude include = (PropertyInclude) e;
+
+                MediatorProperty weight = include.getProperty(
+                        LB_WEIGHTED_RRLC_WEIGHT);
+
+                String key;
+                URL url;
+                if (e instanceof AddressEndpoint) {
+                    AddressEndpoint addressEndpoint = (AddressEndpoint) e;
+                    try {
+                        url = new 
URL(addressEndpoint.getDefinition().getAddress());
+                    } catch (MalformedURLException e1) {
+                        throw new SynapseException("Mulformed URL in address 
endpoint");
+                    }
+                } else if (e instanceof WSDLEndpoint) {
+                    WSDLEndpoint wsdlEndpoint = (WSDLEndpoint) e;
+                    try {
+                        url = new 
URL(wsdlEndpoint.getDefinition().getAddress());
+                    } catch (MalformedURLException e1) {
+                        throw new SynapseException("Mulformed URL in address 
endpoint");
+                    }
+                } else {
+                    throw new SynapseException(
+                            "Only AddressEndpoint and WSDLEndpoint can be used 
" +
+                                    "with WeightedRRLCAlgorithm");
+                }
+
+                // construct the key
+                key = url.getHost() + ":" + url.getPort();
+
+                WeightedState state = new WeightedState(
+                        Integer.parseInt(weight.getValue()), i, key);
+
+                MediatorProperty minimumWeight = include.getProperty(
+                        LB_WEIGHTED_RRLC_WEIGHT_MIN);
+                if (minimumWeight != null) {
+                    
state.setMinWeight(Integer.parseInt(minimumWeight.getValue()));
+                }
+
+                MediatorProperty maxWeight = include.getProperty(
+                        LB_WEIGHTED_RRLC_WEIGHT_MAX);
+                if (maxWeight != null) {
+                    state.setMaxWeight(Integer.parseInt(maxWeight.getValue()));
+                }
+
+                list[i] = state;
+            }
+        }                   
+
+        // sort the states according to the initial fixed weights
+        Arrays.sort(list, new Comparator<WeightedState>() {
+            public int compare(WeightedState o1, WeightedState o2) {
+                return o2.getFixedWeight() - o1.getFixedWeight();
+            }
+        });
+    }
+
+    public Member getNextApplicationMember(AlgorithmContext algorithmContext) {
+        // this doesn't make sense for weighted load balance algorithm
+        return null;
+    }
+
+    public void reset(AlgorithmContext algorithmContext) {
+        for (WeightedState state : list) {
+            state.reset();
+        }
+    }
+
+    public String getName() {
+        return WeightedRRLCAlgorithm.class.getName();
+    }
+
+    public int getEndpointCursor() {
+        return endpointCursor;
+    }
+
+    public int getRoundsPerRecalculation() {
+        return roundsPerRecalculation;
+    }
+
+    public int getCurrentRound() {
+        return currentRound;
+    }
+
+    public int getTotalWeight() {
+        return totalWeight;
+    }
+
+    public int getTotalConnections() {
+        return totalConnections;
+    }
+
+    private void reCalcuateWeights(MessageContext messageContext) {
+        Map connectionsMap = null;
+        // fetch the connections map
+        if (messageContext instanceof Axis2MessageContext) {
+            Axis2MessageContext axis2MessageContext = (Axis2MessageContext) 
messageContext;
+            org.apache.axis2.context.MessageContext msgCtx =
+                    axis2MessageContext.getAxis2MessageContext();
+
+            Object obj = msgCtx.getProperty("OPEN_CONNNECTIONS_MAP");
+            if (obj != null && obj instanceof Map) {
+                connectionsMap = (Map) obj;
+            }
+        }
+
+        if (connectionsMap == null) {
+            throw new SynapseException("Connections map not found");
+        }
+
+        for (WeightedState state : list) {
+            String key = state.getKeyToConnectionCount();
+            AtomicInteger integer = (AtomicInteger) connectionsMap.get(key);
+
+            if (integer != null) {
+                state.setCurrentConnectionCount(integer.get());
+            } else {
+                state.setCurrentConnectionCount(0);
+            }
+
+            totalConnections += state.getCurrentConnectionCount();
+        }
+
+        for (WeightedState state : list) {
+            state.reCalcuateWeight();
+        }
+    }
+
+    public void init(SynapseEnvironment se) {
+        intialize();
+    }
+
+    public void destroy() {
+
+    }
+
+    private class WeightedState {
+        /** this is the statics weight specified by the user */
+        private int fixedWeight = 0;
+        /** position of the endpoint related to this state */
+        private int endpointPosition = 0;
+        /** current weight of the algorithm, this is calculated based on sends 
through this epr */
+        private int currentWeight = 1;
+        /** calculated weight for this round */
+        private int currentCalcWeight = 0;
+        /** current connection count */
+        private int currentConnectionCount = 0;
+        /** minimum possible weight */
+        private int minWeight = 0;
+        /** maximum possible weight */
+        private int maxWeight = 0;
+        /** holds the key to access the connection count */
+        private String keyToConnectionCount = "";
+
+        public WeightedState(int weight, int endpointPosition, String 
keyToConnectionCount) {
+            this.fixedWeight = weight;
+            this.endpointPosition = endpointPosition;
+            this.currentWeight = fixedWeight;
+            this.currentCalcWeight = fixedWeight;
+            this.keyToConnectionCount = keyToConnectionCount;
+            this.maxWeight = fixedWeight + LB_WEIGHTED_RRLC_WEIGHT_SKEW;
+            this.minWeight = fixedWeight - LB_WEIGHTED_RRLC_WEIGHT_SKEW > 0 ?
+                    fixedWeight - LB_WEIGHTED_RRLC_WEIGHT_SKEW : 0;
+        }
+
+        public int getEndpointPosition() {
+            return endpointPosition;
+        }
+
+        public int getFixedWeight() {
+            return fixedWeight;
+        }
+
+        public boolean isSendsAvailable() {
+            return currentCalcWeight > 0;
+        }
+
+        public void chosenToSend() {
+            currentCalcWeight--;
+        }
+
+        public int getCurrentWeight() {
+            return currentWeight;
+        }
+
+        public void setMinWeight(int minWeight) {
+            this.minWeight = minWeight;
+        }
+
+        public String getKeyToConnectionCount() {
+            return keyToConnectionCount;
+        }
+
+        public void setCurrentWeight(int currentWeight) {
+            this.currentWeight = currentWeight;
+        }
+
+        public void setCurrentConnectionCount(int currentConnectionCount) {
+            this.currentConnectionCount = currentConnectionCount;
+        }
+
+        public int getCurrentConnectionCount() {
+            return currentConnectionCount;
+        }
+
+        public void setMaxWeight(int maxWeight) {
+            this.maxWeight = maxWeight;
+        }
+
+        /**
+         * Recalcualate the weights based on the current connection count for 
this set of rounds.         
+         */
+        public void reCalcuateWeight() {
+            if (totalConnections > 0) {
+                double weightRatio = (double) fixedWeight / totalWeight;
+                double connectionRatio;
+                if (totalConnections != 0) {
+                    connectionRatio = (double) currentConnectionCount / 
totalConnections;
+                } else {
+                    connectionRatio = 0;
+                }
+
+                double diff = weightRatio - connectionRatio;
+                double multiple = diff * totalConnections;
+                double floor = Math.floor(multiple);
+
+                if (floor - multiple >= -0.5) {
+                    currentWeight = fixedWeight + (int) floor;
+                } else {
+                    currentWeight = fixedWeight + (int) Math.ceil(multiple);
+                }
+
+                if (diff < 0) {
+                    // we always return the max from minWeight and calculated 
Current weight
+                    currentWeight = minWeight > currentWeight ? minWeight : 
currentWeight;
+                } else {
+                    // we always return the min from maxWeight and calculated 
Current weight
+                    currentWeight = maxWeight < currentWeight ? maxWeight : 
currentWeight;
+                }
+                currentCalcWeight = currentWeight;
+            }
+        }
+
+        public void resetPerRound() {
+            currentCalcWeight = currentWeight;
+        }
+
+        public void reset() {
+            currentWeight = fixedWeight;
+            currentConnectionCount = 0;
+            currentCalcWeight = fixedWeight;
+        }
+    }
+}


Reply via email to