Updated Branches: refs/heads/master 1393aa295 -> 60f35f391
Removed partition id from load balancer in-flight request count stat and added network partition id Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/60f35f39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/60f35f39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/60f35f39 Branch: refs/heads/master Commit: 60f35f391e6b3f87dce19aa9ffb31f469effe4bb Parents: 1393aa2 Author: Imesh Gunaratne <[email protected]> Authored: Fri Dec 13 11:48:02 2013 +0530 Committer: Imesh Gunaratne <[email protected]> Committed: Fri Dec 13 11:48:02 2013 +0530 ---------------------------------------------------------------------- .../WSO2CEPInFlightRequestPublisher.java | 8 +-- ...oadBalancerInFlightRequestCountNotifier.java | 26 ++++----- .../extension/api/LoadBalancerStatsReader.java | 3 +- .../conf/LoadBalancerConfiguration.java | 13 +++++ .../conf/configurator/CEPConfigurator.java | 1 + .../load/balancer/conf/util/Constants.java | 1 + .../TenantAwareLoadBalanceEndpoint.java | 7 +-- .../balancer/mediators/ResponseInterceptor.java | 6 +-- ...adBalancerInFlightRequestCountCollector.java | 57 ++++++++------------ .../WSO2CEPInFlightRequestCountObserver.java | 25 ++++----- .../stratos/load/balancer/util/Constants.java | 1 - .../test/LoadBalancerConfigurationTest.java | 1 + .../sample/configuration/loadbalancer1.conf | 4 ++ .../sample/configuration/loadbalancer2.conf | 4 ++ .../sample/configuration/loadbalancer3.conf | 4 ++ .../src/main/bin/haproxy-extension.sh | 1 + .../haproxy/extension/HAProxyStatsReader.java | 34 ++++++------ .../templates/loadbalancer.conf.template | 4 ++ .../src/main/conf/loadbalancer.conf | 4 ++ 19 files changed, 109 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java index 6992f45..f8c6d40 100644 --- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java +++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java @@ -45,7 +45,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher { List<Attribute> payloadData = new ArrayList<Attribute>(); // Payload definition payloadData.add(new Attribute("cluster_id", AttributeType.STRING)); - payloadData.add(new Attribute("partition_id", AttributeType.STRING)); + payloadData.add(new Attribute("network_partition_id", AttributeType.STRING)); payloadData.add(new Attribute("in_flight_request_count", AttributeType.INT)); streamDefinition.setPayloadData(payloadData); return streamDefinition; @@ -62,14 +62,14 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher { * Publish in-flight request count of a cluster. * * @param clusterId - * @param partitionId + * @param networkPartitionId * @param inFlightRequestCount */ - public void publish(String clusterId, String partitionId, int inFlightRequestCount) { + public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) { List<Object> payload = new ArrayList<Object>(); // Payload values payload.add(clusterId); - payload.add(partitionId); + payload.add(networkPartitionId); payload.add(inFlightRequestCount); super.publish(payload.toArray()); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java index f1f4da9..5fa3c71 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java @@ -19,6 +19,7 @@ package org.apache.stratos.load.balancer.extension.api; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher; @@ -37,6 +38,7 @@ public class LoadBalancerInFlightRequestCountNotifier implements Runnable { private LoadBalancerStatsReader statsReader; private final WSO2CEPInFlightRequestPublisher statsPublisher; private long statsPublisherInterval = 15000; + private String networkPartitionId; private boolean terminated; public LoadBalancerInFlightRequestCountNotifier(LoadBalancerStatsReader statsReader) { @@ -47,6 +49,10 @@ public class LoadBalancerInFlightRequestCountNotifier implements Runnable { if (interval != null) { statsPublisherInterval = Long.getLong(interval); } + networkPartitionId = System.getProperty("network.partition.id"); + if (StringUtils.isBlank(networkPartitionId)) { + throw new RuntimeException("network.partition.id system property was not found."); + } } @Override @@ -61,24 +67,20 @@ public class LoadBalancerInFlightRequestCountNotifier implements Runnable { if (statsPublisher.isEnabled()) { try { TopologyManager.acquireReadLock(); - Collection<String> partitionIds; int requestCount; for (Service service : TopologyManager.getTopology().getServices()) { for (Cluster cluster : service.getClusters()) { - partitionIds = cluster.findPartitionIds(); - for(String partitionId : partitionIds) { - // Publish in-flight request count of each cluster partition - requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId(), partitionId); - statsPublisher.publish(cluster.getClusterId(), partitionId, requestCount); - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [partition] %s [value] %d", - cluster.getClusterId(), partitionId, requestCount)); - } + // Publish in-flight request count of load balancer's network partition + requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId()); + statsPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", + cluster.getClusterId(), networkPartitionId, requestCount)); } } + } - } - finally { + } finally { TopologyManager.releaseReadLock(); } } else if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java index a098ef0..2c6f324 100644 --- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java +++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java @@ -29,7 +29,6 @@ public interface LoadBalancerStatsReader { /** * Get in-flight request count of a given cluster. * @param clusterId - * @param partitionId */ - int getInFlightRequestCount(String clusterId, String partitionId); + int getInFlightRequestCount(String clusterId); } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java index 1812fb7..9deb1ae 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java @@ -64,6 +64,7 @@ public class LoadBalancerConfiguration { private TenantIdentifier tenantIdentifier; private String tenantIdentifierRegex; private String topologyMemberFilter; + private String networkPartitionId; /** * Load balancer configuration is singleton. @@ -243,6 +244,14 @@ public class LoadBalancerConfiguration { return tenantIdentifierRegex; } + public void setNetworkPartitionId(String networkPartitionId) { + this.networkPartitionId = networkPartitionId; + } + + public String getNetworkPartitionId() { + return networkPartitionId; + } + private static class LoadBalancerConfigurationReader { private String property; @@ -349,6 +358,10 @@ public class LoadBalancerConfiguration { String cepPort = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_CEP_PORT); validateRequiredPropertyInNode(Constants.CONF_PROPERTY_CEP_PORT, cepPort, "loadbalancer"); configuration.setCepPort(Integer.parseInt(cepPort)); + + String networkPartitionId = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_NETWORK_PARTITION_ID); + validateRequiredPropertyInNode(Constants.CONF_PROPERTY_NETWORK_PARTITION_ID, networkPartitionId, "loadbalancer"); + configuration.setNetworkPartitionId(networkPartitionId); } if (configuration.isMultiTenancyEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java index 83a8086..9d9d492 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java @@ -31,5 +31,6 @@ public class CEPConfigurator { System.setProperty("load.balancer.cep.stats.publisher.enabled", String.valueOf(configuration.isCepStatsPublisherEnabled())); System.setProperty("thrift.receiver.ip", configuration.getCepIp()); System.setProperty("thrift.receiver.port", String.valueOf(configuration.getCepPort())); + System.setProperty("network.partition.id", configuration.getNetworkPartitionId()); } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java index 87aed3f..cd26dd2 100755 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java @@ -56,6 +56,7 @@ public class Constants { public static final String CONF_PROPERTY_VALUE_TENANT_ID = "tenant-id"; public static final String CONF_PROPERTY_VALUE_TENANT_DOMAIN = "tenant-domain"; public static final String CONF_PROPERTY_TENANT_IDENTIFIER_REGEX = "tenant-identifier-regex"; + public static final String CONF_PROPERTY_NETWORK_PARTITION_ID = "network-partition-id"; public static final String CONF_DELIMITER_HOSTS = ","; public static final long DEFAULT_SESSION_TIMEOUT = 90000; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java index 9c97d4b..b6fe08b 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java @@ -241,7 +241,6 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints axis2Member.setActive(member.isActive()); // Set cluster id and partition id in message context synCtx.setProperty(Constants.CLUSTER_ID, member.getClusterId()); - synCtx.setProperty(Constants.PARTITION_ID, member.getPartitionId()); return axis2Member; } @@ -527,11 +526,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints if(StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - String partitionId = (String) messageContext.getProperty(Constants.PARTITION_ID); - if(StringUtils.isBlank(partitionId)) { - throw new RuntimeException("Partition id not found in message context"); - } - LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId, partitionId); + LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId); } catch (Exception e) { if(log.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java index 1af0b9c..fef3fea 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java @@ -40,11 +40,7 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife if (StringUtils.isBlank(clusterId)) { throw new RuntimeException("Cluster id not found in message context"); } - String partitionId = (String) messageContext.getProperty(Constants.PARTITION_ID); - if (StringUtils.isBlank(partitionId)) { - throw new RuntimeException("Partition id not found in message context"); - } - LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId, partitionId); + LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId); } catch (Exception e) { if(log.isErrorEnabled()) { log.error("Could not decrement in-flight request count", e); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java index 497b973..3c8927a 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java @@ -41,11 +41,11 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable { private static LoadBalancerInFlightRequestCountCollector collector; // Map<ClusterId, Map<PartitionId, InFlightRequestCount> - private Map<String, Map<String, Integer>> inFlightRequestCountMap; + private Map<String, Integer> inFlightRequestCountMap; private Thread notifier; private LoadBalancerInFlightRequestCountCollector() { - inFlightRequestCountMap = new ConcurrentHashMap<String, Map<String, Integer>>(); + inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>(); if (notifier == null || (notifier != null && !notifier.isAlive())) { notifier = new Thread(new ObserverNotifier()); notifier.start(); @@ -65,63 +65,50 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable { return collector; } - public int getInFlightRequestCount(String clusterId, String partitionId) { - if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { - return -1; - } - - Map<String, Integer> partitionMap = inFlightRequestCountMap.get(clusterId); - if (partitionMap == null) { - return 0; - } - if (partitionMap.containsKey(partitionId)) { - return partitionMap.get(partitionId); + public int getInFlightRequestCount(String clusterId) { + if (inFlightRequestCountMap.containsKey(clusterId)) { + return inFlightRequestCountMap.get(clusterId); } return 0; } - public void setInFlightRequestCount(String clusterId, String partitionId, int value) { - if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { + public void setInFlightRequestCount(String clusterId, int value) { + if (StringUtils.isBlank(clusterId)) { return; } - Map<String, Integer> partitionMap = inFlightRequestCountMap.get(clusterId); - if (partitionMap == null) { - partitionMap = new HashMap<String, Integer>(); - inFlightRequestCountMap.put(clusterId, partitionMap); - } - partitionMap.put(partitionId, value); + inFlightRequestCountMap.put(clusterId, value); if(log.isDebugEnabled()) { - log.debug(String.format("In-flight request count updated: [cluster] %s [partition] $s [value] %d", clusterId, partitionId, value)); + log.debug(String.format("In-flight request count updated: [cluster] %s [value] %d", clusterId, value)); } setChanged(); } - public void incrementInFlightRequestCount(String clusterId, String partitionId) { - incrementInFlightRequestCount(clusterId, partitionId, 1); + public void incrementInFlightRequestCount(String clusterId) { + incrementInFlightRequestCount(clusterId, 1); } - public void incrementInFlightRequestCount(String clusterId, String partitionId, int value) { - if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { + public void incrementInFlightRequestCount(String clusterId, int value) { + if (StringUtils.isBlank(clusterId)) { return; } - int count = getInFlightRequestCount(clusterId, partitionId); - setInFlightRequestCount(clusterId, partitionId, (count + value)); + int count = getInFlightRequestCount(clusterId); + setInFlightRequestCount(clusterId, (count + value)); } - public void decrementInFlightRequestCount(String clusterId, String partitionId) { - decrementInFlightRequestCount(clusterId, partitionId, 1); + public void decrementInFlightRequestCount(String clusterId) { + decrementInFlightRequestCount(clusterId, 1); } - public void decrementInFlightRequestCount(String clusterId, String partitionId, int value) { - if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) { + public void decrementInFlightRequestCount(String clusterId, int value) { + if (StringUtils.isBlank(clusterId)) { return; } - int count = getInFlightRequestCount(clusterId, partitionId); + int count = getInFlightRequestCount(clusterId); int newValue = (count - value) < 0 ? 0 : (count - value); - setInFlightRequestCount(clusterId, partitionId, newValue); + setInFlightRequestCount(clusterId, newValue); } @@ -142,7 +129,7 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable { Thread.sleep(15000); } catch (InterruptedException ignore) { } - LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Map<String, Integer>>(inFlightRequestCountMap)); + LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Integer>(inFlightRequestCountMap)); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java index f86643b..390905e 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java @@ -18,6 +18,7 @@ */ package org.apache.stratos.load.balancer.statistics.observers; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher; @@ -29,28 +30,28 @@ import java.util.Observer; public class WSO2CEPInFlightRequestCountObserver implements Observer { private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class); private WSO2CEPInFlightRequestPublisher publisher; + private String networkPartitionId; public WSO2CEPInFlightRequestCountObserver() { this.publisher = new WSO2CEPInFlightRequestPublisher(); + networkPartitionId = System.getProperty("network.partition.id"); + if (StringUtils.isBlank(networkPartitionId)) { + throw new RuntimeException("network.partition.id system property was not found."); + } } public void update(Observable observable, Object object) { try { if (publisher.isEnabled()) { - Map<String, Map<String, Integer>> inFlightRequestCountMap = (Map<String, Map<String, Integer>>) object; + // Map<ClusterId, Count> + Map<String, Integer> inFlightRequestCountMap = (Map<String, Integer>) object; // Publish event per cluster id - Map<String, Integer> partitionMap = null; for (String clusterId : inFlightRequestCountMap.keySet()) { - partitionMap = inFlightRequestCountMap.get(clusterId); - if (partitionMap != null) { - for (String partitionId : partitionMap.keySet()) { - // Publish event - publisher.publish(clusterId, partitionId, partitionMap.get(partitionId)); - if (log.isDebugEnabled()) { - log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [partition] %s [value] %d", - clusterId, partitionId, partitionMap.get(partitionId))); - } - } + // Publish event + publisher.publish(clusterId, networkPartitionId, inFlightRequestCountMap.get(clusterId)); + if (log.isDebugEnabled()) { + log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d", + clusterId, networkPartitionId, inFlightRequestCountMap.get(clusterId))); } } } else if (log.isWarnEnabled()) { http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java index 959ba90..674fe24 100644 --- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java +++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java @@ -21,7 +21,6 @@ package org.apache.stratos.load.balancer.util; public class Constants { public static final String CLUSTER_ID = "cluster_id"; - public static final String PARTITION_ID = "partition_id"; public static final String LB_HOST_NAME = "LB_HOST_NAME"; public static final String LB_HTTP_PORT = "LB_HTTP_PORT"; http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java index ee7a726..8256b80 100755 --- a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java +++ b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java @@ -81,6 +81,7 @@ public class LoadBalancerConfigurationTest { Assert.assertTrue(String.format("%s, cep stats publisher is not true", validationError), configuration.isCepStatsPublisherEnabled()); Assert.assertEquals(String.format("%s, cep ip is not valid", validationError), "localhost", configuration.getCepIp()); Assert.assertEquals(String.format("%s, cep port is not valid", validationError), 7615, configuration.getCepPort()); + Assert.assertEquals(String.format("%s, network partition id is not valid", validationError), "network-partition-1", configuration.getNetworkPartitionId()); Assert.assertTrue(String.format("%s, multi-tenancy is not true", validationError), configuration.isMultiTenancyEnabled()); Assert.assertEquals(String.format("%s, tenant-identifier is not valid", validationError), TenantIdentifier.TenantDomain, configuration.getTenantIdentifier()); Assert.assertEquals(String.format("%s, tenant-identifier-regex is not valid", validationError), "t/([^/]*)/", configuration.getTenantIdentifierRegex()); http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf index 7fec56d..bdb3fc4 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf @@ -77,6 +77,10 @@ loadbalancer { cep-ip: localhost; cep-port: 7615; + # Network partition id + # Provide the network partition id if cep-stats-publisher is set to true. + network-partition-id: network-partition-1; + # Multi-tenancy # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex # and matching tenant identifier value will be used to delegate the requests to the relevant cluster. http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf index 7e0f10d..31649c5 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf @@ -77,6 +77,10 @@ loadbalancer { cep-ip: localhost; cep-port: 7615; + # Network partition id + # Provide the network partition id if cep-stats-publisher is set to true. + network-partition-id: network-partition-1; + # Multi-tenancy # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex # and matching tenant identifier value will be used to delegate the requests to the relevant cluster. http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf index 4dfe2dc..a6270da 100755 --- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf +++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf @@ -77,6 +77,10 @@ loadbalancer { # cep-ip: localhost; # cep-port: 7615; + # Network partition id + # Provide the network partition id if cep-stats-publisher is set to true. + # network-partition-id: network-partition-1; + # Multi-tenancy # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex # and matching tenant identifier value will be used to delegate the requests to the relevant cluster. http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh index ed11eaa..741b269 100755 --- a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh +++ b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh @@ -36,6 +36,7 @@ properties="-Djndi.properties.dir=${script_path}/../conf -Djavax.net.ssl.trustStorePassword=wso2carbon -Dthrift.receiver.ip=localhost -Dthrift.receiver.port=7615 + -Dnetwork.partition.id= -Dstratos.messaging.topology.service.filter= -Dload.balancer.cep.stats.publisher.enabled=true" http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java ---------------------------------------------------------------------- diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java index 43de7d6..57c6bc0 100644 --- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java +++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java @@ -45,7 +45,7 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader { } @Override - public int getInFlightRequestCount(String clusterId, String partitionId) { + public int getInFlightRequestCount(String clusterId) { String frontendId, backendId, command, output; String[] array; int totalWeight, weight; @@ -63,25 +63,23 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader { backendId = frontendId + "-members"; for (Member member : cluster.getMembers()) { - if((member.getPartitionId() != null) && member.getPartitionId().equals(partitionId)) { - // echo "get weight <backend>/<server>" | socat stdio <stats-socket> - command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); - try { - output = CommandUtil.executeCommand(command); - if ((output != null) && (output.length() > 0)) { - array = output.split(" "); - if ((array != null) && (array.length > 0)) { - weight = Integer.parseInt(array[0]); - if (log.isDebugEnabled()) { - log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); - } - totalWeight += weight; + // echo "get weight <backend>/<server>" | socat stdio <stats-socket> + command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath); + try { + output = CommandUtil.executeCommand(command); + if ((output != null) && (output.length() > 0)) { + array = output.split(" "); + if ((array != null) && (array.length > 0)) { + weight = Integer.parseInt(array[0]); + if (log.isDebugEnabled()) { + log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight)); } + totalWeight += weight; } - } catch (IOException e) { - if (log.isErrorEnabled()) { - log.error(e); - } + } + } catch (IOException e) { + if (log.isErrorEnabled()) { + log.error(e); } } } http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template ---------------------------------------------------------------------- diff --git a/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template b/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template index 354aa45..caa9078 100755 --- a/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template +++ b/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template @@ -71,6 +71,10 @@ loadbalancer { cep-ip: CEP_IP; cep-port: CEP_PORT; + # Network partition id + # Provide the network partition id if cep-stats-publisher is set to true. + network-partition-id: network-partition-1; + # Multi-tenancy # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex # and matching tenant identifier value will be used to delegate the requests to the relevant cluster. http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf ---------------------------------------------------------------------- diff --git a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf index 69d35a7..03a9634 100644 --- a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf +++ b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf @@ -71,6 +71,10 @@ loadbalancer { cep-ip: localhost; cep-port: 7615; + # Network partition id + # Provide the network partition id if cep-stats-publisher is set to true. + network-partition-id: network-partition-1; + # Multi-tenancy # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex # and matching tenant identifier value will be used to delegate the requests to the relevant cluster.
