Updated Branches: refs/heads/master cc755b6ee -> 9ce7b8f5d
Implemented load balancing algorithm interface and its round robin implementation Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/159069f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/159069f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/159069f9 Branch: refs/heads/master Commit: 159069f99ec37608accebc43a96ed14c0f73610c Parents: ba99c95 Author: Imesh Gunaratne <[email protected]> Authored: Sat Oct 19 20:17:15 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sat Oct 19 20:17:15 2013 +0530 ---------------------------------------------------------------------- .../stratos/lb/endpoint/ClusterContext.java | 63 ++++++++++++ .../lb/endpoint/LoadBalancerContext.java | 39 ++++++- .../stratos/lb/endpoint/RequestProcessor.java | 101 +++++++++++++++++++ .../stratos/lb/endpoint/ServiceContext.java | 56 ++++++++++ .../lb/endpoint/algorithm/AlgorithmContext.java | 51 ++++++++++ .../algorithm/LoadBalanceAlgorithm.java | 65 ++++++++++++ .../algorithm/LoadBalanceAlgorithmFactory.java | 71 +++++++++++++ .../lb/endpoint/algorithm/RoundRobin.java | 98 ++++++++++++++++++ .../TenantAwareLoadBalanceEndpoint.java | 78 ++++++-------- 9 files changed, 569 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java new file mode 100644 index 0000000..4721dff --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ClusterContext.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint; + +import org.apache.stratos.lb.endpoint.algorithm.AlgorithmContext; + +import java.util.Properties; + +/** + * Defines cluster context properties. + */ +public class ClusterContext { + private String serviceName; + private String clusterId; + private AlgorithmContext algorithmContext; + private Properties properties; + + public ClusterContext(String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + } + + public String getServiceName() { + return serviceName; + } + + public String getClusterId() { + return clusterId; + } + + public AlgorithmContext getAlgorithmContext() { + return algorithmContext; + } + + public void setAlgorithmContext(AlgorithmContext algorithmContext) { + this.algorithmContext = algorithmContext; + } + + public Properties getProperties() { + return properties; + } + + public void setProperties(Properties properties) { + this.properties = properties; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java index bedb98e..232d472 100644 --- a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/LoadBalancerContext.java @@ -29,10 +29,9 @@ import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService; import org.wso2.carbon.registry.core.session.UserRegistry; import org.wso2.carbon.user.core.service.RealmService; +import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; /** * Defines load balancer context information. @@ -48,9 +47,16 @@ public class LoadBalancerContext { private UserRegistry configRegistry; private UserRegistry governanceRegistry; private DependencyManagementService dependencyManager; - private Map<Integer, SynapseEnvironmentService> synapseEnvironmentServices = new HashMap<Integer, SynapseEnvironmentService>(); + private Map<Integer, SynapseEnvironmentService> synapseEnvironmentServices; + + private Map<String, ServiceContext> serviceContextMap; + private Map<String, ClusterContext> clusterContextMap; private LoadBalancerContext() { + synapseEnvironmentServices = new HashMap<Integer, SynapseEnvironmentService>(); + + serviceContextMap = new HashMap<String, ServiceContext>(); + clusterContextMap = new HashMap<String, ClusterContext>(); } public static synchronized LoadBalancerContext getInstance() { @@ -129,8 +135,7 @@ public class LoadBalancerContext { return synapseEnvironmentServices.get(id); } - public void addSynapseEnvironmentService(int id, - SynapseEnvironmentService synapseEnvironmentService) { + public void addSynapseEnvironmentService(int id, SynapseEnvironmentService synapseEnvironmentService) { synapseEnvironmentServices.put(id, synapseEnvironmentService); } @@ -149,4 +154,28 @@ public class LoadBalancerContext { public void setConfigCtxt(ConfigurationContext configCtxt) { this.configCtxt = configCtxt; } + + public Collection<ServiceContext> getServiceContexts() { + return serviceContextMap.values(); + } + + public ServiceContext getServiceContext(String serviceName) { + return serviceContextMap.get(serviceName); + } + + public void addServiceContext(ServiceContext serviceContext) { + serviceContextMap.put(serviceContext.getServiceName(), serviceContext); + } + + public Collection<ClusterContext> getClusterContexts() { + return clusterContextMap.values(); + } + + public ClusterContext getClusterContext(String clusterId) { + return clusterContextMap.get(clusterId); + } + + public void addClusterContext(ClusterContext clusterContext) { + clusterContextMap.put(clusterContext.getClusterId(), clusterContext); + } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java new file mode 100644 index 0000000..3b6633f --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/RequestProcessor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.stratos.lb.endpoint.algorithm.AlgorithmContext; +import org.apache.stratos.lb.endpoint.algorithm.LoadBalanceAlgorithm; +import org.apache.stratos.lb.endpoint.topology.TopologyManager; +import org.apache.stratos.messaging.domain.topology.Cluster; +import org.apache.stratos.messaging.domain.topology.Member; +import org.apache.stratos.messaging.domain.topology.Port; +import org.apache.stratos.messaging.domain.topology.Service; +import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseException; + +import java.util.Collection; + +/** + * Implements core load balancing logic. + */ +public class RequestProcessor { + private LoadBalanceAlgorithm algorithm; + + public RequestProcessor(LoadBalanceAlgorithm algorithm) { + this.algorithm = algorithm; + } + + public Member findNextMember(String targetHost) { + + try { + if(targetHost == null) + return null; + + TopologyManager.acquireReadLock(); + + Cluster cluster = findCluster(targetHost); + if(cluster != null) { + // Find algorithm context of the cluster + ClusterContext clusterContext = LoadBalancerContext.getInstance().getClusterContext(cluster.getClusterId()); + if(clusterContext == null) { + clusterContext = new ClusterContext(cluster.getServiceName(), cluster.getClusterId()); + LoadBalancerContext.getInstance().addClusterContext(clusterContext); + } + + AlgorithmContext algorithmContext = clusterContext.getAlgorithmContext(); + if(algorithmContext == null) { + algorithmContext = new AlgorithmContext(cluster.getServiceName(), cluster.getClusterId()); + clusterContext.setAlgorithmContext(algorithmContext); + } + return algorithm.getNextMember(algorithmContext); + } + return null; + } + finally { + TopologyManager.releaseReadLock(); + } + } + + public Member findNextMember(String serviceName, int tenantId, String targetHost) { + throw new NotImplementedException(); + } + + private Service findService(String serviceName) { + Collection<Service> services = TopologyManager.getTopology().getServices(); + for (Service service : services) { + if(service.getServiceName().equals(serviceName)) + return service; + } + return null; + } + + private Cluster findCluster(String targetHost) { + Collection<Service> services = TopologyManager.getTopology().getServices(); + for (Service service : services) { + for (Cluster cluster : service.getClusters()) { + if (targetHost.equals(cluster.getHostName())) { + return cluster; + } + } + } + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java new file mode 100644 index 0000000..7c4f8f8 --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/ServiceContext.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +/** + * Defines service context properties. + */ +public class ServiceContext { + private String serviceName; + private Map<String, ClusterContext> clusterContextMap; + + public ServiceContext() { + clusterContextMap = new HashMap<String, ClusterContext>(); + } + + public String getServiceName() { + return serviceName; + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + } + + public Collection<ClusterContext> getClusterContexts() { + return clusterContextMap.values(); + } + + public ClusterContext getClusterContext(String clusterId) { + return clusterContextMap.get(clusterId); + } + + public void addClusterContext(ClusterContext clusterContext) { + clusterContextMap.put(clusterContext.getClusterId(), clusterContext); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java new file mode 100755 index 0000000..49430dc --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/AlgorithmContext.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint.algorithm; + +/** + * Algorithm context is used for identifying the cluster and its current member for executing load balancing algorithms. + */ +public class AlgorithmContext { + private String serviceName; + private String clusterId; + private int currentMemberIndex; + + public AlgorithmContext(String serviceName, String clusterId) { + this.serviceName = serviceName; + this.clusterId = clusterId; + this.currentMemberIndex = 0; + } + + public String getServiceName() { + return serviceName; + } + + public String getClusterId() { + return clusterId; + } + + public int getCurrentMemberIndex() { + return currentMemberIndex; + } + + public void setCurrentMemberIndex(int currentMemberIndex) { + this.currentMemberIndex = currentMemberIndex; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java new file mode 100755 index 0000000..1da2ba7 --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithm.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint.algorithm; + +import org.apache.stratos.messaging.domain.topology.Member; + +import java.util.List; + +/** + * Defines the specification for implementing load balancing algorithms. + */ +public interface LoadBalanceAlgorithm { + /** + * Return algorithm name. + * + * @return + */ + public String getName(); + + /** + * Apply the algorithm and return the next member. + * + * @param algorithmContext + * @return + */ + public Member getNextMember(AlgorithmContext algorithmContext); + + /** + * Set member list of a given cluster. + * + * @param members + */ + public void setMembers(List<Member> members); + + /** + * Reset the algorithm and start from the beginning. + * + * @param algorithmContext + */ + public void reset(AlgorithmContext algorithmContext); + + /** + * Clone algorithm object. + * + * @return + */ + public LoadBalanceAlgorithm clone(); +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java new file mode 100644 index 0000000..5b2511b --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/LoadBalanceAlgorithmFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint.algorithm; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.lang.reflect.InvocationTargetException; + +/** + * Load balance algorithm factory to create the algorithm + */ +public class LoadBalanceAlgorithmFactory { + private static final Log log = LogFactory.getLog(LoadBalanceAlgorithmFactory.class); + + + public static LoadBalanceAlgorithm createAlgorithm() { + // TODO: Read from load balance configuration file + String className = "org.apache.stratos.lb.endpoint.algorithm.RoundRobin"; + try { + Class algorithmClass = Class.forName(className); + try { + Object instance = algorithmClass.getConstructor().newInstance(); + if (instance instanceof LoadBalanceAlgorithm) { + return (LoadBalanceAlgorithm) instance; + } else { + throw new RuntimeException(String.format("Class %s is not a valid load balance algorithm implementation")); + } + } catch (NoSuchMethodException e) { + if (log.isErrorEnabled()) { + log.error(e); + } + } catch (InstantiationException e) { + if (log.isErrorEnabled()) { + log.error(e); + } + } catch (IllegalAccessException e) { + if (log.isErrorEnabled()) { + log.error(e); + } + } catch (InvocationTargetException e) { + if (log.isErrorEnabled()) { + log.error(e); + } + } + + } catch (ClassNotFoundException e) { + if (log.isErrorEnabled()) { + log.error(e); + } + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java new file mode 100644 index 0000000..d38d901 --- /dev/null +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/algorithm/RoundRobin.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.lb.endpoint.algorithm; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.messaging.domain.topology.Member; + +import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This is the implementation of the round robin load balancing algorithm. It simply iterates + * through the endpoint list one by one for until an active endpoint is found. + */ +public class RoundRobin implements LoadBalanceAlgorithm { + private static final Log log = LogFactory.getLog(RoundRobin.class); + + private List<Member> members; + private final Lock lock = new ReentrantLock(); + + @Override + public String getName() { + return "Round Robin"; + } + + @Override + public Member getNextMember(AlgorithmContext algorithmContext) { + if (members.size() == 0) { + return null; + } + Member current = null; + lock.lock(); + try { + int currentMemberIndex = algorithmContext.getCurrentMemberIndex(); + if (currentMemberIndex >= members.size()) { + currentMemberIndex = 0; + } + int index = members.size(); + do { + current = members.get(currentMemberIndex); + if (currentMemberIndex == members.size() - 1) { + currentMemberIndex = 0; + } else { + currentMemberIndex++; + } + index--; + } while ((!current.isActive()) && index > 0); + algorithmContext.setCurrentMemberIndex(currentMemberIndex); + if (log.isDebugEnabled()) { + log.debug(String.format("Service name: %s cluster id: %s members: %d", algorithmContext.getServiceName(), algorithmContext.getClusterId(), members.size())); + log.debug(String.format("Current member: %s", current.getMemberId())); + } + + } finally { + lock.unlock(); + } + return current; + } + + @Override + public void setMembers(List<Member> members) { + this.members = members; + } + + @Override + public void reset(AlgorithmContext algorithmContext) { + if (log.isDebugEnabled()) { + log.debug("Resetting the Round Robin load balancing algorithm ..."); + } + synchronized (algorithmContext) { + algorithmContext.setCurrentMemberIndex(0); + } + } + + @Override + public LoadBalanceAlgorithm clone() { + return new RoundRobin(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/159069f9/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java index a805f3c..838b225 100644 --- a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -21,6 +21,8 @@ package org.apache.stratos.lb.endpoint.endpoint; import org.apache.axis2.addressing.EndpointReference; import org.apache.http.protocol.HTTP; +import org.apache.stratos.lb.endpoint.RequestProcessor; +import org.apache.stratos.lb.endpoint.algorithm.LoadBalanceAlgorithmFactory; import org.apache.stratos.lb.endpoint.topology.TopologyManager; import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; @@ -47,12 +49,16 @@ import java.util.*; public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints.LoadbalanceEndpoint implements Serializable { private static final String PORT_MAPPING_PREFIX = "port.mapping."; + + private RequestProcessor requestProcessor; private HttpSessionDispatcher dispatcher; + private boolean sessionAffinityEnabled; @Override public void init(SynapseEnvironment synapseEnvironment) { super.init(synapseEnvironment); + requestProcessor = new RequestProcessor(LoadBalanceAlgorithmFactory.createAlgorithm()); setDispatcher(new HttpSessionDispatcher()); } @@ -94,10 +100,12 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints sessionInformation.updateExpiryTime(); sendToApplicationMember(synCtx, currentMember, faultHandler, false); } else { - // Send request to a new member - org.apache.axis2.clustering.Member member = findNextMember(synCtx); - if(member != null) { - sendToApplicationMember(synCtx, member, faultHandler, true); + // No existing session found + // Find next member + org.apache.axis2.clustering.Member axis2Member = findNextMember(synCtx); + if(axis2Member != null) { + // Send request to member + sendToApplicationMember(synCtx, axis2Member, faultHandler, true); } else { throw new SynapseException(String.format("No application members available to serve the request %s", synCtx.getTo().getAddress())); @@ -105,49 +113,6 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } } - private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) { - try { - // Identify cluster by target host - TopologyManager.acquireReadLock(); - String targetHost = extractTargetHost(synCtx); - Cluster targetCluster = findNextCluster(targetHost); - if(targetCluster != null) { - // TODO: Apply algorithm and find next member from the selected cluster - if(targetCluster.getMembers().size() > 0) { - Member member = targetCluster.getMembers().iterator().next(); - - // Create Axi2 member object - String transport = extractTransport(synCtx); - Port transportPort = member.getPort(transport); - if(transportPort == null) - throw new SynapseException(String.format("Port not found for transport %s in member %s", transport, member.getMemberId())); - - int memberPort = transportPort.getValue(); - org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(member.getHostName(), memberPort); - axis2Member.setDomain(member.getHostName()); - axis2Member.setActive(true); - return axis2Member; - } - } - return null; - } - finally { - TopologyManager.releaseReadLock(); - } - } - - private Cluster findNextCluster(String targetHost) { - Cluster targetCluster = null; - Collection<Service> services = TopologyManager.getTopology().getServices(); - for (Service service : services) { - for (Cluster cluster : service.getClusters()) { - if (targetHost.equals(cluster.getHostName())) { - return cluster; - } - } - } - return null; - } /** * Adding the X-Forwarded-For/X-Originating-IP headers to the outgoing message. @@ -184,6 +149,23 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } } + private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) { + String targetHost = extractTargetHost(synCtx); + Member member = requestProcessor.findNextMember(targetHost); + + // Create Axi2 member object + String transport = extractTransport(synCtx); + Port transportPort = member.getPort(transport); + if(transportPort == null) + throw new RuntimeException(String.format("Port not found for transport %s in member %s", transport, member.getMemberId())); + + int memberPort = transportPort.getValue(); + org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(member.getHostName(), memberPort); + axis2Member.setDomain(member.getHostName()); + axis2Member.setActive(true); + return axis2Member; + } + private String extractTargetHost(MessageContext synCtx) { org.apache.axis2.context.MessageContext msgCtx = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); @@ -300,7 +282,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } public boolean isSessionAffinityBasedLB() { - return false; + return sessionAffinityEnabled; } /*
