Updated Branches: refs/heads/master de739901c -> 4bf745608
Added partition id to in-flight request count sent from load balancer Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ca4b8f43 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ca4b8f43 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ca4b8f43 Branch: refs/heads/master Commit: ca4b8f438e4906d9e38742918168c60a53e4dfc1 Parents: 522db60 Author: Imesh Gunaratne <[email protected]> Authored: Tue Dec 10 17:47:57 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Tue Dec 10 17:47:57 2013 +0530 ---------------------------------------------------------------------- .../WSO2CEPInFlightRequestPublisher.java | 9 ++- .../extension/api/LoadBalancerStatsReader.java | 3 +- .../TenantAwareLoadBalanceEndpoint.java | 31 +++++++-- .../balancer/mediators/ResponseInterceptor.java | 26 +++++-- ...adBalancerInFlightRequestCountCollector.java | 71 +++++++++++++------- .../WSO2CEPInFlightRequestCountObserver.java | 36 ++++++---- .../stratos/load/balancer/util/Constants.java | 3 + .../haproxy/extension/HAProxyStatsReader.java | 34 +++++----- 8 files changed, 143 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java index f10907e..d41bb0b 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java @@ -30,8 +30,7 @@ import java.util.List; * WSO2 CEP in flight request count publisher. * * In-flight request count: - * Number of requests being served at a given moment could be identified as - * in-flight request count. + * Number of requests being served at a given moment could be identified as in-flight request count. */ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher { @@ -46,6 +45,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher { List<Attribute> payloadData = new ArrayList<Attribute>(); // Payload definition payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); + payloadData.add(new Attribute("partition_id", AttributeType.STRING)); payloadData.add(new Attribute("in_flight_requests", AttributeType.INT)); streamDefinition.setPayloadData(payloadData); return streamDefinition; @@ -61,13 +61,16 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher { /** * Publish in-flight request count of a cluster. + * * @param clusterId + * @param partitionId * @param inFlightRequestCount */ - public void publish(String clusterId, int inFlightRequestCount) { + public void publish(String clusterId, String partitionId, int inFlightRequestCount) { List<Object> payload = new ArrayList<Object>(); // Payload values payload.add(clusterId); + payload.add(partitionId); payload.add(inFlightRequestCount); super.publish(payload.toArray()); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java index 2c6f324..a098ef0 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java @@ -29,6 +29,7 @@ public interface LoadBalancerStatsReader { /** * Get in-flight request count of a given cluster. * @param clusterId + * @param partitionId */ - int getInFlightRequestCount(String clusterId); + int getInFlightRequestCount(String clusterId, String partitionId); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 8e3928e..9c97d4b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -21,6 +21,7 @@ package org.apache.stratos.load.balancer.endpoint; import org.apache.axis2.addressing.EndpointReference; import org.apache.axis2.description.TransportInDescription; +import org.apache.commons.lang3.StringUtils; import org.apache.http.protocol.HTTP; import org.apache.stratos.load.balancer.RequestDelegator; import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory; @@ -238,6 +239,9 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints if (httpsPort != null) axis2Member.setHttpsPort(httpsPort.getValue()); axis2Member.setActive(member.isActive()); + // Set cluster id and partition id in message context + synCtx.setProperty(Constants.CLUSTER_ID, member.getClusterId()); + synCtx.setProperty(Constants.PARTITION_ID, member.getPartitionId()); return axis2Member; } @@ -499,16 +503,14 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints setupTransportHeaders(synCtx); setupLoadBalancerContextProperties(synCtx); - // Update health stats - LoadBalancerInFlightRequestCountCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain()); - // Set the cluster id in the message context - synCtx.setProperty(Constants.CLUSTER_ID, currentMember.getDomain()); - try { if (log.isDebugEnabled()) { log.debug(String.format("Sending request to endpoint: %s", to.getAddress())); } endpoint.send(synCtx); + + // Increment in-flight request count + incrementInFlightRequestCount(synCtx); } catch (Exception e) { if (e.getMessage().toLowerCase().contains("io reactor shutdown")) { log.fatal("System cannot continue normal operation. Restarting", e); @@ -519,6 +521,25 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints } } + private void incrementInFlightRequestCount(MessageContext messageContext) { + try { + String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID); + if(StringUtils.isBlank(clusterId)) { + throw new RuntimeException("Cluster id not found in message context"); + } + String partitionId = (String) messageContext.getProperty(Constants.PARTITION_ID); + if(StringUtils.isBlank(partitionId)) { + throw new RuntimeException("Partition id not found in message context"); + } + LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId, partitionId); + } + catch (Exception e) { + if(log.isDebugEnabled()) { + log.debug("Could not increment in-flight request count", e); + } + } + } + public void setDispatcher(HttpSessionDispatcher dispatcher) { this.dispatcher = dispatcher; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index ddf7eb5..1af0b9c 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -18,6 +18,7 @@ */ package org.apache.stratos.load.balancer.mediators; +import org.apache.commons.lang3.StringUtils; import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector; import org.apache.stratos.load.balancer.util.Constants; import org.apache.synapse.ManagedLifecycle; @@ -30,19 +31,32 @@ import org.apache.synapse.mediators.AbstractMediator; */ public class ResponseInterceptor extends AbstractMediator implements ManagedLifecycle { - public boolean mediate(MessageContext synCtx) { - if (log.isDebugEnabled()) { - log.debug("Mediation started " + ResponseInterceptor.class.getName()); + public boolean mediate(MessageContext messageContext) { + try { + if (log.isDebugEnabled()) { + log.debug("Response interceptor mediation started"); + } + String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID); + if (StringUtils.isBlank(clusterId)) { + throw new RuntimeException("Cluster id not found in message context"); + } + String partitionId = (String) messageContext.getProperty(Constants.PARTITION_ID); + if (StringUtils.isBlank(partitionId)) { + throw new RuntimeException("Partition id not found in message context"); + } + LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId, partitionId); + } catch (Exception e) { + if(log.isErrorEnabled()) { + log.error("Could not decrement in-flight request count", e); + } } - String clusterId = (String) synCtx.getProperty(Constants.CLUSTER_ID); - LoadBalancerInFlightRequestCountCollector.getInstance().decrementRequestInflightCount(clusterId); return true; } @Override public void destroy() { if (log.isDebugEnabled()) { - log.debug("ResponseInterceptor mediator destroyed"); + log.debug("Response interceptor mediator destroyed"); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java index cbe8e00..497b973 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java @@ -18,6 +18,8 @@ */ package org.apache.stratos.load.balancer.statistics; +import org.apache.commons.lang.StringUtils; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver; @@ -28,7 +30,7 @@ import java.util.Observable; import java.util.concurrent.ConcurrentHashMap; /** - * This is the load balancing stats collector and any observer can get registered here + * This is the load balancing in-flight request count collector and any observer can get registered here * and receive notifications periodically. * This is a Singleton object. * @@ -38,11 +40,12 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable { private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountCollector.class); private static LoadBalancerInFlightRequestCountCollector collector; - private Map<String, Integer> clusterIdToRequestInflightCountMap; + // Map<ClusterId, Map<PartitionId, InFlightRequestCount> + private Map<String, Map<String, Integer>> inFlightRequestCountMap; private Thread notifier; private LoadBalancerInFlightRequestCountCollector() { - clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>(); + inFlightRequestCountMap = new ConcurrentHashMap<String, Map<String, Integer>>(); if (notifier == null || (notifier != null && !notifier.isAlive())) { notifier = new Thread(new ObserverNotifier()); notifier.start(); @@ -62,45 +65,63 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable { return collector; } - public void setRequestInflightCount(String clusterId, int value) { - if (clusterId == null) { + public int getInFlightRequestCount(String clusterId, String partitionId) { + if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { + return -1; + } + + Map<String, Integer> partitionMap = inFlightRequestCountMap.get(clusterId); + if (partitionMap == null) { + return 0; + } + if (partitionMap.containsKey(partitionId)) { + return partitionMap.get(partitionId); + } + return 0; + } + + public void setInFlightRequestCount(String clusterId, String partitionId, int value) { + if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { return; } - clusterIdToRequestInflightCountMap.put(clusterId, value); + Map<String, Integer> partitionMap = inFlightRequestCountMap.get(clusterId); + if (partitionMap == null) { + partitionMap = new HashMap<String, Integer>(); + inFlightRequestCountMap.put(clusterId, partitionMap); + } + partitionMap.put(partitionId, value); + if(log.isDebugEnabled()) { + log.debug(String.format("In-flight request count updated: [cluster] %s [partition] $s [value] %d", clusterId, partitionId, value)); + } setChanged(); } - public void incrementRequestInflightCount(String clusterId) { - incrementRequestInflightCount(clusterId, 1); + public void incrementInFlightRequestCount(String clusterId, String partitionId) { + incrementInFlightRequestCount(clusterId, partitionId, 1); } - public void incrementRequestInflightCount(String clusterId, int value) { - if (clusterId == null) { + public void incrementInFlightRequestCount(String clusterId, String partitionId, int value) { + if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { return; } - if (clusterIdToRequestInflightCountMap.get(clusterId) != null) { - value += clusterIdToRequestInflightCountMap.get(clusterId); - } - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); + int count = getInFlightRequestCount(clusterId, partitionId); + setInFlightRequestCount(clusterId, partitionId, (count + value)); } - public void decrementRequestInflightCount(String clusterId) { - decrementRequestInflightCount(clusterId, 1); + public void decrementInFlightRequestCount(String clusterId, String partitionId) { + decrementInFlightRequestCount(clusterId, partitionId, 1); } - public void decrementRequestInflightCount(String clusterId, int value) { - if (clusterId == null) { + public void decrementInFlightRequestCount(String clusterId, String partitionId, int value) { + if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { return; } - if (clusterIdToRequestInflightCountMap.get(clusterId) != null) { - value += clusterIdToRequestInflightCountMap.get(clusterId); - } - clusterIdToRequestInflightCountMap.put(clusterId, value); - setChanged(); + int count = getInFlightRequestCount(clusterId, partitionId); + int newValue = (count - value) < 0 ? 0 : (count - value); + setInFlightRequestCount(clusterId, partitionId, newValue); } @@ -121,7 +142,7 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable { Thread.sleep(15000); } catch (InterruptedException ignore) { } - LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap)); + LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Map<String, Integer>>(inFlightRequestCountMap)); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java index 8b7bf5a..f86643b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java @@ -35,22 +35,30 @@ public class WSO2CEPInFlightRequestCountObserver implements Observer { } public void update(Observable observable, Object object) { - if (object != null && object instanceof Map<?, ?>) { - try { - if (publisher.isEnabled()) { - Map<String, Integer> stats = (Map<String, Integer>) object; - // Publish event per cluster id - for (String clusterId : stats.keySet()) { - // Publish event - publisher.publish(clusterId, stats.get(clusterId)); + try { + if (publisher.isEnabled()) { + Map<String, Map<String, Integer>> inFlightRequestCountMap = (Map<String, Map<String, Integer>>) object; + // Publish event per cluster id + Map<String, Integer> partitionMap = null; + for (String clusterId : inFlightRequestCountMap.keySet()) { + partitionMap = inFlightRequestCountMap.get(clusterId); + if (partitionMap != null) { + for (String partitionId : partitionMap.keySet()) { + // Publish event + publisher.publish(clusterId, partitionId, partitionMap.get(partitionId)); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [partition] %s [value] %d", + clusterId, partitionId, partitionMap.get(partitionId))); + } + } } - } else if (log.isWarnEnabled()) { - log.warn("CEP statistics publisher is disabled"); - } - } catch (Exception e) { - if (log.isErrorEnabled()) { - log.error("Could not publish in-flight request count", e); } + } else if (log.isWarnEnabled()) { + log.warn("CEP statistics publisher is disabled"); + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("Could not publish in-flight request count to cep", e); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java index 96fee1d..959ba90 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java @@ -21,8 +21,11 @@ package org.apache.stratos.load.balancer.util; public class Constants { public static final String CLUSTER_ID = "cluster_id"; + public static final String PARTITION_ID = "partition_id"; + public static final String LB_HOST_NAME = "LB_HOST_NAME"; public static final String LB_HTTP_PORT = "LB_HTTP_PORT"; public static final String LB_HTTPS_PORT = "LB_HTTPS_PORT"; + public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL"; } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java index 57c6bc0..43de7d6 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java @@ -45,7 +45,7 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader { } @Override - public int getInFlightRequestCount(String clusterId) { + public int getInFlightRequestCount(String clusterId, String partitionId) { String frontendId, backendId, command, output; String[] array; int totalWeight, weight; @@ -63,23 +63,25 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader { backendId = frontendId + "-members"; for (Member member : cluster.getMembers()) { - // echo "get weight <backend>/<server>" | socat stdio <stats-socket> - command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); - try { - output = CommandUtil.executeCommand(command); - if ((output != null) && (output.length() > 0)) { - array = output.split(" "); - if ((array != null) && (array.length > 0)) { - weight = Integer.parseInt(array[0]); - if (log.isDebugEnabled()) { - log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); + if((member.getPartitionId() != null) && member.getPartitionId().equals(partitionId)) { + // echo "get weight <backend>/<server>" | socat stdio <stats-socket> + command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); + try { + output = CommandUtil.executeCommand(command); + if ((output != null) && (output.length() > 0)) { + array = output.split(" "); + if ((array != null) && (array.length > 0)) { + weight = Integer.parseInt(array[0]); + if (log.isDebugEnabled()) { + log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); + } + totalWeight += weight; } - totalWeight += weight; } - } - } catch (IOException e) { - if (log.isErrorEnabled()) { - log.error(e); + } catch (IOException e) { + if (log.isErrorEnabled()) { + log.error(e); + } } } }
