Repository: stratos Updated Branches: refs/heads/master f0cacdff2 -> 67a22ab51
STRATOS-676: Fixed location header rewrite issue in load balancer Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/67a22ab5 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/67a22ab5 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/67a22ab5 Branch: refs/heads/master Commit: 67a22ab51ef9288629979470648b3b2d7b831bba Parents: f0cacdf Author: Imesh Gunaratne <[email protected]> Authored: Sun Jun 29 23:23:44 2014 -0400 Committer: Imesh Gunaratne <[email protected]> Committed: Sun Jun 29 23:23:44 2014 -0400 ---------------------------------------------------------------------- .../LoadBalancerTopologyEventReceiver.java | 216 ++++++++++++++++--- .../conf/LoadBalancerConfiguration.java | 14 ++ .../load/balancer/conf/util/Constants.java | 1 + .../balancer/context/LoadBalancerContext.java | 8 + .../TenantAwareLoadBalanceEndpoint.java | 4 +- .../balancer/mediators/LocationReWriter.java | 53 +++-- .../stratos/load/balancer/util/Constants.java | 2 +- .../test/LoadBalancerConfigurationTest.java | 11 +- .../sample/configuration/loadbalancer1.conf | 6 + .../sample/configuration/loadbalancer2.conf | 6 + .../sample/configuration/loadbalancer3.conf | 6 + .../ClusterRemovedMessageProcessor.java | 8 +- .../MemberTerminatedMessageProcessor.java | 6 +- .../ServiceRemovedMessageProcessor.java | 7 +- .../src/main/conf/loadbalancer.conf | 6 + 15 files changed, 296 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java index f10256c..0b126cc 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyEventReceiver.java @@ -19,6 +19,7 @@ package org.apache.stratos.load.balancer; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.context.LoadBalancerContext; @@ -28,13 +29,8 @@ import org.apache.stratos.messaging.domain.topology.Member; import org.apache.stratos.messaging.domain.topology.MemberStatus; import org.apache.stratos.messaging.domain.topology.Service; import org.apache.stratos.messaging.event.Event; -import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent; -import org.apache.stratos.messaging.event.topology.MemberActivatedEvent; -import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent; -import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener; -import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener; -import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener; -import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener; +import org.apache.stratos.messaging.event.topology.*; +import org.apache.stratos.messaging.listener.topology.*; import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver; import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; @@ -93,6 +89,11 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { log.debug("Cluster does not have any active members"); } } + for (Member member : cluster.getMembers()) { + if 
(member.getStatus() == MemberStatus.Activated) { + addMemberIpsToMemberIpHostnameMap(cluster, member); + } + } } } initialized = true; @@ -119,31 +120,105 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { try { TopologyManager.acquireReadLock(); - // Add cluster to load balancer context when its first member is activated MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event; - if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster(memberActivatedEvent.getClusterId())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Cluster exists in load balancer context: [service] %s [cluster] %s", + Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName()); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service not found in topology: [service] %s", + memberActivatedEvent.getServiceName())); + } + return; + } + Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId()); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId())); } return; } - // Cluster not found in load balancer context, add it - Service service = TopologyManager.getTopology().getService(memberActivatedEvent.getServiceName()); - if (service != null) { - Cluster cluster = service.getCluster(memberActivatedEvent.getClusterId()); - if (cluster != null) { - LoadBalancerContextUtil.addClusterAgainstHostNames(cluster); - } else { - if (log.isErrorEnabled()) { - log.error(String.format("Cluster not found in topology: [service] %s [cluster] %s", - memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId())); - } + Member member = cluster.getMember(memberActivatedEvent.getMemberId()); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member not found in topology: 
[service] %s [cluster] %s [member] %s", + memberActivatedEvent.getServiceName(), memberActivatedEvent.getClusterId(), + memberActivatedEvent.getMemberId())); } - } else { - if (log.isErrorEnabled()) { - log.error(String.format("Service not found in topology: [service] %s", memberActivatedEvent.getServiceName())); + return; + } + + // Add member to member-ip -> hostname map + addMemberIpsToMemberIpHostnameMap(cluster, member); + + if (LoadBalancerContext.getInstance().getClusterIdClusterMap().containsCluster( + member.getClusterId())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Cluster already exists in load balancer context: [service] %s " + + "[cluster] %s", member.getServiceName(), member.getClusterId())); } + // At this point member is already added to the cluster object in load balancer context + return; + } + + // Add cluster to load balancer context when its first member is activated: + // Cluster not found in load balancer context, add it + LoadBalancerContextUtil.addClusterAgainstHostNames(cluster); + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + }); + topologyEventReceiver.addEventListener(new MemberMaintenanceListener() { + @Override + protected void onEvent(Event event) { + try { + TopologyManager.acquireReadLock(); + MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event; + Member member = findMember(memberMaintenanceModeEvent.getServiceName(), + memberMaintenanceModeEvent.getClusterId(), memberMaintenanceModeEvent.getMemberId()); + + if (member != null) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + }); + topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() { + @Override + protected void onEvent(Event event) { + try { + TopologyManager.acquireReadLock(); + 
MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event; + Member member = findMember(memberSuspendedEvent.getServiceName(), + memberSuspendedEvent.getClusterId(), memberSuspendedEvent.getMemberId()); + + if (member != null) { + removeMemberIpsFromMemberIpHostnameMap(member); + } + } catch (Exception e) { + log.error("Error processing event", e); + } finally { + TopologyManager.releaseReadLock(); + } + } + }); + topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() { + @Override + protected void onEvent(Event event) { + try { + TopologyManager.acquireReadLock(); + MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event; + Member member = findMember(memberTerminatedEvent.getServiceName(), + memberTerminatedEvent.getClusterId(), memberTerminatedEvent.getMemberId()); + + if (member != null) { + removeMemberIpsFromMemberIpHostnameMap(member); } } catch (Exception e) { log.error("Error processing event", e); @@ -162,6 +237,9 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event; Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterRemovedEvent.getClusterId()); if (cluster != null) { + for (Member member : cluster.getMembers()) { + removeMemberIpsFromMemberIpHostnameMap(member); + } LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId()); } else { if (log.isWarnEnabled()) { @@ -187,11 +265,15 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName()); if (service != null) { for (Cluster cluster : service.getClusters()) { + for (Member member : cluster.getMembers()) { + removeMemberIpsFromMemberIpHostnameMap(member); + } LoadBalancerContextUtil.removeClusterAgainstHostNames(cluster.getClusterId()); } } else { if (log.isWarnEnabled()) { - 
log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName())); + log.warn(String.format("Service not found in topology: [service] %s", + serviceRemovedEvent.getServiceName())); } } } catch (Exception e) { @@ -203,6 +285,88 @@ public class LoadBalancerTopologyEventReceiver implements Runnable { }); } + private Member findMember(String serviceName, String clusterId, String memberId) { + Service service = TopologyManager.getTopology().getService(serviceName); + if (service == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Service not found in topology: [service] %s", serviceName)); + } + return null; + } + + Cluster cluster = service.getCluster(clusterId); + if (cluster == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s", serviceName, clusterId)); + } + return null; + } + + Member member = cluster.getMember(memberId); + if (member == null) { + if (log.isWarnEnabled()) { + log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s", serviceName, + clusterId, memberId)); + } + return null; + } + return member; + } + + private void addMemberIpsToMemberIpHostnameMap(Cluster cluster, Member member) { + if ((cluster.getHostNames() == null) || (cluster.getHostNames().size() == 0)) { + if (log.isWarnEnabled()) { + log.warn(String.format("Hostnames not found in cluster %s, could not add member ips to member-ip " + + "-> hostname map", member.getClusterId())); + } + return; + } + + String hostname = cluster.getHostNames().get(0); + if (cluster.getHostNames().size() > 1) { + if (log.isWarnEnabled()) { + log.warn(String.format("Multiple hostnames found in cluster %s, using %s", + cluster.getHostNames().toString(), hostname)); + } + } + + if (StringUtils.isNotBlank(member.getMemberIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().put(member.getMemberIp(), hostname); + if (log.isDebugEnabled()) { 
+ log.debug(String.format("Member private ip added to member-ip -> hostname map: [service] %s [cluster] " + + "%s [member] %s [private-ip] %s", member.getServiceName(), member.getClusterId(), + member.getMemberId(), member.getMemberIp() + )); + } + } + if (StringUtils.isNotBlank(member.getMemberPublicIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().put(member.getMemberPublicIp(), hostname); + if (log.isDebugEnabled()) { + log.debug(String.format("Member public ip added to member-ip -> hostname map: [service] %s [cluster] " + + "%s [member] %s [public-ip] %s", member.getServiceName(), member.getClusterId(), + member.getMemberId(), member.getMemberPublicIp() + )); + } + } + } + + private void removeMemberIpsFromMemberIpHostnameMap(Member member) { + if (StringUtils.isNotBlank(member.getMemberIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().remove(member.getMemberIp()); + if (log.isDebugEnabled()) { + log.debug(String.format("Member private ip removed from member-ip -> hostname map: [private-ip] %s", + member.getMemberIp())); + } + } + if (StringUtils.isNotBlank(member.getMemberPublicIp())) { + LoadBalancerContext.getInstance().getMemberIpHostnameMap().remove(member.getMemberPublicIp()); + if (log.isDebugEnabled()) { + log.debug(String.format("Member public ip removed from member-ip -> hostname map: [public-ip] %s", + member.getMemberPublicIp())); + } + } + } + /** * Terminate load balancer topology receiver thread. 
*/ http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java index a246c1f..437f4d3 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java @@ -68,6 +68,7 @@ public class LoadBalancerConfiguration { private String tenantIdentifierRegex; private String topologyMemberFilter; private String networkPartitionId; + private boolean reWriteLocationHeader; /** * Load balancer configuration is singleton. 
@@ -271,6 +272,14 @@ public class LoadBalancerConfiguration { return networkPartitionId; } + public void setRewriteLocationHeader(boolean reWriteLocationHeader) { + this.reWriteLocationHeader = reWriteLocationHeader; + } + + public boolean isReWriteLocationHeader() { + return reWriteLocationHeader; + } + private static class LoadBalancerConfigurationReader { private String property; @@ -436,6 +445,11 @@ public class LoadBalancerConfiguration { configuration.addAlgorithm(algorithm); } + String rewriteLocationHeader = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_REWRITE_LOCATION_HEADER); + if(StringUtils.isNotEmpty(rewriteLocationHeader)) { + configuration.setRewriteLocationHeader(Boolean.parseBoolean(rewriteLocationHeader)); + } + if (!configuration.isTopologyEventListenerEnabled()) { Node servicesNode = loadBalancerNode.findChildNodeByName(Constants.CONF_ELEMENT_SERVICES); validateRequiredNode(servicesNode, Constants.CONF_ELEMENT_SERVICES); http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java index 72851af..96b666a 100755 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java @@ -59,6 +59,7 @@ public class Constants { public static final String CONF_PROPERTY_VALUE_TENANT_DOMAIN = "tenant-domain"; public static final String CONF_PROPERTY_TENANT_IDENTIFIER_REGEX = "tenant-identifier-regex"; public static final String 
CONF_PROPERTY_NETWORK_PARTITION_ID = "network-partition-id"; + public static final String CONF_PROPERTY_REWRITE_LOCATION_HEADER = "rewrite-location-header"; public static final String CONF_DELIMITER_HOSTS = ","; public static final long DEFAULT_ENDPOINT_TIMEOUT = 15000; http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java index 67d63ee..c0b994e 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java @@ -67,6 +67,9 @@ public class LoadBalancerContext { // Map<HostName, Map<TenantId, Cluster>> // Keep track of multi-tenant service clusters private MultiTenantClusterMap multiTenantClusterMap; + // Map<MemberIp, Hostname> + // Keep track of cluster hostnames of all members against their ip addresses + private MemberIpHostnameMap memberIpHostnameMap; private LoadBalancerContext() { tenantIdSynapseEnvironmentServiceMap = new TenantIdSynapseEnvironmentServiceMap(); @@ -76,6 +79,7 @@ public class LoadBalancerContext { hostNameClusterMap = new HostNameClusterMap(); hostNameAppContextMap = new HostNameAppContextMap(); multiTenantClusterMap = new MultiTenantClusterMap(); + memberIpHostnameMap = new MemberIpHostnameMap(); } public static LoadBalancerContext getInstance() { @@ -192,4 +196,8 @@ public class LoadBalancerContext { public MultiTenantClusterMap getMultiTenantClusterMap() { return 
multiTenantClusterMap; } + + public MemberIpHostnameMap getMemberIpHostnameMap() { + return memberIpHostnameMap; + } } http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 776dcc5..02ed6e6 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -154,7 +154,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints * @param currentMember */ private void setupLoadBalancerContextProperties(MessageContext synCtx, org.apache.axis2.clustering.Member currentMember) { - String lbHostName = extractTargetHost(synCtx); + String targetHostname = extractTargetHost(synCtx); org.apache.axis2.context.MessageContext axis2MsgCtx = ((Axis2MessageContext) synCtx).getAxis2MessageContext(); String httpTransportName = "http", httpsTransportName = "https"; @@ -171,7 +171,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints String lbHttpsPort = (String) httpsTransportIn.getParameter("port").getValue(); String clusterId = currentMember.getProperties().getProperty(Constants.CLUSTER_ID); - synCtx.setProperty(Constants.LB_HOST_NAME, lbHostName); + synCtx.setProperty(Constants.LB_TARGET_HOSTNAME, targetHostname); synCtx.setProperty(Constants.LB_HTTP_PORT, lbHttpPort); 
synCtx.setProperty(Constants.LB_HTTPS_PORT, lbHttpsPort); synCtx.setProperty(Constants.CLUSTER_ID, clusterId); http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/LocationReWriter.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/LocationReWriter.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/LocationReWriter.java index 5850aac..50f08a3 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/LocationReWriter.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/LocationReWriter.java @@ -19,11 +19,14 @@ package org.apache.stratos.load.balancer.mediators; import org.apache.commons.lang3.StringUtils; +import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; +import org.apache.stratos.load.balancer.context.LoadBalancerContext; import org.apache.stratos.load.balancer.util.Constants; import org.apache.synapse.MessageContext; import org.apache.synapse.core.axis2.Axis2MessageContext; import org.apache.synapse.mediators.AbstractMediator; +import java.net.MalformedURLException; import java.net.URL; import java.util.Map; @@ -38,6 +41,13 @@ public class LocationReWriter extends AbstractMediator { @Override public boolean mediate(MessageContext messageContext) { + if (LoadBalancerConfiguration.getInstance().isReWriteLocationHeader()) { + rewriteLocationHeader(messageContext); + } + return true; + } + + private void rewriteLocationHeader(MessageContext messageContext) { try { // Read transport headers Map transportHeaders = (Map) ((Axis2MessageContext) messageContext).getAxis2MessageContext(). 
@@ -45,24 +55,42 @@ public class LocationReWriter extends AbstractMediator { if (transportHeaders != null) { // Find location header String inLocation = (String) transportHeaders.get(LOCATION); - if(StringUtils.isNotBlank(inLocation)) { - URL inLocationUrl = new URL(inLocation); - // Find load balancer host name and port - String lbHost = (String) messageContext.getProperty(Constants.LB_HOST_NAME); - int lbPort = -1; + if (StringUtils.isNotBlank(inLocation)) { + URL inLocationUrl = null; + try { + inLocationUrl = new URL(inLocation); + } catch (MalformedURLException e) { + return; + } + + // Check whether the location host is an ip address of a known member + String hostname = LoadBalancerContext.getInstance().getMemberIpHostnameMap().get(inLocationUrl.getHost()); + if (StringUtils.isEmpty(hostname)) { + if (log.isDebugEnabled()) { + log.debug(String.format("A hostname not found for ip: [ip-address] %s", inLocationUrl.getHost())); + } + return; + } + + if (log.isDebugEnabled()) { + log.debug(String.format("A location header found with member ip: [member-ip] %s " + + "[hostname] %s ", inLocationUrl.getHost(), hostname)); + } + + int targetPort = -1; if (HTTP.equals(inLocationUrl.getProtocol())) { - lbPort = Integer.valueOf((String) messageContext.getProperty(Constants.LB_HTTP_PORT)); + targetPort = Integer.valueOf((String) messageContext.getProperty(Constants.LB_HTTP_PORT)); } else if (HTTPS.equals(inLocationUrl.getProtocol())) { - lbPort = Integer.valueOf((String) messageContext.getProperty(Constants.LB_HTTPS_PORT)); + targetPort = Integer.valueOf((String) messageContext.getProperty(Constants.LB_HTTPS_PORT)); } else { - if(log.isWarnEnabled()) { + if (log.isWarnEnabled()) { log.warn(String.format("An unknown protocol found: %s", inLocationUrl.getProtocol())); } } - if (lbPort != -1) { + if (targetPort != -1) { // Re-write location header - URL outLocationUrl = new URL(inLocationUrl.getProtocol(), lbHost, lbPort, inLocationUrl.getFile()); + URL outLocationUrl = new 
URL(inLocationUrl.getProtocol(), hostname, targetPort, inLocationUrl.getFile()); transportHeaders.put(LOCATION, outLocationUrl.toString()); if (log.isDebugEnabled()) { log.debug(String.format("Location header re-written: %s", outLocationUrl.toString())); @@ -71,10 +99,9 @@ public class LocationReWriter extends AbstractMediator { } } } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Could re-write location header", e); + if (log.isWarnEnabled()) { + log.warn("Could not re-write location header", e); } } - return true; } } http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java index a29b980..6650af9 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java @@ -30,7 +30,7 @@ public class Constants { public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL"; public static final String AXIS2_MSG_CTX_SERVICE_PREFIX = "SERVICE_PREFIX"; - public static final String LB_HOST_NAME = "LB_HOST_NAME"; + public static final String LB_TARGET_HOSTNAME = "LB_TARGET_HOSTNAME"; public static final String LB_HTTP_PORT = "LB_HTTP_PORT"; public static final String LB_HTTPS_PORT = "LB_HTTPS_PORT"; http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java ---------------------------------------------------------------------- 
diff --git a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java index ff67f39..5edd2c1 100755 --- a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java +++ b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java @@ -18,18 +18,17 @@ */ package org.apache.stratos.load.balancer.test; -import java.io.File; -import java.net.URL; - +import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier; import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration; -import org.apache.stratos.messaging.message.receiver.topology.TopologyManager; +import java.io.File; +import java.net.URL; /** * Test sample load balancer configurations. 
@@ -85,6 +84,7 @@ public class LoadBalancerConfigurationTest { Assert.assertTrue(String.format("%s, multi-tenancy is not true", validationError), configuration.isMultiTenancyEnabled()); Assert.assertEquals(String.format("%s, tenant-identifier is not valid", validationError), TenantIdentifier.TenantDomain, configuration.getTenantIdentifier()); Assert.assertEquals(String.format("%s, tenant-identifier-regex is not valid", validationError), "t/([^/]*)/", configuration.getTenantIdentifierRegex()); + Assert.assertTrue(String.format("%s, rewrite-location-header is not true", validationError), configuration.isReWriteLocationHeader()); } finally { LoadBalancerConfiguration.clear(); } @@ -137,6 +137,7 @@ public class LoadBalancerConfigurationTest { Assert.assertEquals(String.format("%s, port value not valid: [member] %s [proxy-port] %d", validationError, memberId, proxyPort), 8080, m1Http.getValue()); Assert.assertEquals(String.format("%s, port proxy not valid: [member] %s [proxy-port] %d", validationError, memberId, proxyPort), 80, m1Http.getProxy()); + Assert.assertFalse(String.format("%s, rewrite-location-header is not false", validationError), LoadBalancerConfiguration.getInstance().isReWriteLocationHeader()); } finally { TopologyManager.releaseReadLock(); LoadBalancerConfiguration.clear(); http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf index ff11dac..8bdced5 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf +++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf @@ -103,4 +103,10 @@ loadbalancer { class-name: org.apache.stratos.load.balancer.algorithm.RoundRobin; } } + + # Rewrite location header + # If this property is set to true, the load balancer will rewrite HTTP Location header values found in response + # messages if the host is set to an IP address of a known member. The resulting Location header host will be + # set to the corresponding cluster hostname and the port will be set to the corresponding transport proxy port. + rewrite-location-header: true; } http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf index 9910dc9..b708d05 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf @@ -104,6 +104,12 @@ loadbalancer { } } + # Rewrite location header + # If this property is set to true, the load balancer will rewrite HTTP Location header values found in response + # messages if the host is set to an IP address of a known member. The resulting Location header host will be + # set to the corresponding cluster hostname and the port will be set to the corresponding transport proxy port. 
+ rewrite-location-header: false; + services { app-server { # service name, a unique identifier to identify a service multi-tenant: true; # Set to true if the service is multi-tenant http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf index a629a6f..461b494 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf @@ -103,4 +103,10 @@ loadbalancer { class-name: org.apache.stratos.load.balancer.algorithm.RoundRobin; } } + + # Rewrite location header + # If this property is set to true, the load balancer will rewrite HTTP Location header values found in response + # messages if the host is set to an IP address of a known member. The resulting Location header host will be + # set to the corresponding cluster hostname and the port will be set to the corresponding transport proxy port. 
+ rewrite-location-header: true; } http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java index 0e5a56f..69ef5b0 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ClusterRemovedMessageProcessor.java @@ -80,6 +80,10 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor { } return false; } + + // Notify event listeners before removing the cluster object + notifyEventListeners(event); + if (!service.clusterExists(event.getClusterId())) { if (log.isWarnEnabled()) { log.warn(String.format("Cluster does not exist: [service] %s [cluster] %s", @@ -97,11 +101,7 @@ public class ClusterRemovedMessageProcessor extends MessageProcessor { } } - - // Notify event listeners - notifyEventListeners(event); return true; - } else { if (nextProcessor != null) { // ask the next processor to take care of the message. 
http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java index 461b0da..5b5cbc9 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/MemberTerminatedMessageProcessor.java @@ -104,6 +104,9 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor { } } + // Notify event listeners before removing member object + notifyEventListeners(event); + if (member == null) { if (log.isWarnEnabled()) { log.warn(String.format("Member already terminated: [service] %s [cluster] %s [member] %s", @@ -123,10 +126,7 @@ public class MemberTerminatedMessageProcessor extends MessageProcessor { } } - - notifyEventListeners(event); return true; - } else { if (nextProcessor != null) { // ask the next processor to take care of the message. 
http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java index 5ea95cd..2c0bc70 100644 --- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java +++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/ServiceRemovedMessageProcessor.java @@ -60,6 +60,9 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { } } + // Notify event listeners before removing service object + notifyEventListeners(event); + // Validate event against the existing topology Service service = topology.getService(event.getServiceName()); if (service == null) { @@ -77,11 +80,7 @@ public class ServiceRemovedMessageProcessor extends MessageProcessor { } } - - // Notify event listeners - notifyEventListeners(event); return true; - } else { if (nextProcessor != null) { // ask the next processor to take care of the message. 
http://git-wip-us.apache.org/repos/asf/stratos/blob/67a22ab5/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf ---------------------------------------------------------------------- diff --git a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf index 8060030..ea3fe8a 100644 --- a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf +++ b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf @@ -98,6 +98,12 @@ loadbalancer { } } + # Rewrite location header + # If this property is set to true, the load balancer will rewrite HTTP Location header values found in response + # messages if the host is set to an IP address of a known member. The resulting Location header host will be + # set to the corresponding cluster hostname and the port will be set to the corresponding transport proxy port. + rewrite-location-header: true; + # Static topology configuration # Define a static topology configuration if topology-event-listener is set to false. # A sample configuration has been given below:
