Updated Branches:
  refs/heads/master d29ee5d60 -> 24101ae18

Added load balancing statistics reporting interface to load balancing extension API


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/24101ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/24101ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/24101ae1

Branch: refs/heads/master
Commit: 24101ae18fb63af9306a1a9d7c101ea66909771f
Parents: d29ee5d
Author: Imesh Gunaratne <[email protected]>
Authored: Thu Nov 14 19:16:38 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Thu Nov 14 19:16:38 2013 +0530

----------------------------------------------------------------------
 .../statistics/LoadBalancerStatsPublisher.java  |  34 +++++
 .../statistics/LoadBalancingStatsCollector.java | 110 ----------------
 .../statistics/WSO2CEPStatsPublisher.java       |  97 ++++++++++++++
 .../observers/WSO2CEPStatsObserver.java         | 102 ---------------
 .../extension/api/LoadBalancerExtension.java    |  23 +++-
 .../api/LoadBalancerStatsNotifier.java          |  77 +++++++++++
 .../extension/api/LoadBalancerStatsReader.java  |  34 +++++
 .../TenantAwareLoadBalanceEndpoint.java         |   4 +-
 .../balancer/mediators/ResponseInterceptor.java |   4 +-
 .../statistics/LoadBalancerStatsCollector.java  | 128 +++++++++++++++++++
 .../observers/WSO2CEPStatsObserver.java         |  40 ++++++
 .../haproxy/extension/HAProxyStatsReader.java   |  33 +++++
 .../apache/stratos/haproxy/extension/Main.java  |   3 +-
 13 files changed, 468 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
new file mode 100644
index 0000000..5687d76
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics;
+
+import java.util.Map;
+
+/**
+ * Load balancer statistics publisher interface.
+ */
+public interface LoadBalancerStatsPublisher {
+
+    /**
+     * Publish statistics as a map of Cluster Id, In-flight Request Count.
+     * @param stats map keyed by cluster id whose values are in-flight request counts
+     */
+    void publish(Map<String, Integer> stats);
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
deleted file mode 100644
index a55fc22..0000000
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.common.statistics;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.concurrent.ConcurrentHashMap;
-
-import 
org.apache.stratos.load.balancer.common.statistics.observers.WSO2CEPStatsObserver;
-
-/**
- * This is the load balancing stats collector and any observer can get 
registered here 
- * and receive notifications periodically.
- * This is a Singleton object.
- * @author nirmal
- *
- */
-public class LoadBalancingStatsCollector extends Observable{
-
-       private static LoadBalancingStatsCollector collector;
-       private Map<String, Integer> clusterIdToRequestInflightCountMap;
-       private Thread notifier;
-       
-       private LoadBalancingStatsCollector() {
-               clusterIdToRequestInflightCountMap = new 
ConcurrentHashMap<String, Integer>();
-               if (notifier == null || (notifier != null && 
!notifier.isAlive())) {
-                       notifier = new Thread(new ObserverNotifier());
-                       notifier.start();
-               }
-    }
-       
-       public static LoadBalancingStatsCollector getInstance() {
-               if (collector == null) {
-                       synchronized (LoadBalancingStatsCollector.class) {
-                               if (collector == null) {
-                                       collector = new 
LoadBalancingStatsCollector();
-                                       // add observers
-                                       collector.addObserver(new 
WSO2CEPStatsObserver());
-                               }
-                       }
-               }
-               return collector;
-       }
-       
-       public void incrementRequestInflightCount(String clusterId) {
-               if(clusterId == null) {
-                       return;
-               }
-               
-               int value = 1;
-               if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-                       value += 
clusterIdToRequestInflightCountMap.get(clusterId);
-               }
-               clusterIdToRequestInflightCountMap.put(clusterId, value);
-               setChanged();
-       }
-       
-       public void decrementRequestInflightCount(String clusterId) {
-               if(clusterId == null) {
-                       return;
-               }
-               
-               int value = -1;
-               if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-                       value += 
clusterIdToRequestInflightCountMap.get(clusterId);
-               }
-               clusterIdToRequestInflightCountMap.put(clusterId, value);
-               setChanged();
-       }
-       
-       
-       /**
-        * This thread will notify all the observers of this subject.
-        * @author nirmal
-        *
-        */
-       private class ObserverNotifier implements Runnable {
-
-               @Override
-        public void run() {
-                       while(true) {
-                               try {
-                       Thread.sleep(15000);
-                } catch (InterruptedException ignore) {
-                }
-                               
LoadBalancingStatsCollector.getInstance().notifyObservers(new HashMap<String, 
Integer>(clusterIdToRequestInflightCountMap));
-                       }
-               
-        }
-               
-       }
-       
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
new file mode 100644
index 0000000..2d41242
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.utils.CarbonUtils;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * WSO2 CEP statistics publisher for the load balancer; publishes one event per cluster through an AsyncDataPublisher.
+ */
+public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher {
+    private static final Log log = 
LogFactory.getLog(WSO2CEPStatsPublisher.class);
+    private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats"; // stream name used in the definition and in every publish call
+    private static final String VERSION = "1.0.0";
+    private AsyncDataPublisher asyncDataPublisher;
+
+    public WSO2CEPStatsPublisher() {
+        AgentConfiguration agentConfiguration = new AgentConfiguration();
+        // TODO get following from somewhere, without hard-coding (trust store path under carbon home, and its password).
+        System.setProperty("javax.net.ssl.trustStore", 
CarbonUtils.getCarbonHome()+ File.separator+"repository"+
+                
File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks"
    );
+        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+
+        Agent agent = new Agent(agentConfiguration);
+        // TODO read following from a config file? (currently read from system properties; null if unset)
+        String ip = System.getProperty("thrift.receiver.ip");
+        String port = System.getProperty("thrift.receiver.port");
+        // Using asynchronous data publisher; the two "admin" literals are presumably user name and password - confirm.
+        asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", 
"admin", "admin", agent);
+        String streamDefinition = "{" +
+                " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
+                " 'version':'" + VERSION + "'," +
+                " 'nickName': 'lb stats'," +
+                " 'description': 'lb stats'," +
+                " 'metaData':[]," +
+                " 'payloadData':[" +
+                " {'name':'cluster_id','type':'STRING'}," +
+                " {'name':'in_flight_requests','type':'INT'}" +
+                " ]" +
+                "}";
+        asyncDataPublisher.addStreamDefinition(streamDefinition, 
CALL_CENTER_DATA_STREAM, VERSION);
+    }
+
+    @Override
+    public void publish(Map<String, Integer> stats) {
+        // One event per map entry: payload is {cluster id, in-flight request count}, matching the stream definition.
+        for (Map.Entry<String, Integer> entry : stats.entrySet()) {
+
+            Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
+            Event event = eventObject(null, null, payload, new HashMap<String, 
String>());
+            try {
+                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, 
event);
+            } catch (AgentException e) {
+                log.error("Failed to publish events. ", e); // a failed entry is logged and the loop continues
+            }
+
+        }
+        stats = null; // only clears this method's local reference; the caller's map is unaffected
+    }
+
+    // Builds an Event from the given correlation, meta and payload arrays plus the arbitrary data map.
+    private static Event eventObject(Object[] correlationData, Object[] 
metaData,
+                                     Object[] payLoadData, HashMap<String, 
String> map) {
+        Event event = new Event();
+        event.setCorrelationData(correlationData);
+        event.setMetaData(metaData);
+        event.setPayloadData(payLoadData);
+        event.setArbitraryDataMap(map);
+        return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
deleted file mode 100644
index f53c5a5..0000000
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.common.statistics.observers;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.utils.CarbonUtils;
-
-public class WSO2CEPStatsObserver implements Observer{
-
-       private static final Log log = 
LogFactory.getLog(WSO2CEPStatsObserver.class);
-       private static final String CALL_CENTER_DATA_STREAM = 
"stratos.lb.stats";
-       private static final String VERSION = "1.0.0";
-       private AsyncDataPublisher asyncDataPublisher;
-       
-       public WSO2CEPStatsObserver() {
-               AgentConfiguration agentConfiguration = new 
AgentConfiguration();
-        // TODO get following from somewhere, without hard-coding.
-        System.setProperty("javax.net.ssl.trustStore", 
CarbonUtils.getCarbonHome()+File.separator+"repository"+
-                           
File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks"
 );
-        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
-        
-        Agent agent = new Agent(agentConfiguration);
-        //TODO read following from a config file?
-        String ip = System.getProperty("thrift.receiver.ip");
-        String port = System.getProperty("thrift.receiver.port");
-        //Using Asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", 
"admin", "admin", agent);
-        String streamDefinition = "{" +
-                                  " 'name':'" + CALL_CENTER_DATA_STREAM + "'," 
+
-                                  " 'version':'" + VERSION + "'," +
-                                  " 'nickName': 'lb stats'," +
-                                  " 'description': 'lb stats'," +
-                                  " 'metaData':[]," +
-                                  " 'payloadData':[" +
-                                  " {'name':'cluster_id','type':'STRING'}," +
-                                  " 
{'name':'in_flight_requests','type':'INT'}" +
-                                  " ]" +
-                                  "}";
-        asyncDataPublisher.addStreamDefinition(streamDefinition, 
CALL_CENTER_DATA_STREAM, VERSION);
-    }
-       
-       public void update(Observable arg0, Object arg1) {
-               if(arg1 != null && arg1 instanceof Map<?, ?>) {
-                       Map<String, Integer> stats = (Map<String, Integer>)arg1;
-                       publishEvents(stats);
-               }
-       }
-       
-    private void publishEvents(Map<String, Integer> stats) {
-       
-       for (Map.Entry<String, Integer> entry : stats.entrySet()) {
-               
-               Object[] payload = new Object[]{entry.getKey(), 
entry.getValue()};
-               Event event = eventObject(null, null, payload, new 
HashMap<String, String>());
-               try {
-                       asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, 
VERSION, event);
-               } catch (AgentException e) {
-                       log.error("Failed to publish events. ", e);
-               }
-               
-       }
-       stats = null;
-    }
-    
-    private static Event eventObject(Object[] correlationData, Object[] 
metaData,
-                                     Object[] payLoadData, HashMap<String, 
String> map) {
-        Event event = new Event();
-        event.setCorrelationData(correlationData);
-        event.setMetaData(metaData);
-        event.setPayloadData(payLoadData);
-        event.setArbitraryDataMap(map);
-        return event;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index c2dbe50..b1118d5 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -21,13 +21,20 @@ package org.apache.stratos.load.balancer.extension.api;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatsPublisher;
+import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher;
 import org.apache.stratos.load.balancer.common.topology.TopologyReceiver;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.topology.*;
 import 
org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
 import 
org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Load balancer extension thread for executing load balancer life-cycle 
according to the topology updates
  * received from the message broker.
@@ -36,18 +43,26 @@ public class LoadBalancerExtension implements Runnable {
     private static final Log log = 
LogFactory.getLog(LoadBalancerExtension.class);
 
     private LoadBalancer loadBalancer;
+    private LoadBalancerStatsReader statsReader;
 
-    public LoadBalancerExtension(LoadBalancer loadBalancer) {
+    public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatsReader statsReader) {
         this.loadBalancer = loadBalancer;
+        this.statsReader = statsReader;
     }
 
     @Override
     public void run() {
         try {
-            // Start topology receiver
+            // Start topology receiver thread
             TopologyReceiver topologyReceiver = new 
TopologyReceiver(createMessageDelegator());
-            Thread thread = new Thread(topologyReceiver);
-            thread.start();
+            Thread topologyReceiverThread = new Thread(topologyReceiver);
+            topologyReceiverThread.start();
+
+            // Start stats notifier thread
+            LoadBalancerStatsNotifier statsNotifier = new 
LoadBalancerStatsNotifier(statsReader);
+            Thread statsNotifierThread = new Thread(statsNotifier);
+            statsNotifierThread.start();
+
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error(e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
new file mode 100644
index 0000000..532ded3
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.extension.api;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatsPublisher;
+import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Runnable that periodically reads the in-flight request count of every cluster in the topology and publishes the counts.
+ */
+public class LoadBalancerStatsNotifier implements Runnable {
+    private static final Log log = 
LogFactory.getLog(LoadBalancerStatsNotifier.class);
+
+    private LoadBalancerStatsReader statsReader;
+    private final LoadBalancerStatsPublisher statsPublisher;
+    private long statsPublisherInterval = 15000; // sleep between publications, in milliseconds (passed to Thread.sleep)
+    // The interval can be overridden with the system property "stats.notifier.interval".
+    public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) {
+        this.statsReader = statsReader;
+        this.statsPublisher = new WSO2CEPStatsPublisher();
+
+        String interval = System.getProperty("stats.notifier.interval");
+        if (interval != null) {
+            statsPublisherInterval = Long.parseLong(interval.trim()); // parse the value; Long.getLong() would look up a property named by it
+        }
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                try {
+                    Thread.sleep(statsPublisherInterval);
+                } catch (InterruptedException ignore) {
+                }
+                Map<String, Integer> stats = new HashMap<String, Integer>();
+                for (Service service : 
TopologyManager.getTopology().getServices()) {
+                    for (Cluster cluster : service.getClusters()) {
+                        stats.put(cluster.getClusterId(), 
statsReader.getInFlightRequestCount(cluster.getClusterId()));
+                    }
+                }
+                if (stats.size() > 0) { // nothing is published while the topology has no clusters
+                    statsPublisher.publish(stats);
+                }
+            } catch (Exception e) { // log and keep looping so one failed round does not stop the notifier
+                if (log.isErrorEnabled()) {
+                    log.error("Could not publish load balancer stats", e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
new file mode 100644
index 0000000..2c6f324
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.extension.api;
+
+import java.util.HashMap;
+
+/**
+ * Load balancer statistics reader interface.
+ */
+public interface LoadBalancerStatsReader {
+
+    /**
+     * Get in-flight request count of a given cluster.
+     * @param clusterId id of the cluster whose in-flight request count is requested
+     */
+    int getInFlightRequestCount(String clusterId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 4a05335..e54fb60 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -24,7 +24,7 @@ import org.apache.axis2.description.TransportInDescription;
 import org.apache.http.protocol.HTTP;
 import org.apache.stratos.load.balancer.RequestDelegator;
 import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
-import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Port;
@@ -410,7 +410,7 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         setupLoadBalancerContextProperties(synCtx);
 
         // Update health stats
-        
LoadBalancingStatsCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain());
+        
LoadBalancerStatsCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain());
         // Set the cluster id in the message context
         synCtx.setProperty(Constants.CLUSTER_ID, currentMember.getDomain());
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index 9108be5..d75b460 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.stratos.load.balancer.mediators;
 
-import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
@@ -36,7 +36,7 @@ public class ResponseInterceptor extends AbstractMediator 
implements ManagedLife
             log.debug("Mediation started " + 
ResponseInterceptor.class.getName());
         }
         String clusterId = (String) synCtx.getProperty(Constants.CLUSTER_ID);
-        
LoadBalancingStatsCollector.getInstance().decrementRequestInflightCount(clusterId);
+        
LoadBalancerStatsCollector.getInstance().decrementRequestInflightCount(clusterId);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
new file mode 100644
index 0000000..895657f
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.load.balancer.statistics.observers.WSO2CEPStatsObserver;
+
+/**
+ * Collects the number of in-flight requests per cluster and periodically hands
+ * a snapshot of those counts to the registered observers.
+ * <p>
+ * This is a singleton; obtain it through {@link #getInstance()}. A
+ * {@link WSO2CEPStatsObserver} is registered when the instance is created.
+ * A background thread calls {@code notifyObservers} every 15 seconds with a
+ * {@code Map<String, Integer>} copy (cluster id to count); as with any
+ * {@link Observable}, observers are only invoked if a count was modified since
+ * the previous notification.
+ * <p>
+ * All counter modifications are performed under a single lock so that
+ * concurrent increments and decrements for the same cluster are not lost.
+ *
+ * @author nirmal
+ */
+public class LoadBalancerStatsCollector extends Observable {
+    private static final Log log = LogFactory.getLog(LoadBalancerStatsCollector.class);
+
+    /** Pause between two observer notifications, in milliseconds. */
+    private static final long NOTIFY_INTERVAL_MS = 15000;
+
+    // volatile is required for the double-checked locking in getInstance() to
+    // publish a fully constructed instance to other threads.
+    private static volatile LoadBalancerStatsCollector collector;
+
+    private final Map<String, Integer> clusterIdToRequestInflightCountMap;
+    private final Thread notifier;
+
+    private LoadBalancerStatsCollector() {
+        clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
+        notifier = new Thread(new ObserverNotifier());
+        notifier.start();
+    }
+
+    /**
+     * Returns the single collector instance, creating it (and registering the
+     * default observers) on first use.
+     *
+     * @return the shared collector
+     */
+    public static LoadBalancerStatsCollector getInstance() {
+        if (collector == null) {
+            synchronized (LoadBalancerStatsCollector.class) {
+                if (collector == null) {
+                    collector = new LoadBalancerStatsCollector();
+                    // add observers
+                    collector.addObserver(new WSO2CEPStatsObserver());
+                }
+            }
+        }
+        return collector;
+    }
+
+    /**
+     * Overwrites the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     * @param value     new in-flight request count
+     */
+    public void setRequestInflightCount(String clusterId, int value) {
+        if (clusterId == null) {
+            return;
+        }
+
+        // Same lock as the increment/decrement path so an overwrite cannot be
+        // interleaved with a read-modify-write of the same entry.
+        synchronized (clusterIdToRequestInflightCountMap) {
+            clusterIdToRequestInflightCountMap.put(clusterId, value);
+        }
+        setChanged();
+    }
+
+    /**
+     * Adds one to the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     */
+    public void incrementRequestInflightCount(String clusterId) {
+        incrementRequestInflightCount(clusterId, 1);
+    }
+
+    /**
+     * Adds {@code value} to the in-flight request count of a cluster. A cluster
+     * without a recorded count is treated as having a count of zero.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     * @param value     amount to add
+     */
+    public void incrementRequestInflightCount(String clusterId, int value) {
+        addToRequestInflightCount(clusterId, value);
+    }
+
+    /**
+     * Subtracts one from the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     */
+    public void decrementRequestInflightCount(String clusterId) {
+        decrementRequestInflightCount(clusterId, 1);
+    }
+
+    /**
+     * Subtracts {@code value} from the in-flight request count of a cluster.
+     * A cluster without a recorded count is treated as having a count of zero,
+     * and the stored result never drops below zero.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     * @param value     amount to subtract
+     */
+    public void decrementRequestInflightCount(String clusterId, int value) {
+        addToRequestInflightCount(clusterId, -value);
+    }
+
+    /**
+     * Atomically applies {@code delta} to the count of {@code clusterId},
+     * flooring the result at zero, and marks this observable as changed.
+     */
+    private void addToRequestInflightCount(String clusterId, int delta) {
+        if (clusterId == null) {
+            return;
+        }
+
+        synchronized (clusterIdToRequestInflightCountMap) {
+            Integer current = clusterIdToRequestInflightCountMap.get(clusterId);
+            int updated = (current == null ? 0 : current.intValue()) + delta;
+            // An in-flight count below zero is meaningless (e.g. a response seen
+            // for a request that was never counted), so floor it at zero.
+            clusterIdToRequestInflightCountMap.put(clusterId, Math.max(0, updated));
+        }
+        setChanged();
+    }
+
+    /**
+     * This thread will notify all the observers of this subject.
+     * It runs until it is interrupted.
+     *
+     * @author nirmal
+     */
+    private class ObserverNotifier implements Runnable {
+
+        @Override
+        public void run() {
+            if (log.isInfoEnabled()) {
+                log.info("Load balancing statistics notifier thread started");
+            }
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    Thread.sleep(NOTIFY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    // Interruption is the request to stop: restore the flag
+                    // and leave the loop instead of spinning forever.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                // Observers get a private copy so they never see later updates.
+                LoadBalancerStatsCollector.this.notifyObservers(
+                        new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
new file mode 100644
index 0000000..b2e108a
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics.observers;
+
+import java.util.Map;
+import java.util.Observable;
+import java.util.Observer;
+
+import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher;
+
+/**
+ * Observer that forwards the statistics map it is notified with to a
+ * {@link WSO2CEPStatsPublisher}.
+ */
+public class WSO2CEPStatsObserver implements Observer {
+    private WSO2CEPStatsPublisher statsPublisher;
+
+    /** Creates the observer together with the publisher it delegates to. */
+    public WSO2CEPStatsObserver() {
+        statsPublisher = new WSO2CEPStatsPublisher();
+    }
+
+    /**
+     * Publishes the notification argument when it is a {@link Map}; any other
+     * argument, including {@code null}, is ignored.
+     *
+     * @param arg0 the observable that sent the notification (not used)
+     * @param arg1 the notification argument, cast to a map of statistics
+     */
+    public void update(Observable arg0, Object arg1) {
+        // instanceof is false for null, so this also covers the null case.
+        if (!(arg1 instanceof Map<?, ?>)) {
+            return;
+        }
+        Map<String, Integer> clusterStats = (Map<String, Integer>) arg1;
+        statsPublisher.publish(clusterStats);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
new file mode 100644
index 0000000..d76cc0b
--- /dev/null
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.haproxy.extension;
+
+import org.apache.stratos.load.balancer.extension.api.LoadBalancerStatsReader;
+
+/**
+ * {@link LoadBalancerStatsReader} implementation of the HAProxy extension.
+ * <p>
+ * It does not read any statistics: every query reports zero in-flight
+ * requests.
+ */
+public class HAProxyStatsReader implements LoadBalancerStatsReader {
+
+    /** Value reported for every cluster. */
+    private static final int REPORTED_IN_FLIGHT_REQUEST_COUNT = 0;
+
+    /**
+     * @param clusterId cluster being queried; not used
+     * @return always {@code 0}
+     */
+    @Override
+    public int getInFlightRequestCount(String clusterId) {
+        return REPORTED_IN_FLIGHT_REQUEST_COUNT;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/24101ae1/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 979f59b..ef586f5 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -56,7 +56,8 @@ public class Main {
             }
 
             HAProxy haProxy = new HAProxy(executableFilePath, templatePath, 
templateName, confFilePath);
-            LoadBalancerExtension  extension = new 
LoadBalancerExtension(haProxy);
+            HAProxyStatsReader statsReader = new HAProxyStatsReader();
+            LoadBalancerExtension extension = new 
LoadBalancerExtension(haProxy, statsReader);
             Thread thread = new Thread(extension);
             thread.start();
         }

Reply via email to