Updated the load balancer port mapping logic to find the outgoing port according to the incoming port rather than the transport.
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e4b15b6f Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e4b15b6f Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e4b15b6f Branch: refs/heads/master Commit: e4b15b6f52253b804c5e716fbcf1c1db47111dc2 Parents: a8e3181 Author: Imesh Gunaratne <[email protected]> Authored: Thu May 1 10:38:21 2014 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Thu May 1 10:38:21 2014 +0530 ---------------------------------------------------------------------- .../TenantAwareLoadBalanceEndpoint.java | 181 ++++++++++--------- .../messaging/domain/topology/Member.java | 73 ++++---- .../messaging/domain/topology/Service.java | 36 ++-- .../event/topology/MemberActivatedEvent.java | 30 ++- .../event/topology/ServiceCreatedEvent.java | 29 ++- 5 files changed, 171 insertions(+), 178 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 364e869..c426a1b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -59,7 +59,7 @@ import java.util.regex.Pattern; public class 
TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints.LoadbalanceEndpoint implements Serializable { - private static final String PORT_MAPPING_PREFIX = "port.mapping."; + private static final long serialVersionUID = -6612900240087164008L; /* Request delegator identifies the next member */ private RequestDelegator requestDelegator; @@ -199,59 +199,111 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) { - String targetHost = extractTargetHost(synCtx); - if (!requestDelegator.isTargetHostValid(targetHost)) { - throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost)); - } - - Member member = null; - if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) { - // Try to find next member from multi-tenant cluster map - if (log.isDebugEnabled()) { - log.debug("Multi-tenancy enabled, scanning URL for tenant..."); + try { + String targetHost = extractTargetHost(synCtx); + if (!requestDelegator.isTargetHostValid(targetHost)) { + throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost)); } - String url = extractUrl(synCtx); - int tenantId = scanUrlForTenantId(url); - if (tenantExists(tenantId)) { - // Tenant found, find member from hostname and tenant id - member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId); + + Member member = null; + if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) { + // Try to find next member from multi-tenant cluster map + if (log.isDebugEnabled()) { + log.debug("Multi-tenancy enabled, scanning URL for tenant..."); + } + String url = extractUrl(synCtx); + int tenantId = scanUrlForTenantId(url); + if (tenantExists(tenantId)) { + // Tenant found, find member from hostname and tenant id + member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId); + } else { + // Tenant id not found in URL, find member 
from host name + member = requestDelegator.findNextMemberFromHostName(targetHost); + } } else { - // Tenant id not found in URL, find member from host name + // Find next member from host name member = requestDelegator.findNextMemberFromHostName(targetHost); } - } else { - // Find next member from host name - member = requestDelegator.findNextMemberFromHostName(targetHost); - } - if (member == null) - return null; + if (member == null) + return null; - // Create Axi2 member object - String transport = extractTransport(synCtx); - Port transportPort = member.getPort(transport); - if (transportPort == null) { - if (log.isErrorEnabled()) { - log.error(String.format("Port not found for transport %s in member %s", transport, member.getMemberId())); + // Find mapping outgoing port for incoming port + int incomingPort = findIncomingPort(synCtx); + Port outgoingPort = findOutgoingPort(member, incomingPort); + if (outgoingPort == null) { + if (log.isErrorEnabled()) { + log.error(String.format("Could not find port for proxy port %d in member %s", incomingPort, + member.getMemberId())); + } + throwSynapseException(synCtx, 500, "Internal server error"); + } + + // Create Axi2 member object + org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member( + getMemberIp(synCtx, member), outgoingPort.getValue()); + axis2Member.setDomain(member.getClusterId()); + axis2Member.setActive(member.isActive()); + + // Set cluster id and partition id in message context + axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId()); + return axis2Member; + } + catch (Exception e) { + if(log.isErrorEnabled()) { + log.error("Could not find a member to serve the request"); } throwSynapseException(synCtx, 500, "Internal server error"); } + return null; + } + + /*** + * Find incoming port from request URL. 
+ * @param synCtx + * @return + * @throws MalformedURLException + */ + private int findIncomingPort(MessageContext synCtx) throws MalformedURLException { + try { + URL url = new URL(extractUrl(synCtx)); + if(log.isDebugEnabled()) { + log.debug("Incoming request port found: " + url.getPort()); + } + return url.getPort(); + } + catch (MalformedURLException e) { + if(log.isErrorEnabled()) { + log.error("Could not extract port from incoming request", e); + } + throw e; + } + } - int memberPort = transportPort.getValue(); - org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(getMemberIp(synCtx, member), memberPort); - axis2Member.setDomain(member.getClusterId()); - Port httpPort = member.getPort("http"); - if (httpPort != null) - axis2Member.setHttpPort(httpPort.getValue()); - Port httpsPort = member.getPort("https"); - if (httpsPort != null) - axis2Member.setHttpsPort(httpsPort.getValue()); - axis2Member.setActive(member.isActive()); - // Set cluster id and partition id in message context - axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId()); - return axis2Member; + /*** + * Find mapping outgoing port for incoming port. + * @param member + * @param incomingPort + * @return + * @throws MalformedURLException + */ + private Port findOutgoingPort(Member member, int incomingPort) throws MalformedURLException { + if((member != null) && (member.getPorts() != null)) { + Port outgoingPort = member.getPort(incomingPort); + if(log.isDebugEnabled()) { + log.debug("Outgoing request port found: " + outgoingPort.getValue()); + } + return outgoingPort; + } + return null; } + /*** + * Get members private or public ip according to load balancer configuration. 
+ * @param synCtx + * @param member + * @return + */ private String getMemberIp(MessageContext synCtx, Member member) { if(LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) { if(LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) { @@ -361,28 +413,6 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints return hostName; } - private int extractPort(MessageContext synCtx, String transport) { - org.apache.axis2.context.MessageContext msgCtx = - ((Axis2MessageContext) synCtx).getAxis2MessageContext(); - - Map headerMap = (Map) msgCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS); - int port = -1; - if (headerMap != null) { - String hostHeader = (String) headerMap.get(HTTP.TARGET_HOST); - int index = hostHeader.indexOf(':'); - if (index != -1) { - port = Integer.parseInt(hostHeader.trim().substring(index + 1)); - } else { - if ("http".equals(transport)) { - port = 80; - } else if ("https".equals(transport)) { - port = 443; - } - } - } - return port; - } - private String extractTransport(MessageContext synCtx) { org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); return axis2MessageContext.getTransportIn().getName(); @@ -416,8 +446,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints private EndpointReference getEndpointReferenceAfterURLRewrite(org.apache.axis2.clustering.Member currentMember, String transport, - String address, - int incomingPort) { + String address) { if (transport.startsWith("https")) { transport = "https"; @@ -435,25 +464,14 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints String _address = address.indexOf("?") > 0 ? 
address.substring(address.indexOf("?"), address.length()) : ""; address = new URL(address).getPath() + _address; } catch (MalformedURLException e) { - String msg = "URL " + address + " is malformed"; + String msg = String.format("URL is malformed: %s", address); log.error(msg, e); throw new SynapseException(msg, e); } } - int port; - Properties memberProperties = currentMember.getProperties(); - String mappedPort = memberProperties.getProperty(PORT_MAPPING_PREFIX + incomingPort); - if (mappedPort != null) { - port = Integer.parseInt(mappedPort); - } else if (transport.startsWith("https")) { - port = currentMember.getHttpsPort(); - } else { - port = currentMember.getHttpPort(); - } - - String remoteHost = memberProperties.getProperty("remoteHost"); - String hostName = (remoteHost == null) ? currentMember.getHostName() : remoteHost; + String hostName = currentMember.getHostName(); + int port = currentMember.getPort(); return new EndpointReference(transport + "://" + hostName + ":" + port + address); } else { @@ -509,8 +527,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints String transport = axis2MsgCtx.getTransportIn().getName(); String address = synCtx.getTo().getAddress(); - int incomingPort = extractPort(synCtx, transport); - EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address, incomingPort); + EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address); synCtx.setTo(to); Endpoint endpoint = getEndpoint(to, currentMember, synCtx); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java index b942e2e..09d38a7 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java @@ -24,10 +24,7 @@ import org.apache.stratos.messaging.util.bean.type.map.MapAdapter; import javax.xml.bind.annotation.XmlRootElement; import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.util.*; /** * Defines a member node in a cluster. @@ -42,13 +39,12 @@ public class Member implements Serializable { private final String networkPartitionId; private final String partitionId; private final String memberId; - + @XmlJavaTypeAdapter(MapAdapter.class) + private final Map<Integer, Port> portMap; private String memberPublicIp; private MemberStatus status; private String memberIp; @XmlJavaTypeAdapter(MapAdapter.class) - private final Map<String, Port> portMap; - @XmlJavaTypeAdapter(MapAdapter.class) private Properties properties; private String lbClusterId; @@ -58,7 +54,7 @@ public class Member implements Serializable { this.networkPartitionId = networkPartitionId; this.partitionId = partitionId; this.memberId = memberId; - this.portMap = new HashMap<String, Port>(); + this.portMap = new HashMap<Integer, Port>(); } public String getServiceName() { @@ -85,34 +81,31 @@ public class Member implements Serializable { return (this.status == MemberStatus.Activated); } - public Collection<Port> getPorts() { - return portMap.values(); + public Port getPort(int proxy) { + if(portMap.containsKey(proxy)) { + return portMap.get(proxy); + } + return null; } - public void addPort(Port port) { - this.portMap.put(port.getProtocol(), port); + public Map<Integer, 
Port> getPorts() { + return Collections.unmodifiableMap(portMap); } - public void addPorts(Collection<Port> ports) { - for(Port port: ports) { - addPort(port); - } + public void addPort(Port port) { + this.portMap.put(port.getProxy(), port); } - public void removePort(Port port) { - this.portMap.remove(port.getProtocol()); + public void addPorts(Map<Integer, Port> portMap) { + this.portMap.putAll(portMap); } - public void removePort(String protocol) { - this.portMap.remove(protocol); + public void removePort(Port port) { + this.portMap.remove(port.getProxy()); } public boolean portExists(Port port) { - return this.portMap.containsKey(port.getProtocol()); - } - - public Port getPort(String protocol) { - return this.portMap.get(protocol); + return this.portMap.containsKey(port.getProxy()); } public Properties getProperties() { @@ -123,37 +116,37 @@ public class Member implements Serializable { this.properties = properties; } - public String getMemberIp() { - return memberIp; + public String getMemberIp() { + return memberIp; } - public void setMemberIp(String memberIp) { - this.memberIp = memberIp; + public void setMemberIp(String memberIp) { + this.memberIp = memberIp; } public String getPartitionId() { return partitionId; } - public void setLbClusterId(String lbClusterId) { - this.lbClusterId = lbClusterId; - } - public String getLbClusterId() { return lbClusterId; } + public void setLbClusterId(String lbClusterId) { + this.lbClusterId = lbClusterId; + } + public String getNetworkPartitionId() { return networkPartitionId; } - public String getMemberPublicIp() { - return memberPublicIp; - } + public String getMemberPublicIp() { + return memberPublicIp; + } + + public void setMemberPublicIp(String memberPublicIp) { + this.memberPublicIp = memberPublicIp; + } - public void setMemberPublicIp(String memberPublicIp) { - this.memberPublicIp = memberPublicIp; - } - } 
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java index 21cc5e7..f35dc40 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java @@ -34,14 +34,14 @@ public class Service implements Serializable{ private final ServiceType serviceType; // Key: Cluster.clusterId private Map<String, Cluster> clusterIdClusterMap; - private Map<String, Port> portMap; + private Map<Integer, Port> portMap; private Properties properties; public Service(String serviceName, ServiceType serviceType) { this.serviceName = serviceName; this.serviceType = serviceType; this.clusterIdClusterMap = new HashMap<String, Cluster>(); - this.portMap = new HashMap<String, Port>(); + this.portMap = new HashMap<Integer, Port>(); } public String getServiceName() { @@ -76,36 +76,32 @@ public class Service implements Serializable{ return this.clusterIdClusterMap.get(clusterId); } - public Collection<Port> getPorts() { - return portMap.values(); + public Map<Integer, Port> getPorts() { + return Collections.unmodifiableMap(portMap); } - public void addPort(Port port) { - this.portMap.put(port.getProtocol(), port); - } - - public void addPorts(Collection<Port> ports) { - for(Port port: ports) { - addPort(port); + public Port getPort(int proxy) { + if(portMap.containsKey(proxy)) { + return portMap.get(proxy); } + return null; } - public void removePort(Port port) { - 
this.portMap.remove(port.getProtocol()); + public void addPort(Port port) { + this.portMap.put(port.getProxy(), port); } - public void removePort(String protocol) { - this.portMap.remove(protocol); + public void addPorts(Map<Integer, Port> portSet) { + this.portMap.putAll(portSet); } - public boolean portExists(Port port) { - return this.portMap.containsKey(port.getProtocol()); + public void removePort(Port port) { + this.portMap.remove(port.getProxy()); } - public Port getPort(String protocol) { - return this.portMap.get(protocol); + public boolean portExists(Port port) { + return this.portMap.containsKey(port.getProxy()); } - public Properties getProperties() { return properties; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java index 3cda807..22f3735 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java @@ -20,9 +20,7 @@ package org.apache.stratos.messaging.event.topology; import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.stratos.messaging.domain.topology.Port; @@ -38,7 +36,7 @@ public class MemberActivatedEvent extends TopologyEvent implements Serializable private final String networkPartitionId; private final String partitionId; private final String memberId; - private Map<String, 
Port> portMap; + private Map<Integer, Port> portMap; private String memberIp; public MemberActivatedEvent(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId) { @@ -47,7 +45,7 @@ public class MemberActivatedEvent extends TopologyEvent implements Serializable this.networkPartitionId = networkPartitionId; this.partitionId = partitionId; this.memberId = memberId; - this.portMap = new HashMap<String, Port>(); + this.portMap = new HashMap<Integer, Port>(); } public String getServiceName() { @@ -69,29 +67,25 @@ public class MemberActivatedEvent extends TopologyEvent implements Serializable public String getMemberId() { return memberId; } - - public Collection<Port> getPorts() { - return portMap.values(); + + public Map<Integer, Port> getPorts() { + return Collections.unmodifiableMap(portMap); } public void addPort(Port port) { - this.portMap.put(port.getProtocol(), port); + this.portMap.put(port.getProxy(), port); } - public void removePort(Port port) { - this.portMap.remove(port.getProtocol()); + public void addPorts(Map<Integer, Port> portSet) { + this.portMap.putAll(portSet); } - public void removePort(String portName) { - this.portMap.remove(portName); + public void removePort(Port port) { + this.portMap.remove(port.getProxy()); } public boolean portExists(Port port) { - return this.portMap.containsKey(port.getProtocol()); - } - - public Port getPort(String portName) { - return this.portMap.get(portName); + return this.portMap.containsKey(port.getProxy()); } public String getMemberIp() { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java 
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java index 846091e..1bd5ff4 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java @@ -23,10 +23,7 @@ import org.apache.stratos.messaging.domain.topology.Port; import org.apache.stratos.messaging.domain.topology.ServiceType; import java.io.Serializable; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; +import java.util.*; /** * This event is fired by Cloud Controller when a service is added to a topology. @@ -37,13 +34,13 @@ public class ServiceCreatedEvent extends TopologyEvent implements Serializable { private final String serviceName; private final ServiceType serviceType; - private final Map<String, Port> portMap; + private final Map<Integer, Port> portMap; private Properties properties; public ServiceCreatedEvent(String serviceName, ServiceType serviceType) { this.serviceName = serviceName; this.serviceType = serviceType; - this.portMap = new HashMap<String, Port>(); + this.portMap = new HashMap<Integer, Port>(); } public String getServiceName() { @@ -54,28 +51,24 @@ public class ServiceCreatedEvent extends TopologyEvent implements Serializable { return serviceType; } - public Collection<Port> getPorts() { - return portMap.values(); + public Map<Integer, Port> getPorts() { + return Collections.unmodifiableMap(portMap); } public void addPort(Port port) { - this.portMap.put(port.getProtocol(), port); + this.portMap.put(port.getProxy(), port); } - public void removePort(Port port) { - this.portMap.remove(port.getProtocol()); + public void addPorts(Map<Integer, Port> portSet) { + this.portMap.putAll(portSet); } - public void removePort(String portName) { - 
this.portMap.remove(portName); + public void removePort(Port port) { + this.portMap.remove(port.getProxy()); } public boolean portExists(Port port) { - return this.portMap.containsKey(port.getProtocol()); - } - - public Port getPort(String portName) { - return this.portMap.get(portName); + return this.portMap.containsKey(port.getProxy()); } public Properties getProperties() {
