Repository: incubator-stratos Updated Branches: refs/heads/master ace6d562d -> 2dbaae431
Fixed default port issue and used axis2 member http/https port properties Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2dbaae43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2dbaae43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2dbaae43 Branch: refs/heads/master Commit: 2dbaae431887aee8ebd1349c5366ed386815883d Parents: ace6d56 Author: Imesh Gunaratne <[email protected]> Authored: Fri May 2 16:13:23 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri May 2 16:13:23 2014 +0530 ---------------------------------------------------------------------- .../TenantAwareLoadBalanceEndpoint.java | 291 ++++++++++++------- .../stratos/load/balancer/util/Constants.java | 9 +- 2 files changed, 191 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2dbaae43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 709ad10..1ffdb50 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -28,14 +28,17 @@ import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.domain.MemberIpType; import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier; +import org.apache.stratos.load.balancer.context.LoadBalancerContext; import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable; import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable; import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor; import org.apache.stratos.load.balancer.util.Constants; import org.apache.stratos.messaging.domain.tenant.Tenant; +import org.apache.stratos.messaging.domain.topology.Cluster; import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.Port; import org.apache.stratos.messaging.message.receiver.tenant.TenantManager; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; @@ -87,7 +90,6 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints @Override public void send(MessageContext synCtx) { - SessionInformation sessionInformation = null; org.apache.axis2.clustering.Member currentMember = null; if (isSessionAffinityBasedLB()) { @@ -118,6 +120,8 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints TenantAwareLoadBalanceFaultHandler faultHandler = new TenantAwareLoadBalanceFaultHandler(); if (sessionInformation != null && currentMember != null) { + // Update axis2 member ports + updateAxis2MemberPorts(synCtx, currentMember); // Send request to the member with the existing session sessionInformation.updateExpiryTime(); sendToApplicationMember(synCtx, currentMember, faultHandler, false); @@ -199,90 +203,98 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) { - try { - String targetHost = extractTargetHost(synCtx); - if (!requestDelegator.isTargetHostValid(targetHost)) { - throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost)); - } + String targetHost = extractTargetHost(synCtx); + if (!requestDelegator.isTargetHostValid(targetHost)) { + throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost)); + } - Member member = null; - if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) { - // Try to find next member from multi-tenant cluster map - if (log.isDebugEnabled()) { - log.debug("Multi-tenancy enabled, scanning URL for tenant..."); - } - String url = extractUrl(synCtx); - int tenantId = scanUrlForTenantId(url); - if (tenantExists(tenantId)) { - // Tenant found, find member from hostname and tenant id - member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId); - } else { - // Tenant id not found in URL, find member from host name - member = requestDelegator.findNextMemberFromHostName(targetHost); - } + Member member = null; + if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) { + // Try to find next member from multi-tenant cluster map + if (log.isDebugEnabled()) { + log.debug("Multi-tenancy enabled, scanning URL for tenant..."); + } + String url = extractUrl(synCtx); + int tenantId = scanUrlForTenantId(url); + if (tenantExists(tenantId)) { + // Tenant found, find member from hostname and tenant id + member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId); } else { - // Find next member from host name + // Tenant id not found in URL, find member from host name member = requestDelegator.findNextMemberFromHostName(targetHost); } + } else { + // Find next member from host name + member = requestDelegator.findNextMemberFromHostName(targetHost); + } + + if (member == null) + return null; + + // Create Axi2 member object + org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member( + getMemberIp(synCtx, member), -1); + axis2Member.setDomain(member.getClusterId()); + axis2Member.setActive(member.isActive()); + // Set cluster id and member id in member properties + axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId()); + axis2Member.getProperties().setProperty(Constants.MEMBER_ID, member.getMemberId()); + // Update axis2 member ports + updateAxis2MemberPorts(synCtx, axis2Member); + return axis2Member; + } - if (member == null) - return null; - - // Find mapping outgoing port for incoming port - int incomingPort = findIncomingPort(synCtx); - Port outgoingPort = findOutgoingPort(member, incomingPort); - if (outgoingPort == null) { - if (log.isErrorEnabled()) { - log.error(String.format("Could not find the port for proxy port %d in member %s", incomingPort, - member.getMemberId())); - } - throwSynapseException(synCtx, 500, "Internal server error"); - } - - // Create Axi2 member object - org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member( - getMemberIp(synCtx, member), outgoingPort.getValue()); - axis2Member.setDomain(member.getClusterId()); - axis2Member.setActive(member.isActive()); - - // Set cluster id and partition id in message context - axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId()); - return axis2Member; + /** + * Update http/https port in axis2 member according to incoming request port. + * + * @param synCtx + * @param axis2Member + */ + private void updateAxis2MemberPorts(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member) { + if(log.isDebugEnabled()) { + log.debug("Updating axis2 member port"); } - catch (Exception e) { - if(log.isErrorEnabled()) { - log.error("Could not find a member to serve the request"); + + // Find mapping outgoing port for incoming port + int incomingPort = findIncomingPort(synCtx); + String transport = extractTransport(synCtx); + Port outgoingPort = findOutgoingPort(synCtx, axis2Member, transport, incomingPort); + if (outgoingPort == null) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not find the port for proxy port %d in member %s", incomingPort, + axis2Member.getProperties().getProperty(Constants.MEMBER_ID))); } throwSynapseException(synCtx, 500, "Internal server error"); } - return null; + if (Constants.HTTP.equals(transport)) { + axis2Member.setHttpPort(outgoingPort.getValue()); + } else if (Constants.HTTPS.equals(transport)) { + axis2Member.setHttpsPort(outgoingPort.getValue()); + } } - /*** + /** * Find incoming port from request URL. + * * @param synCtx * @return * @throws MalformedURLException */ - private int findIncomingPort(MessageContext synCtx) throws MalformedURLException { + private int findIncomingPort(MessageContext synCtx) { org.apache.axis2.context.MessageContext msgCtx = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); try { - Map headerMap = (Map) msgCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); - if (headerMap != null) { - String hostHeader = (String) headerMap.get(HTTP.TARGET_HOST); - int index = hostHeader.indexOf(':'); - if (index != -1) { - int port = Integer.parseInt(hostHeader.trim().substring(index + 1)); - if (log.isDebugEnabled()) { - log.debug("Incoming request port found: " + port); - } - return port; + String servicePrefix = (String) msgCtx.getProperty(Constants.AXIS2_MSG_CTX_SERVICE_PREFIX); + if (servicePrefix == null) { + if (log.isErrorEnabled()) { + log.error(String.format("%s property not found in axis2 message context", Constants.AXIS2_MSG_CTX_SERVICE_PREFIX)); } + throwSynapseException(synCtx, 500, "Internal server error"); } - } - catch (Exception e) { - if(log.isErrorEnabled()) { + URL servicePrefixUrl = new URL(servicePrefix); + return servicePrefixUrl.getPort(); + } catch (MalformedURLException e) { + if (log.isErrorEnabled()) { log.error("Could not find incoming request port"); } throwSynapseException(synCtx, 500, "Internal server error"); @@ -290,17 +302,30 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints return -1; } - /*** + /** * Find mapping outgoing port for incoming port. - * @param member + * + * @param synCtx + * @param axis2Member + * @param transport * @param incomingPort * @return * @throws MalformedURLException */ - private Port findOutgoingPort(Member member, int incomingPort) throws MalformedURLException { - if((member != null) && (member.getPorts() != null)) { + private Port findOutgoingPort(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member, String transport, int incomingPort) { + Member member = findMemberFromAxis2Member(synCtx, axis2Member); + if ((member != null) && (member.getPorts() != null)) { Port outgoingPort = member.getPort(incomingPort); - if(outgoingPort != null) { + if (outgoingPort != null) { + if (!transport.equals(outgoingPort.getProtocol())) { + if (log.isErrorEnabled()) { + String message = String.format("Transport %s is not valid for port %d", transport, incomingPort); + if (log.isErrorEnabled()) { + log.error(message); + } + throwSynapseException(synCtx, 500, message); + } + } if (log.isDebugEnabled()) { log.debug("Outgoing request port found: " + outgoingPort.getValue()); } @@ -310,47 +335,99 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints return null; } - /*** + /** + * Find topology member from axis2 member using cluster id and member id defined in axis2 member properties. + * + * @param synCtx + * @param axis2Member + * @return + */ + private Member findMemberFromAxis2Member(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member) { + String clusterId = axis2Member.getProperties().getProperty(Constants.CLUSTER_ID); + String memberId = axis2Member.getProperties().getProperty(Constants.MEMBER_ID); + if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(memberId)) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not find cluster id and/or member id properties in axis2 member: [cluster-id] %s " + + "[member-id] %s", clusterId, memberId)); + } + throwSynapseException(synCtx, 500, "Internal server error"); + } + try { + TopologyManager.acquireReadLock(); + Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterId); + if (cluster == null) { + if (log.isErrorEnabled()) { + log.error(String.format("Cluster not found in load balancer context: [cluster-id] %s ", clusterId)); + } + throwSynapseException(synCtx, 500, "Internal server error"); + } + Member member = cluster.getMember(memberId); + if (member == null) { + if (log.isErrorEnabled()) { + log.error(String.format("Member not found in load balancer context: [cluster-id] %s [member-id] %s", clusterId, memberId)); + } + throwSynapseException(synCtx, 500, "Internal server error"); + } + return member; + } finally { + TopologyManager.releaseReadLock(); + } + } + + /** * Get members private or public ip according to load balancer configuration. + * * @param synCtx * @param member * @return */ private String getMemberIp(MessageContext synCtx, Member member) { - if(LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) { - if(LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) { + if (LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) { + if (LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) { // Return member's public IP address - if(StringUtils.isBlank(member.getMemberPublicIp())) { + if (StringUtils.isBlank(member.getMemberPublicIp())) { if (log.isErrorEnabled()) { log.error(String.format("Member public IP address not found: [member] %s", member.getMemberId())); } throwSynapseException(synCtx, 500, "Internal server error"); } - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug(String.format("Using member public IP address: [member] %s [ip] %s", member.getMemberId(), member.getMemberPublicIp())); } return member.getMemberPublicIp(); } } // Return member's private IP address - if(StringUtils.isBlank(member.getMemberIp())) { + if (StringUtils.isBlank(member.getMemberIp())) { if (log.isErrorEnabled()) { log.error(String.format("Member IP address not found: [member] %s", member.getMemberId())); } throwSynapseException(synCtx, 500, "Internal server error"); } - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug(String.format("Using member IP address: [member] %s [ip] %s", member.getMemberId(), member.getMemberIp())); } return member.getMemberIp(); } + /** + * Extract incoming request URL from message context. + * + * @param synCtx + * @return + */ private String extractUrl(MessageContext synCtx) { Axis2MessageContext axis2smc = (Axis2MessageContext) synCtx; org.apache.axis2.context.MessageContext axis2MessageCtx = axis2smc.getAxis2MessageContext(); return (String) axis2MessageCtx.getProperty(Constants.AXIS2_MSG_CTX_TRANSPORT_IN_URL); } + /** + * Scan given URL for tenant id. + * + * @param url + * @return + */ private int scanUrlForTenantId(String url) { int tenantId = -1; String regex = LoadBalancerConfiguration.getInstance().getTenantIdentifierRegex(); @@ -456,22 +533,22 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints return endpoint; } - private EndpointReference getEndpointReferenceAfterURLRewrite(org.apache.axis2.clustering.Member currentMember, + private EndpointReference getEndpointReferenceAfterURLRewrite(MessageContext synCtx, org.apache.axis2.clustering.Member currentMember, String transport, String address) { + try { + if (transport.startsWith(Constants.HTTPS)) { + transport = Constants.HTTPS; + } else if (transport.startsWith(Constants.HTTP)) { + transport = Constants.HTTP; + } else { + String msg = "Cannot load balance for non-HTTP/S transport " + transport; + log.error(msg); + throwSynapseException(synCtx, 500, msg); + } - if (transport.startsWith("https")) { - transport = "https"; - } else if (transport.startsWith("http")) { - transport = "http"; - } else { - String msg = "Cannot load balance for non-HTTP/S transport " + transport; - log.error(msg); - throw new SynapseException(msg); - } - // URL Rewrite - if (transport.startsWith("http") || transport.startsWith("https")) { - if (address.startsWith("http://") || address.startsWith("https://")) { + // URL Rewrite + if (address.startsWith(Constants.HTTP + "://") || address.startsWith(Constants.HTTPS + "://")) { try { String _address = address.indexOf("?") > 0 ? address.substring(address.indexOf("?"), address.length()) : ""; address = new URL(address).getPath() + _address; @@ -483,13 +560,15 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } String hostName = currentMember.getHostName(); - int port = currentMember.getPort(); - return new EndpointReference(transport + "://" + hostName + - ":" + port + address); - } else { - String msg = "Cannot load balance for non-HTTP/S transport " + transport; - log.error(msg); - throw new SynapseException(msg); + int port = (transport.startsWith(Constants.HTTPS)) ? currentMember.getHttpsPort() : currentMember.getHttpPort(); + return new EndpointReference(new URL(transport, hostName, port, address).toString()); + + } catch (MalformedURLException e) { + if (log.isErrorEnabled()) { + log.error("Could not create endpoint reference", e); + } + throwSynapseException(synCtx, 500, "Internal server error"); + return null; } } @@ -539,7 +618,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints String transport = axis2MsgCtx.getTransportIn().getName(); String address = synCtx.getTo().getAddress(); - EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address); + EndpointReference to = getEndpointReferenceAfterURLRewrite(synCtx, currentMember, transport, address); synCtx.setTo(to); Endpoint endpoint = getEndpoint(to, currentMember, synCtx); @@ -595,14 +674,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints private void incrementInFlightRequestCount(MessageContext messageContext) { try { String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID); - if(StringUtils.isBlank(clusterId)) { + if (StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId)); LoadBalancerStatisticsExecutor.getInstance().getService().submit(task); - } - catch (Exception e) { - if(log.isDebugEnabled()) { + } catch (Exception e) { + if (log.isDebugEnabled()) { log.debug("Could not increment in-flight request count", e); } } @@ -611,14 +689,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints private void decrementInFlightRequestCount(MessageContext messageContext) { try { String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID); - if(StringUtils.isBlank(clusterId)) { + if (StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId)); LoadBalancerStatisticsExecutor.getInstance().getService().submit(task); - } - catch (Exception e) { - if(log.isDebugEnabled()) { + } catch (Exception e) { + if (log.isDebugEnabled()) { log.debug("Could not decrement in-flight request count", e); } } @@ -691,7 +768,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints decrementInFlightRequestCount(synCtx); if (isFailover()) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("Fail-over enabled, trying to send the message to the next available member"); } @@ -700,7 +777,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints currentEp.destroy(); } if (currentMember == null) { - if(log.isErrorEnabled()) { + if (log.isErrorEnabled()) { log.error("Current member is null, could not fail-over"); } return; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2dbaae43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java index ff64d20..a29b980 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java @@ -22,12 +22,17 @@ package org.apache.stratos.load.balancer.util; public class Constants { public static final String CLUSTER_ID = "cluster_id"; + public static final String MEMBER_ID = "member_id"; + + public static final String HTTP = "http"; + public static final String HTTPS = "https"; + + public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL"; + public static final String AXIS2_MSG_CTX_SERVICE_PREFIX = "SERVICE_PREFIX"; public static final String LB_HOST_NAME = "LB_HOST_NAME"; public static final String LB_HTTP_PORT = "LB_HTTP_PORT"; public static final String LB_HTTPS_PORT = "LB_HTTPS_PORT"; public static final String ALGORITHM_CONTEXT_CACHE = "algorithm.context.cache"; - - public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL"; }
