Updated Branches: refs/heads/master d05eda2f7 -> 0a24ca1b5
Updated load balancer endpoint and added support to identify tenant from request URL Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/0a24ca1b Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/0a24ca1b Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/0a24ca1b Branch: refs/heads/master Commit: 0a24ca1b50dab603bcf8d386a8af888e8c40acfb Parents: d05eda2 Author: Imesh Gunaratne <[email protected]> Authored: Sun Dec 8 00:45:42 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Dec 8 00:45:42 2013 +0530 ---------------------------------------------------------------------- .../stratos/load/balancer/RequestDelegator.java | 4 +- .../TenantAwareLoadBalanceEndpoint.java | 101 ++++++++++++++++++- .../message/receiver/tenant/TenantManager.java | 4 + 3 files changed, 102 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/0a24ca1b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java index 96cb3eb..9b0a8a5 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java @@ -55,8 +55,8 @@ public class RequestDelegator { if (cluster != null) { Member member = findNextMemberInCluster(cluster); if (member != null) { - long endTime = System.currentTimeMillis(); if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [member] %s", (endTime - startTime), member.getServiceName(), member.getClusterId(), member.getMemberId())); } } @@ -78,8 +78,8 @@ public class RequestDelegator { if (cluster != null) { Member member = findNextMemberInCluster(cluster); if (member != null) { - long endTime = System.currentTimeMillis(); if (log.isDebugEnabled()) { + long endTime = System.currentTimeMillis(); log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [tenant-id] %d [member] %s", (endTime - startTime), member.getServiceName(), member.getClusterId(), tenantId, member.getMemberId())); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/0a24ca1b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 95ffb1f..c29359f 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -24,10 +24,14 @@ import org.apache.axis2.description.TransportInDescription; import org.apache.http.protocol.HTTP; import org.apache.stratos.load.balancer.RequestDelegator; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; +import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; +import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier; import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector; import org.apache.stratos.load.balancer.util.Constants; +import org.apache.stratos.messaging.domain.tenant.Tenant; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Port; +import org.apache.stratos.messaging.message.receiver.tenant.TenantManager; import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; @@ -45,6 +49,8 @@ import java.io.Serializable; import java.net.MalformedURLException; import java.net.URL; import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints.LoadbalanceEndpoint implements Serializable { @@ -186,10 +192,29 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) { String targetHost = extractTargetHost(synCtx); - if(!requestDelegator.isTargetHostValid(targetHost)) { + if (!requestDelegator.isTargetHostValid(targetHost)) { throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost)); } - Member member = requestDelegator.findNextMemberFromHostName(targetHost); + + Member member = null; + if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) { + // Try to find next member from multi-tenant cluster map + if (log.isDebugEnabled()) { + log.debug("Multi-tenancy enabled, scanning URL for tenant..."); + } + String url = extractUrl(synCtx); + int tenantId = scanUrlForTenantId(url); + if (tenantExists(tenantId)) { + member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId); + } else { + // Multi-tenant cluster not found, try single tenant + member = requestDelegator.findNextMemberFromHostName(targetHost); + } + } else { + // Find next member from single tenant cluster map + member = requestDelegator.findNextMemberFromHostName(targetHost); + } + if (member == null) return null; @@ -197,7 +222,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints String transport = extractTransport(synCtx); Port transportPort = member.getPort(transport); if (transportPort == null) { - if(log.isErrorEnabled()) { + if (log.isErrorEnabled()) { log.error(String.format("Port not found for transport %s in member %s", transport, member.getMemberId())); } throwSynapseException(synCtx, 500, "Internal server error"); @@ -216,6 +241,72 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints return axis2Member; } + private String extractUrl(MessageContext synCtx) { + Axis2MessageContext axis2smc = (Axis2MessageContext) synCtx; + org.apache.axis2.context.MessageContext axis2MessageCtx = axis2smc.getAxis2MessageContext(); + return (String) axis2MessageCtx.getProperty(org.apache.axis2.context.MessageContext.REMOTE_ADDR); + } + + private int scanUrlForTenantId(String url) { + int tenantId = -1; + String regex = LoadBalancerConfiguration.getInstance().getTenantIdentifierRegex(); + if(log.isDebugEnabled()) { + log.debug(String.format("Request URL: %s ", url)); + log.debug(String.format("Tenant identifier regex: %s ", regex)); + } + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(url); + if (matcher.find()) { + if (LoadBalancerConfiguration.getInstance().getTenantIdentifier() == TenantIdentifier.TenantId) { + if(log.isDebugEnabled()) { + log.debug("Identifying tenant using tenant id..."); + } + tenantId = Integer.parseInt(matcher.group(1)); + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant identifier found: [tenant-id] %d", tenantId)); + } + } + else if (LoadBalancerConfiguration.getInstance().getTenantIdentifier() == TenantIdentifier.TenantDomain) { + if(log.isDebugEnabled()) { + log.debug("Identifying tenant using tenant domain..."); + } + String tenantDomain = matcher.group(1); + tenantId = findTenantIdFromTenantDomain(tenantDomain); + if (log.isDebugEnabled()) { + log.debug(String.format("Tenant identifier found: [tenant-domain] %s [tenant-id] %d", tenantDomain, tenantId)); + } + } + } + else { + if(log.isDebugEnabled()) { + log.debug("Tenant identifier not found in URL"); + } + } + return tenantId; + } + + private boolean tenantExists(int tenantId) { + try { + TenantManager.acquireReadLock(); + return TenantManager.getInstance().tenantExists(tenantId); + } finally { + TenantManager.releaseReadLock(); + } + } + + private int findTenantIdFromTenantDomain(String tenantDomain) { + try { + TenantManager.acquireReadLock(); + Tenant tenant = TenantManager.getInstance().getTenant(tenantDomain); + if(tenant != null) { + return tenant.getTenantId(); + } + return -1; + } finally { + TenantManager.releaseReadLock(); + } + } + private String extractTargetHost(MessageContext synCtx) { org.apache.axis2.context.MessageContext msgCtx = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); @@ -503,12 +594,12 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints currentMember = findNextMember(synCtx); if (currentMember == null) { String msg = String.format("No application members available to serve the request %s", synCtx.getTo().getAddress()); - if(log.isErrorEnabled()) { + if (log.isErrorEnabled()) { log.error(msg); } throwSynapseException(synCtx, 404, msg); } - if(faultyMembers.containsKey(currentMember.getHostName())) { + if (faultyMembers.containsKey(currentMember.getHostName())) { // This member has been identified as faulty previously. It implies that // this request could not be served by any of the members in the cluster. throwSynapseException(synCtx, 404, String.format("Requested resource could not be found")); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/0a24ca1b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java index c15b73c..13cf1e8 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/tenant/TenantManager.java @@ -113,6 +113,10 @@ public class TenantManager { return this.tenantDomainTenantMap.get(tenantDomain); } + public boolean tenantExists(int tenantId) { + return tenantIdTenantMap.containsKey(tenantId); + } + public void removeTenant(int tenantId) { Tenant tenant = getTenant(tenantId); if(tenant != null) {
