Updated Branches:
  refs/heads/master 1393aa295 -> 60f35f391

Removed the partition id from the load balancer in-flight request count
statistic and added the network partition id in its place


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/60f35f39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/60f35f39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/60f35f39

Branch: refs/heads/master
Commit: 60f35f391e6b3f87dce19aa9ffb31f469effe4bb
Parents: 1393aa2
Author: Imesh Gunaratne <[email protected]>
Authored: Fri Dec 13 11:48:02 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Fri Dec 13 11:48:02 2013 +0530

----------------------------------------------------------------------
 .../WSO2CEPInFlightRequestPublisher.java        |  8 +--
 ...oadBalancerInFlightRequestCountNotifier.java | 26 ++++-----
 .../extension/api/LoadBalancerStatsReader.java  |  3 +-
 .../conf/LoadBalancerConfiguration.java         | 13 +++++
 .../conf/configurator/CEPConfigurator.java      |  1 +
 .../load/balancer/conf/util/Constants.java      |  1 +
 .../TenantAwareLoadBalanceEndpoint.java         |  7 +--
 .../balancer/mediators/ResponseInterceptor.java |  6 +--
 ...adBalancerInFlightRequestCountCollector.java | 57 ++++++++------------
 .../WSO2CEPInFlightRequestCountObserver.java    | 25 ++++-----
 .../stratos/load/balancer/util/Constants.java   |  1 -
 .../test/LoadBalancerConfigurationTest.java     |  1 +
 .../sample/configuration/loadbalancer1.conf     |  4 ++
 .../sample/configuration/loadbalancer2.conf     |  4 ++
 .../sample/configuration/loadbalancer3.conf     |  4 ++
 .../src/main/bin/haproxy-extension.sh           |  1 +
 .../haproxy/extension/HAProxyStatsReader.java   | 34 ++++++------
 .../templates/loadbalancer.conf.template        |  4 ++
 .../src/main/conf/loadbalancer.conf             |  4 ++
 19 files changed, 109 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
index 6992f45..f8c6d40 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
@@ -45,7 +45,7 @@ public class WSO2CEPInFlightRequestPublisher extends 
WSO2CEPStatsPublisher {
             List<Attribute> payloadData = new ArrayList<Attribute>();
             // Payload definition
             payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
-            payloadData.add(new Attribute("partition_id", 
AttributeType.STRING));
+            payloadData.add(new Attribute("network_partition_id", 
AttributeType.STRING));
             payloadData.add(new Attribute("in_flight_request_count", 
AttributeType.INT));
             streamDefinition.setPayloadData(payloadData);
             return streamDefinition;
@@ -62,14 +62,14 @@ public class WSO2CEPInFlightRequestPublisher extends 
WSO2CEPStatsPublisher {
      * Publish in-flight request count of a cluster.
      *
      * @param clusterId
-     * @param partitionId
+     * @param networkPartitionId
      * @param inFlightRequestCount
      */
-    public void publish(String clusterId, String partitionId, int 
inFlightRequestCount) {
+    public void publish(String clusterId, String networkPartitionId, int 
inFlightRequestCount) {
         List<Object> payload = new ArrayList<Object>();
         // Payload values
         payload.add(clusterId);
-        payload.add(partitionId);
+        payload.add(networkPartitionId);
         payload.add(inFlightRequestCount);
         super.publish(payload.toArray());
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
index f1f4da9..5fa3c71 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
@@ -19,6 +19,7 @@
 
 package org.apache.stratos.load.balancer.extension.api;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
@@ -37,6 +38,7 @@ public class LoadBalancerInFlightRequestCountNotifier 
implements Runnable {
     private LoadBalancerStatsReader statsReader;
     private final WSO2CEPInFlightRequestPublisher statsPublisher;
     private long statsPublisherInterval = 15000;
+    private String networkPartitionId;
     private boolean terminated;
 
     public LoadBalancerInFlightRequestCountNotifier(LoadBalancerStatsReader 
statsReader) {
@@ -47,6 +49,10 @@ public class LoadBalancerInFlightRequestCountNotifier 
implements Runnable {
         if (interval != null) {
             statsPublisherInterval = Long.getLong(interval);
         }
+        networkPartitionId = System.getProperty("network.partition.id");
+        if (StringUtils.isBlank(networkPartitionId)) {
+            throw new RuntimeException("network.partition.id system property 
was not found.");
+        }
     }
 
     @Override
@@ -61,24 +67,20 @@ public class LoadBalancerInFlightRequestCountNotifier 
implements Runnable {
                 if (statsPublisher.isEnabled()) {
                     try {
                         TopologyManager.acquireReadLock();
-                        Collection<String> partitionIds;
                         int requestCount;
                         for (Service service : 
TopologyManager.getTopology().getServices()) {
                             for (Cluster cluster : service.getClusters()) {
-                                partitionIds =  cluster.findPartitionIds();
-                                for(String partitionId : partitionIds) {
-                                    // Publish in-flight request count of each 
cluster partition
-                                    requestCount = 
statsReader.getInFlightRequestCount(cluster.getClusterId(), partitionId);
-                                    
statsPublisher.publish(cluster.getClusterId(), partitionId, requestCount);
-                                    if (log.isDebugEnabled()) {
-                                        log.debug(String.format("In-flight 
request count published to cep: [cluster-id] %s [partition] %s [value] %d",
-                                                cluster.getClusterId(), 
partitionId, requestCount));
-                                    }
+                                // Publish in-flight request count of load 
balancer's network partition
+                                requestCount = 
statsReader.getInFlightRequestCount(cluster.getClusterId());
+                                statsPublisher.publish(cluster.getClusterId(), 
networkPartitionId, requestCount);
+                                if (log.isDebugEnabled()) {
+                                    log.debug(String.format("In-flight request 
count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
+                                            cluster.getClusterId(), 
networkPartitionId, requestCount));
                                 }
                             }
+
                         }
-                    }
-                    finally {
+                    } finally {
                         TopologyManager.releaseReadLock();
                     }
                 } else if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
index a098ef0..2c6f324 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
@@ -29,7 +29,6 @@ public interface LoadBalancerStatsReader {
     /**
      * Get in-flight request count of a given cluster.
      * @param clusterId
-     * @param partitionId
      */
-    int getInFlightRequestCount(String clusterId, String partitionId);
+    int getInFlightRequestCount(String clusterId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index 1812fb7..9deb1ae 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -64,6 +64,7 @@ public class LoadBalancerConfiguration {
     private TenantIdentifier tenantIdentifier;
     private String tenantIdentifierRegex;
     private String topologyMemberFilter;
+    private String networkPartitionId;
 
     /**
      * Load balancer configuration is singleton.
@@ -243,6 +244,14 @@ public class LoadBalancerConfiguration {
         return tenantIdentifierRegex;
     }
 
+    public void setNetworkPartitionId(String networkPartitionId) {
+        this.networkPartitionId = networkPartitionId;
+    }
+
+    public String getNetworkPartitionId() {
+        return networkPartitionId;
+    }
+
     private static class LoadBalancerConfigurationReader {
 
         private String property;
@@ -349,6 +358,10 @@ public class LoadBalancerConfiguration {
                 String cepPort = 
loadBalancerNode.getProperty(Constants.CONF_PROPERTY_CEP_PORT);
                 
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_CEP_PORT, cepPort, 
"loadbalancer");
                 configuration.setCepPort(Integer.parseInt(cepPort));
+
+                String networkPartitionId = 
loadBalancerNode.getProperty(Constants.CONF_PROPERTY_NETWORK_PARTITION_ID);
+                
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_NETWORK_PARTITION_ID, 
networkPartitionId, "loadbalancer");
+                configuration.setNetworkPartitionId(networkPartitionId);
             }
 
             if (configuration.isMultiTenancyEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java
index 83a8086..9d9d492 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/CEPConfigurator.java
@@ -31,5 +31,6 @@ public class CEPConfigurator {
         System.setProperty("load.balancer.cep.stats.publisher.enabled", 
String.valueOf(configuration.isCepStatsPublisherEnabled()));
         System.setProperty("thrift.receiver.ip", configuration.getCepIp());
         System.setProperty("thrift.receiver.port", 
String.valueOf(configuration.getCepPort()));
+        System.setProperty("network.partition.id", 
configuration.getNetworkPartitionId());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
index 87aed3f..cd26dd2 100755
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/util/Constants.java
@@ -56,6 +56,7 @@ public class Constants {
     public static final String CONF_PROPERTY_VALUE_TENANT_ID = "tenant-id";
     public static final String CONF_PROPERTY_VALUE_TENANT_DOMAIN = 
"tenant-domain";
     public static final String CONF_PROPERTY_TENANT_IDENTIFIER_REGEX = 
"tenant-identifier-regex";
+    public static final String CONF_PROPERTY_NETWORK_PARTITION_ID = 
"network-partition-id";
 
     public static final String CONF_DELIMITER_HOSTS = ",";
     public static final long DEFAULT_SESSION_TIMEOUT = 90000;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 9c97d4b..b6fe08b 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -241,7 +241,6 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         axis2Member.setActive(member.isActive());
         // Set cluster id and partition id in message context
         synCtx.setProperty(Constants.CLUSTER_ID, member.getClusterId());
-        synCtx.setProperty(Constants.PARTITION_ID, member.getPartitionId());
         return axis2Member;
     }
 
@@ -527,11 +526,7 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
             if(StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message 
context");
             }
-            String partitionId = (String) 
messageContext.getProperty(Constants.PARTITION_ID);
-            if(StringUtils.isBlank(partitionId)) {
-                throw new RuntimeException("Partition id not found in message 
context");
-            }
-            
LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId,
 partitionId);
+            
LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId);
         }
         catch (Exception e) {
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index 1af0b9c..fef3fea 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -40,11 +40,7 @@ public class ResponseInterceptor extends AbstractMediator 
implements ManagedLife
             if (StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message 
context");
             }
-            String partitionId = (String) 
messageContext.getProperty(Constants.PARTITION_ID);
-            if (StringUtils.isBlank(partitionId)) {
-                throw new RuntimeException("Partition id not found in message 
context");
-            }
-            
LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId,
 partitionId);
+            
LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId);
         } catch (Exception e) {
             if(log.isErrorEnabled()) {
                 log.error("Could not decrement in-flight request count", e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
index 497b973..3c8927a 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
@@ -41,11 +41,11 @@ public class LoadBalancerInFlightRequestCountCollector 
extends Observable {
 
     private static LoadBalancerInFlightRequestCountCollector collector;
     // Map<ClusterId, Map<PartitionId, InFlightRequestCount>
-    private Map<String, Map<String, Integer>> inFlightRequestCountMap;
+    private Map<String, Integer> inFlightRequestCountMap;
     private Thread notifier;
 
     private LoadBalancerInFlightRequestCountCollector() {
-        inFlightRequestCountMap = new ConcurrentHashMap<String, Map<String, 
Integer>>();
+        inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>();
         if (notifier == null || (notifier != null && !notifier.isAlive())) {
             notifier = new Thread(new ObserverNotifier());
             notifier.start();
@@ -65,63 +65,50 @@ public class LoadBalancerInFlightRequestCountCollector 
extends Observable {
         return collector;
     }
 
-    public int getInFlightRequestCount(String clusterId, String partitionId) {
-        if (StringUtils.isBlank(clusterId) || 
StringUtils.isBlank(partitionId)) {
-            return -1;
-        }
-
-        Map<String, Integer> partitionMap = 
inFlightRequestCountMap.get(clusterId);
-        if (partitionMap == null) {
-            return 0;
-        }
-        if (partitionMap.containsKey(partitionId)) {
-            return partitionMap.get(partitionId);
+    public int getInFlightRequestCount(String clusterId) {
+        if (inFlightRequestCountMap.containsKey(clusterId)) {
+            return inFlightRequestCountMap.get(clusterId);
         }
         return 0;
     }
 
-    public void setInFlightRequestCount(String clusterId, String partitionId, 
int value) {
-        if (StringUtils.isBlank(clusterId) || 
StringUtils.isBlank(partitionId)) {
+    public void setInFlightRequestCount(String clusterId, int value) {
+        if (StringUtils.isBlank(clusterId)) {
             return;
         }
 
-        Map<String, Integer> partitionMap = 
inFlightRequestCountMap.get(clusterId);
-        if (partitionMap == null) {
-            partitionMap = new HashMap<String, Integer>();
-            inFlightRequestCountMap.put(clusterId, partitionMap);
-        }
-        partitionMap.put(partitionId, value);
+        inFlightRequestCountMap.put(clusterId, value);
         if(log.isDebugEnabled()) {
-            log.debug(String.format("In-flight request count updated: 
[cluster] %s [partition] $s [value] %d", clusterId, partitionId, value));
+            log.debug(String.format("In-flight request count updated: 
[cluster] %s [value] %d", clusterId, value));
         }
         setChanged();
     }
 
-    public void incrementInFlightRequestCount(String clusterId, String 
partitionId) {
-        incrementInFlightRequestCount(clusterId, partitionId, 1);
+    public void incrementInFlightRequestCount(String clusterId) {
+        incrementInFlightRequestCount(clusterId, 1);
     }
 
-    public void incrementInFlightRequestCount(String clusterId, String 
partitionId, int value) {
-        if (StringUtils.isBlank(clusterId) || 
StringUtils.isBlank(partitionId)) {
+    public void incrementInFlightRequestCount(String clusterId, int value) {
+        if (StringUtils.isBlank(clusterId)) {
             return;
         }
 
-        int count = getInFlightRequestCount(clusterId, partitionId);
-        setInFlightRequestCount(clusterId, partitionId, (count + value));
+        int count = getInFlightRequestCount(clusterId);
+        setInFlightRequestCount(clusterId, (count + value));
     }
 
-    public void decrementInFlightRequestCount(String clusterId, String 
partitionId) {
-        decrementInFlightRequestCount(clusterId, partitionId, 1);
+    public void decrementInFlightRequestCount(String clusterId) {
+        decrementInFlightRequestCount(clusterId, 1);
     }
 
-    public void decrementInFlightRequestCount(String clusterId, String 
partitionId, int value) {
-        if (StringUtils.isBlank(clusterId) || 
StringUtils.isBlank(partitionId)) {
+    public void decrementInFlightRequestCount(String clusterId, int value) {
+        if (StringUtils.isBlank(clusterId)) {
             return;
         }
 
-        int count = getInFlightRequestCount(clusterId, partitionId);
+        int count = getInFlightRequestCount(clusterId);
         int newValue = (count - value) < 0 ? 0 : (count - value);
-        setInFlightRequestCount(clusterId, partitionId, newValue);
+        setInFlightRequestCount(clusterId, newValue);
     }
 
 
@@ -142,7 +129,7 @@ public class LoadBalancerInFlightRequestCountCollector 
extends Observable {
                     Thread.sleep(15000);
                 } catch (InterruptedException ignore) {
                 }
-                
LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new 
HashMap<String, Map<String, Integer>>(inFlightRequestCountMap));
+                
LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new 
HashMap<String, Integer>(inFlightRequestCountMap));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
index f86643b..390905e 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
@@ -18,6 +18,7 @@
  */
 package org.apache.stratos.load.balancer.statistics.observers;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
@@ -29,28 +30,28 @@ import java.util.Observer;
 public class WSO2CEPInFlightRequestCountObserver implements Observer {
     private static final Log log = 
LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class);
     private WSO2CEPInFlightRequestPublisher publisher;
+    private String networkPartitionId;
 
     public WSO2CEPInFlightRequestCountObserver() {
         this.publisher = new WSO2CEPInFlightRequestPublisher();
+        networkPartitionId = System.getProperty("network.partition.id");
+        if (StringUtils.isBlank(networkPartitionId)) {
+            throw new RuntimeException("network.partition.id system property 
was not found.");
+        }
     }
 
     public void update(Observable observable, Object object) {
         try {
             if (publisher.isEnabled()) {
-                Map<String, Map<String, Integer>> inFlightRequestCountMap = 
(Map<String, Map<String, Integer>>) object;
+                // Map<ClusterId, Count>
+                Map<String, Integer> inFlightRequestCountMap = (Map<String, 
Integer>) object;
                 // Publish event per cluster id
-                Map<String, Integer> partitionMap = null;
                 for (String clusterId : inFlightRequestCountMap.keySet()) {
-                    partitionMap = inFlightRequestCountMap.get(clusterId);
-                    if (partitionMap != null) {
-                        for (String partitionId : partitionMap.keySet()) {
-                            // Publish event
-                            publisher.publish(clusterId, partitionId, 
partitionMap.get(partitionId));
-                            if (log.isDebugEnabled()) {
-                                log.debug(String.format("In-flight request 
count published to cep: [cluster-id] %s [partition] %s [value] %d",
-                                        clusterId, partitionId, 
partitionMap.get(partitionId)));
-                            }
-                        }
+                    // Publish event
+                    publisher.publish(clusterId, networkPartitionId, 
inFlightRequestCountMap.get(clusterId));
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("In-flight request count 
published to cep: [cluster-id] %s [network-partition] %s [value] %d",
+                                clusterId, networkPartitionId, 
inFlightRequestCountMap.get(clusterId)));
                     }
                 }
             } else if (log.isWarnEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
index 959ba90..674fe24 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
@@ -21,7 +21,6 @@ package org.apache.stratos.load.balancer.util;
 public class Constants {
 
     public static final String CLUSTER_ID = "cluster_id";
-    public static final String PARTITION_ID = "partition_id";
 
     public static final String LB_HOST_NAME = "LB_HOST_NAME";
     public static final String LB_HTTP_PORT = "LB_HTTP_PORT";

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
 
b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
index ee7a726..8256b80 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
+++ 
b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
@@ -81,6 +81,7 @@ public class LoadBalancerConfigurationTest {
             Assert.assertTrue(String.format("%s, cep stats publisher is not 
true", validationError), configuration.isCepStatsPublisherEnabled());
             Assert.assertEquals(String.format("%s, cep ip is not valid", 
validationError), "localhost", configuration.getCepIp());
             Assert.assertEquals(String.format("%s, cep port is not valid", 
validationError), 7615, configuration.getCepPort());
+            Assert.assertEquals(String.format("%s, network partition id is not 
valid", validationError), "network-partition-1", 
configuration.getNetworkPartitionId());
             Assert.assertTrue(String.format("%s, multi-tenancy is not true", 
validationError), configuration.isMultiTenancyEnabled());
             Assert.assertEquals(String.format("%s, tenant-identifier is not 
valid", validationError), TenantIdentifier.TenantDomain, 
configuration.getTenantIdentifier());
             Assert.assertEquals(String.format("%s, tenant-identifier-regex is 
not valid", validationError), "t/([^/]*)/", 
configuration.getTenantIdentifierRegex());

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
index 7fec56d..bdb3fc4 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
+++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
@@ -77,6 +77,10 @@ loadbalancer {
     cep-ip: localhost;
     cep-port: 7615;
 
+    # Network partition id
+    # Provide the network partition id if cep-stats-publisher is set to true.
+    network-partition-id: network-partition-1;
+
     # Multi-tenancy
     # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex
     # and matching tenant identifier value will be used to delegate the requests to the relevant cluster.

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
index 7e0f10d..31649c5 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
+++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
@@ -77,6 +77,10 @@ loadbalancer {
     cep-ip: localhost;
     cep-port: 7615;
 
+    # Network partition id
+    # Provide the network partition id if cep-stats-publisher is set to true.
+    network-partition-id: network-partition-1;
+
     # Multi-tenancy
     # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex
     # and matching tenant identifier value will be used to delegate the requests to the relevant cluster.

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
index 4dfe2dc..a6270da 100755
--- 
a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
+++ 
b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
@@ -77,6 +77,10 @@ loadbalancer {
     # cep-ip: localhost;
     # cep-port: 7615;
 
+    # Network partition id
+    # Provide the network partition id if cep-stats-publisher is set to true.
+    # network-partition-id: network-partition-1;
+
     # Multi-tenancy
     # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex
     # and matching tenant identifier value will be used to delegate the requests to the relevant cluster.

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh 
b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
index ed11eaa..741b269 100755
--- 
a/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/bin/haproxy-extension.sh
@@ -36,6 +36,7 @@ properties="-Djndi.properties.dir=${script_path}/../conf
             -Djavax.net.ssl.trustStorePassword=wso2carbon
             -Dthrift.receiver.ip=localhost
             -Dthrift.receiver.port=7615
+            -Dnetwork.partition.id=
             -Dstratos.messaging.topology.service.filter=
             -Dload.balancer.cep.stats.publisher.enabled=true"
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
----------------------------------------------------------------------
diff --git 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
index 43de7d6..57c6bc0 100644
--- 
a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
+++ 
b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
@@ -45,7 +45,7 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader {
     }
 
     @Override
-    public int getInFlightRequestCount(String clusterId, String partitionId) {
+    public int getInFlightRequestCount(String clusterId) {
         String frontendId, backendId, command, output;
         String[] array;
         int totalWeight, weight;
@@ -63,25 +63,23 @@ public class HAProxyStatsReader implements 
LoadBalancerStatsReader {
                         backendId = frontendId + "-members";
 
                         for (Member member : cluster.getMembers()) {
-                            if((member.getPartitionId() != null) && 
member.getPartitionId().equals(partitionId)) {
-                                // echo "get weight <backend>/<server>" | 
socat stdio <stats-socket>
-                                command = String.format("%s/get-weight.sh %s 
%s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
-                                try {
-                                    output = 
CommandUtil.executeCommand(command);
-                                    if ((output != null) && (output.length() > 
0)) {
-                                        array = output.split(" ");
-                                        if ((array != null) && (array.length > 
0)) {
-                                            weight = 
Integer.parseInt(array[0]);
-                                            if (log.isDebugEnabled()) {
-                                                
log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] 
%d", member.getClusterId(), member.getMemberId(), weight));
-                                            }
-                                            totalWeight += weight;
+                            // echo "get weight <backend>/<server>" | socat 
stdio <stats-socket>
+                            command = String.format("%s/get-weight.sh %s %s 
%s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+                            try {
+                                output = CommandUtil.executeCommand(command);
+                                if ((output != null) && (output.length() > 0)) 
{
+                                    array = output.split(" ");
+                                    if ((array != null) && (array.length > 0)) 
{
+                                        weight = Integer.parseInt(array[0]);
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(String.format("Member 
weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), 
member.getMemberId(), weight));
                                         }
+                                        totalWeight += weight;
                                     }
-                                } catch (IOException e) {
-                                    if (log.isErrorEnabled()) {
-                                        log.error(e);
-                                    }
+                                }
+                            } catch (IOException e) {
+                                if (log.isErrorEnabled()) {
+                                    log.error(e);
                                 }
                             }
                         }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template
----------------------------------------------------------------------
diff --git 
a/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template
 
b/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template
index 354aa45..caa9078 100755
--- 
a/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template
+++ 
b/products/cartridge-agent/modules/cartridge-agent/ec2/load-balancer/templates/loadbalancer.conf.template
@@ -71,6 +71,10 @@ loadbalancer {
     cep-ip: CEP_IP;
     cep-port: CEP_PORT;
 
+    # Network partition id
+    # Provide the network partition id if cep-stats-publisher is set to true.
+    network-partition-id: network-partition-1;
+
     # Multi-tenancy
     # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex
     # and matching tenant identifier value will be used to delegate the requests to the relevant cluster.

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/60f35f39/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
----------------------------------------------------------------------
diff --git 
a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf 
b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
index 69d35a7..03a9634 100644
--- 
a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
+++ 
b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
@@ -71,6 +71,10 @@ loadbalancer {
     cep-ip: localhost;
     cep-port: 7615;
 
+    # Network partition id
+    # Provide the network partition id if cep-stats-publisher is set to true.
+    network-partition-id: network-partition-1;
+
     # Multi-tenancy
     # If this property is set to true, all incoming request URLs will be scanned using the given tenant-identifier-regex
     # and matching tenant identifier value will be used to delegate the requests to the relevant cluster.

Reply via email to