Updated Branches:
  refs/heads/master cc755b6ee -> 9ce7b8f5d

Implemented load balancing algorithm interface and its round robin 
implementation


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/159069f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/159069f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/159069f9

Branch: refs/heads/master
Commit: 159069f99ec37608accebc43a96ed14c0f73610c
Parents: ba99c95
Author: Imesh Gunaratne <[email protected]>
Authored: Sat Oct 19 20:17:15 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Sat Oct 19 20:17:15 2013 +0530

----------------------------------------------------------------------
 .../stratos/lb/endpoint/ClusterContext.java     |  63 ++++++++++++
 .../lb/endpoint/LoadBalancerContext.java        |  39 ++++++-
 .../stratos/lb/endpoint/RequestProcessor.java   | 101 +++++++++++++++++++
 .../stratos/lb/endpoint/ServiceContext.java     |  56 ++++++++++
 .../lb/endpoint/algorithm/AlgorithmContext.java |  51 ++++++++++
 .../algorithm/LoadBalanceAlgorithm.java         |  65 ++++++++++++
 .../algorithm/LoadBalanceAlgorithmFactory.java  |  71 +++++++++++++
 .../lb/endpoint/algorithm/RoundRobin.java       |  98 ++++++++++++++++++
 .../TenantAwareLoadBalanceEndpoint.java         |  78 ++++++--------
 9 files changed, 569 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java
new file mode 100644
index 0000000..4721dff
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint;
+
+import org.apache.stratos.lb.endpoint.algorithm.AlgorithmContext;
+
+import java.util.Properties;
+
+/**
+ * Defines cluster context properties.
+ *
+ * A cluster context is identified by the service name and cluster id given at
+ * construction time. It additionally carries the algorithm context associated
+ * with the cluster and a free-form set of properties, both of which are unset
+ * (null) until assigned through their setters.
+ */
+public class ClusterContext {
+    // Identity of the cluster; assigned once in the constructor and never changed.
+    private final String serviceName;
+    private final String clusterId;
+    // Optional state, null until the corresponding setter is called.
+    private AlgorithmContext algorithmContext;
+    private Properties properties;
+
+    /**
+     * Creates a context for the given cluster.
+     *
+     * @param serviceName name of the service the cluster belongs to
+     * @param clusterId   id of the cluster
+     */
+    public ClusterContext(String serviceName, String clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * @return the service name given at construction time
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * @return the cluster id given at construction time
+     */
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    /**
+     * @return the algorithm context of this cluster, or null if none has been set
+     */
+    public AlgorithmContext getAlgorithmContext() {
+        return algorithmContext;
+    }
+
+    /**
+     * @param algorithmContext the algorithm context to associate with this cluster
+     */
+    public void setAlgorithmContext(AlgorithmContext algorithmContext) {
+        this.algorithmContext = algorithmContext;
+    }
+
+    /**
+     * @return the properties of this cluster, or null if none have been set
+     */
+    public Properties getProperties() {
+        return properties;
+    }
+
+    /**
+     * @param properties the properties to associate with this cluster
+     */
+    public void setProperties(Properties properties) {
+        this.properties = properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java
index bedb98e..232d472 100644
--- 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java
@@ -29,10 +29,9 @@ import 
org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
 import org.wso2.carbon.registry.core.session.UserRegistry;
 import org.wso2.carbon.user.core.service.RealmService;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * Defines load balancer context information.
@@ -48,9 +47,16 @@ public class LoadBalancerContext {
     private UserRegistry configRegistry;
     private UserRegistry governanceRegistry;
     private DependencyManagementService dependencyManager;
-    private Map<Integer, SynapseEnvironmentService> synapseEnvironmentServices 
= new HashMap<Integer, SynapseEnvironmentService>();
+    private Map<Integer, SynapseEnvironmentService> synapseEnvironmentServices;
+
+    private Map<String, ServiceContext> serviceContextMap;
+    private Map<String, ClusterContext> clusterContextMap;
 
     private LoadBalancerContext() {
+        synapseEnvironmentServices = new HashMap<Integer, 
SynapseEnvironmentService>();
+
+        serviceContextMap = new HashMap<String, ServiceContext>();
+        clusterContextMap = new HashMap<String, ClusterContext>();
     }
 
     public static synchronized LoadBalancerContext getInstance() {
@@ -129,8 +135,7 @@ public class LoadBalancerContext {
         return synapseEnvironmentServices.get(id);
     }
 
-    public void addSynapseEnvironmentService(int id,
-                                             SynapseEnvironmentService 
synapseEnvironmentService) {
+    public void addSynapseEnvironmentService(int id, SynapseEnvironmentService 
synapseEnvironmentService) {
         synapseEnvironmentServices.put(id, synapseEnvironmentService);
     }
 
@@ -149,4 +154,28 @@ public class LoadBalancerContext {
     public void setConfigCtxt(ConfigurationContext configCtxt) {
         this.configCtxt = configCtxt;
     }
+
+    /**
+     * Returns the service contexts currently held in the service context map.
+     * The returned collection is the map's own values view, not a copy.
+     */
+    public Collection<ServiceContext> getServiceContexts() {
+        return serviceContextMap.values();
+    }
+
+    /**
+     * Looks up a service context by service name.
+     *
+     * @param serviceName key the context was stored under
+     * @return the matching context, or null when the map holds no entry for the name
+     */
+    public ServiceContext getServiceContext(String serviceName) {
+        return serviceContextMap.get(serviceName);
+    }
+
+    /**
+     * Stores the given service context keyed by its service name, replacing
+     * any context previously stored under the same name.
+     *
+     * @param serviceContext context to store
+     */
+    public void addServiceContext(ServiceContext serviceContext) {
+        serviceContextMap.put(serviceContext.getServiceName(), serviceContext);
+    }
+
+    /**
+     * Returns the cluster contexts currently held in the cluster context map.
+     * The returned collection is the map's own values view, not a copy.
+     */
+    public Collection<ClusterContext> getClusterContexts() {
+        return clusterContextMap.values();
+    }
+
+    /**
+     * Looks up a cluster context by cluster id.
+     *
+     * @param clusterId key the context was stored under
+     * @return the matching context, or null when the map holds no entry for the id
+     */
+    public ClusterContext getClusterContext(String clusterId) {
+        return clusterContextMap.get(clusterId);
+    }
+
+    /**
+     * Stores the given cluster context keyed by its cluster id, replacing any
+     * context previously stored under the same id.
+     *
+     * @param clusterContext context to store
+     */
+    public void addClusterContext(ClusterContext clusterContext) {
+        clusterContextMap.put(clusterContext.getClusterId(), clusterContext);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java
new file mode 100644
index 0000000..3b6633f
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.stratos.lb.endpoint.algorithm.AlgorithmContext;
+import org.apache.stratos.lb.endpoint.algorithm.LoadBalanceAlgorithm;
+import org.apache.stratos.lb.endpoint.topology.TopologyManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Port;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseException;
+
+import java.util.Collection;
+
+/**
+ * Implements core load balancing logic: resolves the cluster that serves a
+ * target host from the topology and asks the configured algorithm for the
+ * next member of that cluster.
+ */
+public class RequestProcessor {
+    private final LoadBalanceAlgorithm algorithm;
+
+    /**
+     * @param algorithm algorithm used to pick the next member of a cluster
+     */
+    public RequestProcessor(LoadBalanceAlgorithm algorithm) {
+        this.algorithm = algorithm;
+    }
+
+    /**
+     * Finds the next member to serve a request addressed to the given host.
+     *
+     * The cluster is located by comparing the target host against each
+     * cluster's host name. The cluster context and its algorithm context are
+     * created on first use and kept in the {@link LoadBalancerContext}.
+     *
+     * @param targetHost host name the request was addressed to, may be null
+     * @return the member chosen by the algorithm, or null when the target host
+     *         is null or no cluster has a matching host name
+     */
+    public Member findNextMember(String targetHost) {
+        // Nothing to look up; return before touching the topology lock so the
+        // finally block below never releases a lock that was not acquired.
+        if (targetHost == null) {
+            return null;
+        }
+
+        // Acquire outside the try block: if acquisition itself fails there is
+        // nothing to release.
+        TopologyManager.acquireReadLock();
+        try {
+            Cluster cluster = findCluster(targetHost);
+            if (cluster == null) {
+                return null;
+            }
+
+            // Find (or lazily create) the context of the cluster.
+            // NOTE(review): the cluster context map is a plain HashMap and this
+            // runs under a read lock, so concurrent requests may race on the
+            // lazy creation below - confirm whether that is acceptable.
+            LoadBalancerContext lbContext = LoadBalancerContext.getInstance();
+            ClusterContext clusterContext = lbContext.getClusterContext(cluster.getClusterId());
+            if (clusterContext == null) {
+                clusterContext = new ClusterContext(cluster.getServiceName(), cluster.getClusterId());
+                lbContext.addClusterContext(clusterContext);
+            }
+
+            // Find (or lazily create) the algorithm context of the cluster.
+            AlgorithmContext algorithmContext = clusterContext.getAlgorithmContext();
+            if (algorithmContext == null) {
+                algorithmContext = new AlgorithmContext(cluster.getServiceName(), cluster.getClusterId());
+                clusterContext.setAlgorithmContext(algorithmContext);
+            }
+            return algorithm.getNextMember(algorithmContext);
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Tenant-aware lookup; not implemented yet.
+     *
+     * @throws NotImplementedException always
+     */
+    public Member findNextMember(String serviceName, int tenantId, String targetHost) {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Returns the service with the given name from the topology, or null when
+     * no service name matches.
+     */
+    private Service findService(String serviceName) {
+        Collection<Service> services = TopologyManager.getTopology().getServices();
+        for (Service service : services) {
+            if (service.getServiceName().equals(serviceName)) {
+                return service;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the first cluster, across all services, whose host name equals
+     * the given target host, or null when there is none.
+     */
+    private Cluster findCluster(String targetHost) {
+        Collection<Service> services = TopologyManager.getTopology().getServices();
+        for (Service service : services) {
+            for (Cluster cluster : service.getClusters()) {
+                if (targetHost.equals(cluster.getHostName())) {
+                    return cluster;
+                }
+            }
+        }
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java
new file mode 100644
index 0000000..7c4f8f8
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines service context properties.
+ *
+ * A service context carries a service name and the cluster contexts that have
+ * been added to it, keyed by cluster id.
+ */
+public class ServiceContext {
+    // Null until setServiceName is called.
+    private String serviceName;
+    // Cluster id -> cluster context; the map itself is never replaced.
+    private final Map<String, ClusterContext> clusterContextMap;
+
+    /**
+     * Creates a service context with no name and no cluster contexts.
+     */
+    public ServiceContext() {
+        clusterContextMap = new HashMap<String, ClusterContext>();
+    }
+
+    /**
+     * @return the service name, or null if it has not been set
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * @param serviceName the service name to assign
+     */
+    public void setServiceName(String serviceName) {
+        this.serviceName = serviceName;
+    }
+
+    /**
+     * @return the cluster contexts added so far; this is the backing map's
+     *         values view, not a copy
+     */
+    public Collection<ClusterContext> getClusterContexts() {
+        return clusterContextMap.values();
+    }
+
+    /**
+     * @param clusterId id of the cluster to look up
+     * @return the matching cluster context, or null when none was added under that id
+     */
+    public ClusterContext getClusterContext(String clusterId) {
+        return clusterContextMap.get(clusterId);
+    }
+
+    /**
+     * Adds the given cluster context keyed by its cluster id, replacing any
+     * context previously added under the same id.
+     *
+     * @param clusterContext cluster context to add
+     */
+    public void addClusterContext(ClusterContext clusterContext) {
+        clusterContextMap.put(clusterContext.getClusterId(), clusterContext);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java
new file mode 100755
index 0000000..49430dc
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.algorithm;
+
+/**
+ * Algorithm context is used for identifying the cluster and its current
+ * member for executing load balancing algorithms.
+ *
+ * The service name and cluster id are fixed at construction time; only the
+ * current member index changes afterwards. This class performs no
+ * synchronization of its own.
+ */
+public class AlgorithmContext {
+    private final String serviceName;
+    private final String clusterId;
+    // Position within the cluster's member list; starts at 0.
+    private int currentMemberIndex;
+
+    /**
+     * Creates a context for the given cluster with the member index at 0.
+     *
+     * @param serviceName name of the service the cluster belongs to
+     * @param clusterId   id of the cluster
+     */
+    public AlgorithmContext(String serviceName, String clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.currentMemberIndex = 0;
+    }
+
+    /**
+     * @return the service name given at construction time
+     */
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * @return the cluster id given at construction time
+     */
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    /**
+     * @return the stored member index
+     */
+    public int getCurrentMemberIndex() {
+        return currentMemberIndex;
+    }
+
+    /**
+     * @param currentMemberIndex the member index to store; not range-checked here
+     */
+    public void setCurrentMemberIndex(int currentMemberIndex) {
+        this.currentMemberIndex = currentMemberIndex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java
new file mode 100755
index 0000000..1da2ba7
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.algorithm;
+
+import org.apache.stratos.messaging.domain.topology.Member;
+
+import java.util.List;
+
+/**
+ * Contract that every load balancing algorithm implementation must fulfil.
+ */
+public interface LoadBalanceAlgorithm {
+
+    /**
+     * @return the name of the algorithm
+     */
+    String getName();
+
+    /**
+     * Applies the algorithm and picks the next member.
+     *
+     * @param algorithmContext context the algorithm is applied against
+     * @return the next member as chosen by the algorithm
+     */
+    Member getNextMember(AlgorithmContext algorithmContext);
+
+    /**
+     * Supplies the member list of a given cluster.
+     *
+     * @param members members of the cluster
+     */
+    void setMembers(List<Member> members);
+
+    /**
+     * Resets the algorithm so that it starts from the beginning.
+     *
+     * @param algorithmContext context to reset
+     */
+    void reset(AlgorithmContext algorithmContext);
+
+    /**
+     * @return a clone of this algorithm object
+     */
+    LoadBalanceAlgorithm clone();
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java
new file mode 100644
index 0000000..5b2511b
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.algorithm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.lang.reflect.InvocationTargetException;
+
+/**
+ * Load balance algorithm factory to create the algorithm
+ */
+public class LoadBalanceAlgorithmFactory {
+    private static final Log log = LogFactory.getLog(LoadBalanceAlgorithmFactory.class);
+
+    /**
+     * Instantiates the load balance algorithm through its public no-argument
+     * constructor. The class name is currently hard-coded.
+     *
+     * @return a new algorithm instance, or null when the class could not be
+     *         loaded or instantiated (the failure is logged)
+     * @throws RuntimeException if the instantiated class does not implement
+     *                          {@link LoadBalanceAlgorithm}
+     */
+    public static LoadBalanceAlgorithm createAlgorithm() {
+        // TODO: Read from load balance configuration file
+        String className = "org.apache.stratos.lb.endpoint.algorithm.RoundRobin";
+        try {
+            Class<?> algorithmClass = Class.forName(className);
+            Object instance = algorithmClass.getConstructor().newInstance();
+            if (instance instanceof LoadBalanceAlgorithm) {
+                return (LoadBalanceAlgorithm) instance;
+            }
+            // The format string has one %s; the class name must be supplied or
+            // String.format itself throws MissingFormatArgumentException.
+            throw new RuntimeException(String.format(
+                    "Class %s is not a valid load balance algorithm implementation", className));
+        } catch (ClassNotFoundException e) {
+            logCreationFailure(className, e);
+        } catch (NoSuchMethodException e) {
+            logCreationFailure(className, e);
+        } catch (InstantiationException e) {
+            logCreationFailure(className, e);
+        } catch (IllegalAccessException e) {
+            logCreationFailure(className, e);
+        } catch (InvocationTargetException e) {
+            logCreationFailure(className, e);
+        }
+        return null;
+    }
+
+    /**
+     * Logs an instantiation failure together with its cause so that the stack
+     * trace is preserved in the log.
+     */
+    private static void logCreationFailure(String className, Exception cause) {
+        if (log.isErrorEnabled()) {
+            log.error(String.format("Could not create load balance algorithm %s", className), cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java
new file mode 100644
index 0000000..d38d901
--- /dev/null
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.algorithm;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Member;
+
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This is the implementation of the round robin load balancing algorithm. It
+ * simply iterates through the member list one by one until an active member
+ * is found.
+ *
+ * Access to the member list and to the algorithm context's member index is
+ * guarded by a single lock owned by this instance.
+ */
+public class RoundRobin implements LoadBalanceAlgorithm {
+    private static final Log log = LogFactory.getLog(RoundRobin.class);
+
+    // Null until setMembers is called.
+    private List<Member> members;
+    private final Lock lock = new ReentrantLock();
+
+    @Override
+    public String getName() {
+        return "Round Robin";
+    }
+
+    /**
+     * Picks the next member, starting at the index stored in the context and
+     * probing each member at most once.
+     *
+     * If no member reports itself active, the last member probed is returned
+     * anyway.
+     * NOTE(review): presumably a best-effort fallback - confirm whether null
+     * would be the better answer when every member is inactive.
+     *
+     * @param algorithmContext context holding the current member index; the
+     *                         index is advanced past the last member probed
+     * @return the selected member, or null when no members have been set or
+     *         the member list is empty
+     */
+    @Override
+    public Member getNextMember(AlgorithmContext algorithmContext) {
+        lock.lock();
+        try {
+            // Checked under the lock so the list cannot be swapped between the
+            // emptiness test and the iteration below.
+            if (members == null || members.isEmpty()) {
+                return null;
+            }
+
+            int memberCount = members.size();
+            int nextIndex = algorithmContext.getCurrentMemberIndex();
+            // The stored index may be stale (list shrank) or invalid; restart.
+            if (nextIndex < 0 || nextIndex >= memberCount) {
+                nextIndex = 0;
+            }
+
+            Member current;
+            int remaining = memberCount;
+            do {
+                current = members.get(nextIndex);
+                nextIndex = (nextIndex + 1) % memberCount;
+                remaining--;
+            } while (!current.isActive() && remaining > 0);
+
+            algorithmContext.setCurrentMemberIndex(nextIndex);
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Service name: %s cluster id: %s members: %d",
+                        algorithmContext.getServiceName(), algorithmContext.getClusterId(), memberCount));
+                log.debug(String.format("Current member: %s", current.getMemberId()));
+            }
+            return current;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public void setMembers(List<Member> members) {
+        lock.lock();
+        try {
+            this.members = members;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Moves the context's member index back to 0, under the same lock that
+     * getNextMember uses to read and update it.
+     */
+    @Override
+    public void reset(AlgorithmContext algorithmContext) {
+        if (log.isDebugEnabled()) {
+            log.debug("Resetting the Round Robin load balancing algorithm ...");
+        }
+        lock.lock();
+        try {
+            algorithmContext.setCurrentMemberIndex(0);
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * @return a new RoundRobin instance; the member list is not carried over
+     */
+    @Override
+    public LoadBalanceAlgorithm clone() {
+        return new RoundRobin();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
index a805f3c..838b225 100644
--- 
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ 
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -21,6 +21,8 @@ package org.apache.stratos.lb.endpoint.endpoint;
 
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.http.protocol.HTTP;
+import org.apache.stratos.lb.endpoint.RequestProcessor;
+import org.apache.stratos.lb.endpoint.algorithm.LoadBalanceAlgorithmFactory;
 import org.apache.stratos.lb.endpoint.topology.TopologyManager;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -47,12 +49,16 @@ import java.util.*;
 
 public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints.LoadbalanceEndpoint implements Serializable {
     private static final String PORT_MAPPING_PREFIX = "port.mapping.";
+
+    private RequestProcessor requestProcessor;
     private HttpSessionDispatcher dispatcher;
+    private boolean sessionAffinityEnabled;
 
     @Override
     public void init(SynapseEnvironment synapseEnvironment) {
         super.init(synapseEnvironment);
 
+        requestProcessor = new 
RequestProcessor(LoadBalanceAlgorithmFactory.createAlgorithm());
         setDispatcher(new HttpSessionDispatcher());
     }
 
@@ -94,10 +100,12 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
             sessionInformation.updateExpiryTime();
             sendToApplicationMember(synCtx, currentMember, faultHandler, 
false);
         } else {
-            // Send request to a new member
-            org.apache.axis2.clustering.Member member = findNextMember(synCtx);
-            if(member != null) {
-                sendToApplicationMember(synCtx, member, faultHandler, true);
+            // No existing session found
+            // Find next member
+            org.apache.axis2.clustering.Member axis2Member = 
findNextMember(synCtx);
+            if(axis2Member != null) {
+                // Send request to member
+                sendToApplicationMember(synCtx, axis2Member, faultHandler, 
true);
             }
             else {
                 throw new SynapseException(String.format("No application 
members available to serve the request %s", synCtx.getTo().getAddress()));
@@ -105,49 +113,6 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         }
     }
 
-    private org.apache.axis2.clustering.Member findNextMember(MessageContext 
synCtx) {
-        try {
-            // Identify cluster by target host
-            TopologyManager.acquireReadLock();
-            String targetHost = extractTargetHost(synCtx);
-            Cluster targetCluster = findNextCluster(targetHost);
-            if(targetCluster != null) {
-                // TODO: Apply algorithm and find next member from the 
selected cluster
-                if(targetCluster.getMembers().size() > 0) {
-                    Member member = 
targetCluster.getMembers().iterator().next();
-
-                    // Create Axi2 member object
-                    String transport = extractTransport(synCtx);
-                    Port transportPort = member.getPort(transport);
-                    if(transportPort == null)
-                        throw new SynapseException(String.format("Port not 
found for transport %s in member %s", transport, member.getMemberId()));
-
-                    int memberPort = transportPort.getValue();
-                    org.apache.axis2.clustering.Member axis2Member = new 
org.apache.axis2.clustering.Member(member.getHostName(), memberPort);
-                    axis2Member.setDomain(member.getHostName());
-                    axis2Member.setActive(true);
-                    return axis2Member;
-                }
-            }
-            return null;
-        }
-        finally {
-            TopologyManager.releaseReadLock();
-        }
-    }
-
-    private Cluster findNextCluster(String targetHost) {
-        Cluster targetCluster = null;
-        Collection<Service> services = 
TopologyManager.getTopology().getServices();
-        for (Service service : services) {
-            for (Cluster cluster : service.getClusters()) {
-                if (targetHost.equals(cluster.getHostName())) {
-                    return cluster;
-                }
-            }
-        }
-        return null;
-    }
 
     /**
      * Adding the X-Forwarded-For/X-Originating-IP headers to the outgoing 
message.
@@ -184,6 +149,23 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         }
     }
 
+    /**
+     * Asks the request processor for the next member serving the request's
+     * target host and converts it into an Axis2 member.
+     *
+     * @param synCtx message context of the incoming request
+     * @return the Axis2 member to send the request to, or null when the
+     *         request processor found no member for the target host
+     * @throws RuntimeException if the member has no port for the request's transport
+     */
+    private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) {
+        String targetHost = extractTargetHost(synCtx);
+        Member member = requestProcessor.findNextMember(targetHost);
+        if (member == null) {
+            // The request processor returns null when nothing matches; pass
+            // that on so the caller's null check can report the failure
+            // instead of failing here with a NullPointerException.
+            return null;
+        }
+
+        // Create Axis2 member object
+        String transport = extractTransport(synCtx);
+        Port transportPort = member.getPort(transport);
+        if (transportPort == null) {
+            throw new RuntimeException(String.format("Port not found for transport %s in member %s", transport, member.getMemberId()));
+        }
+
+        int memberPort = transportPort.getValue();
+        org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(member.getHostName(), memberPort);
+        axis2Member.setDomain(member.getHostName());
+        axis2Member.setActive(true);
+        return axis2Member;
+    }
+
     private String extractTargetHost(MessageContext synCtx) {
         org.apache.axis2.context.MessageContext msgCtx =
                 ((Axis2MessageContext) synCtx).getAxis2MessageContext();
@@ -300,7 +282,7 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
     }
 
     public boolean isSessionAffinityBasedLB() {
-        return false;
+        return sessionAffinityEnabled;
     }
 
     /*

Reply via email to