Updated Branches:
  refs/heads/master 2fca73b39 -> 9ced6550c

Refactored cep statistics publisher and added load balancer faulty member 
statistics publisher


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/9ced6550
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/9ced6550
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/9ced6550

Branch: refs/heads/master
Commit: 9ced6550c30fd6de212400d4e1d231ba5d998e68
Parents: 2fca73b
Author: Imesh Gunaratne <[email protected]>
Authored: Thu Dec 5 16:56:21 2013 +0530
Committer: Imesh Gunaratne <[email protected]>
Committed: Thu Dec 5 16:56:21 2013 +0530

----------------------------------------------------------------------
 .../statistics/LoadBalancerStatsPublisher.java  |   6 +-
 .../WSO2CEPFaultyMemberPublisher.java           |  74 +++++++++++
 .../WSO2CEPInFlightRequestPublisher.java        |  74 +++++++++++
 .../statistics/WSO2CEPStatsPublisher.java       |  80 +++++-------
 .../extension/api/LoadBalancerExtension.java    |   4 +-
 ...oadBalancerInFlightRequestCountNotifier.java |  82 ++++++++++++
 .../api/LoadBalancerStatsNotifier.java          |  90 -------------
 .../TenantAwareLoadBalanceEndpoint.java         |   4 +-
 .../balancer/mediators/ResponseInterceptor.java |   4 +-
 ...adBalancerInFlightRequestCountCollector.java | 128 +++++++++++++++++++
 .../statistics/LoadBalancerStatsCollector.java  | 128 -------------------
 .../WSO2CEPInFlightRequestCountObserver.java    |  55 ++++++++
 .../observers/WSO2CEPStatsObserver.java         |  48 -------
 13 files changed, 454 insertions(+), 323 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
index 46f9b35..298087a 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
@@ -38,8 +38,8 @@ public interface LoadBalancerStatsPublisher {
     boolean isEnabled();
 
     /**
-     * Publish statistics as a map of Cluster Id, In-flight Request Count.
-     * @param stats
+     * Payload to be published.
+     * @param payload An array of parameter values.
      */
-    void publish(Map<String, Integer> stats);
+    void publish(Object[] payload);
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
new file mode 100644
index 0000000..fc6b65a
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics;
+
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Publisher that reports faulty load balancer members to WSO2 CEP.
+ *
+ * A member is treated as faulty when it rejected a request which at least
+ * one other member of the same cluster was able to serve. Events go to the
+ * "stratos.lb.faulty.members" stream (version "1.0.0").
+ */
+public class WSO2CEPFaultyMemberPublisher extends WSO2CEPStatsPublisher {
+
+    private static final String DATA_STREAM_NAME = "stratos.lb.faulty.members";
+    private static final String VERSION = "1.0.0";
+
+    /** Builds the two-attribute (cluster_id, member_id) stream definition. */
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            // Payload schema: both attributes are strings
+            List<Attribute> attributes = new ArrayList<Attribute>();
+            attributes.add(new Attribute("cluster_id", AttributeType.STRING));
+            attributes.add(new Attribute("member_id", AttributeType.STRING));
+
+            StreamDefinition definition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            definition.setNickName("lb fault members");
+            definition.setDescription("lb fault members");
+            definition.setPayloadData(attributes);
+            return definition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    public WSO2CEPFaultyMemberPublisher() {
+        super(createStreamDefinition());
+    }
+
+    /**
+     * Publishes one faulty member event.
+     *
+     * @param clusterId value for the cluster_id attribute
+     * @param memberId  value for the member_id attribute
+     */
+    public void publish(String clusterId, String memberId) {
+        // Values are listed in the same order as the payload definition
+        Object[] payload = new Object[]{clusterId, memberId};
+        super.publish(payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
new file mode 100644
index 0000000..f10907e
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics;
+
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Publisher that reports in-flight request counts to WSO2 CEP.
+ *
+ * The in-flight request count of a cluster is the number of requests being
+ * served at a given moment. Events go to the "stratos.lb.stats" stream
+ * (version "1.0.0").
+ */
+public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher {
+
+    private static final String DATA_STREAM_NAME = "stratos.lb.stats";
+    private static final String VERSION = "1.0.0";
+
+    /** Builds the (cluster_id, in_flight_requests) stream definition. */
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            // Payload schema: a string cluster id followed by an int counter
+            List<Attribute> attributes = new ArrayList<Attribute>();
+            attributes.add(new Attribute("cluster_id", AttributeType.STRING));
+            attributes.add(new Attribute("in_flight_requests", AttributeType.INT));
+
+            StreamDefinition definition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            definition.setNickName("lb stats");
+            definition.setDescription("lb stats");
+            definition.setPayloadData(attributes);
+            return definition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    public WSO2CEPInFlightRequestPublisher() {
+        super(createStreamDefinition());
+    }
+
+    /**
+     * Publishes the in-flight request count of a cluster.
+     *
+     * @param clusterId            value for the cluster_id attribute
+     * @param inFlightRequestCount value for the in_flight_requests attribute
+     */
+    public void publish(String clusterId, int inFlightRequestCount) {
+        // Values are listed in the same order as the payload definition
+        Object[] payload = new Object[]{clusterId, inFlightRequestCount};
+        super.publish(payload);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
index cb24a2a..6d013ed 100644
--- 
a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
+++ 
b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
@@ -27,10 +27,10 @@ import 
org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
 import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
 import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
 import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
 import org.wso2.carbon.utils.CarbonUtils;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -40,20 +40,26 @@ import java.util.Map;
 public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher {
     private static final Log log = 
LogFactory.getLog(WSO2CEPStatsPublisher.class);
 
-    private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats";
-    private static final String VERSION = "1.0.0";
+    private StreamDefinition streamDefinition;
     private AsyncDataPublisher asyncDataPublisher;
     private String ip;
     private String port;
+    private String username;
+    private String password;
     private boolean enabled = false;
 
-    public WSO2CEPStatsPublisher() {
-        ip = System.getProperty("thrift.receiver.ip");
-        port = System.getProperty("thrift.receiver.port");
-        enabled = 
Boolean.getBoolean("load.balancer.cep.stats.publisher.enabled");
-
-        if(enabled) {
-            init();
+    /** Stores the stream definition, reads the receiver settings and initialises the data publisher if enabled. */
+    public WSO2CEPStatsPublisher(StreamDefinition streamDefinition) {
+        this.streamDefinition = streamDefinition;
+        this.ip = System.getProperty("thrift.receiver.ip");
+        this.port = System.getProperty("thrift.receiver.port");
+        this.username = "admin";
+        this.password = "admin";
+        // Boolean.getBoolean() expects the property NAME, not its value; it yields
+        // false when the property is missing or is anything other than "true".
+        this.enabled = Boolean.getBoolean("load.balancer.cep.stats.publisher.enabled");
+        if (this.enabled) {
+            init();
         }
     }
 
@@ -62,25 +68,14 @@ public class WSO2CEPStatsPublisher implements 
LoadBalancerStatsPublisher {
         Agent agent = new Agent(agentConfiguration);
 
         // Initialize asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", 
"admin", "admin", agent);
-        String streamDefinition = "{" +
-                " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
-                " 'version':'" + VERSION + "'," +
-                " 'nickName': 'lb stats'," +
-                " 'description': 'lb stats'," +
-                " 'metaData':[]," +
-                " 'payloadData':[" +
-                " {'name':'cluster_id','type':'STRING'}," +
-                " {'name':'in_flight_requests','type':'INT'}" +
-                " ]" +
-                "}";
-        asyncDataPublisher.addStreamDefinition(streamDefinition, 
CALL_CENTER_DATA_STREAM, VERSION);
+        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port 
+ "", username, password, agent);
+        asyncDataPublisher.addStreamDefinition(streamDefinition);
     }
 
     @Override
     public void setEnabled(boolean enabled) {
         this.enabled = enabled;
-        if(this.enabled) {
+        if (this.enabled) {
             init();
         }
     }
@@ -91,35 +86,24 @@ public class WSO2CEPStatsPublisher implements 
LoadBalancerStatsPublisher {
     }
 
     @Override
-    public void publish(Map<String, Integer> stats) {
-        if(!isEnabled()) {
+    public void publish(Object[] payload) {
+        if (!isEnabled()) {
             throw new RuntimeException("Statistics publisher is not enabled");
         }
 
-        for (Map.Entry<String, Integer> entry : stats.entrySet()) {
+        Event event = new Event();
+        event.setPayloadData(payload);
+        event.setArbitraryDataMap(new HashMap<String, String>());
 
-            Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
-            Event event = eventObject(null, null, payload, new HashMap<String, 
String>());
-            try {
-                if(log.isInfoEnabled()) {
-                    log.info(String.format("Publishing statistics event: 
[stream] %s [version] %s [payload] %s", CALL_CENTER_DATA_STREAM, VERSION, 
Arrays.toString(payload)));
-                }
-                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, 
event);
-            } catch (AgentException e) {
-                log.error("Failed to publish events. ", e);
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("Publishing cep event: [stream] %s 
[version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
+            }
+            asyncDataPublisher.publish(streamDefinition.getName(), 
streamDefinition.getVersion(), event);
+        } catch (AgentException e) {
+            if (log.isErrorEnabled()) {
+                log.error(String.format("Could not publish cep event: [stream] 
%s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), 
e);
             }
-
         }
-        stats = null;
-    }
-
-    private static Event eventObject(Object[] correlationData, Object[] 
metaData,
-                                     Object[] payLoadData, HashMap<String, 
String> map) {
-        Event event = new Event();
-        event.setCorrelationData(correlationData);
-        event.setMetaData(metaData);
-        event.setPayloadData(payLoadData);
-        event.setArbitraryDataMap(map);
-        return event;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 95dc039..f9b607a 100644
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -39,7 +39,7 @@ public class LoadBalancerExtension implements Runnable {
     private LoadBalancerStatsReader statsReader;
     private boolean loadBalancerStarted;
     private TopologyReceiver topologyReceiver;
-    private LoadBalancerStatsNotifier statsNotifier;
+    private LoadBalancerInFlightRequestCountNotifier statsNotifier;
     private boolean terminated;
 
     public LoadBalancerExtension(LoadBalancer loadBalancer, 
LoadBalancerStatsReader statsReader) {
@@ -60,7 +60,7 @@ public class LoadBalancerExtension implements Runnable {
             topologyReceiverThread.start();
 
             // Start stats notifier thread
-            statsNotifier = new LoadBalancerStatsNotifier(statsReader);
+            statsNotifier = new 
LoadBalancerInFlightRequestCountNotifier(statsReader);
             Thread statsNotifierThread = new Thread(statsNotifier);
             statsNotifierThread.start();
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
new file mode 100644
index 0000000..57b969f
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.extension.api;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Runnable that periodically publishes the in-flight request count of every
+ * cluster in the topology to CEP. The period is 15000 ms unless overridden by
+ * the "stats.notifier.interval" system property.
+ */
+public class LoadBalancerInFlightRequestCountNotifier implements Runnable {
+    private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountNotifier.class);
+
+    private final LoadBalancerStatsReader statsReader;
+    private final WSO2CEPInFlightRequestPublisher statsPublisher;
+    private long statsPublisherInterval = 15000;
+    // volatile: written by terminate() callers, read by the thread executing run()
+    private volatile boolean terminated;
+
+    public LoadBalancerInFlightRequestCountNotifier(LoadBalancerStatsReader statsReader) {
+        this.statsReader = statsReader;
+        this.statsPublisher = new WSO2CEPInFlightRequestPublisher();
+
+        String interval = System.getProperty("stats.notifier.interval");
+        if (interval != null) {
+            // Long.getLong(interval) would treat the value as a property name and unbox null
+            statsPublisherInterval = Long.parseLong(interval.trim());
+        }
+    }
+
+    @Override
+    public void run() {
+        while (!terminated) {
+            try {
+                try {
+                    Thread.sleep(statsPublisherInterval);
+                } catch (InterruptedException ignore) {
+                }
+
+                if (statsPublisher.isEnabled()) {
+                    for (Service service : TopologyManager.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            statsPublisher.publish(cluster.getClusterId(), statsReader.getInFlightRequestCount(cluster.getClusterId()));
+                        }
+                    }
+                } else if (log.isWarnEnabled()) {
+                    log.warn("CEP statistics publisher is disabled");
+                }
+            } catch (Exception e) {
+                log.error("Could not publish in-flight request count", e);
+            }
+        }
+    }
+
+    /** Asks the loop in run() to stop after its current iteration. */
+    public void terminate() {
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
 
b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
deleted file mode 100644
index 47a545e..0000000
--- 
a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsNotifier.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.extension.api;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatsPublisher;
-import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Load balancer statistics notifier thread for publishing statistics 
periodically to CEP.
- */
-public class LoadBalancerStatsNotifier implements Runnable {
-    private static final Log log = 
LogFactory.getLog(LoadBalancerStatsNotifier.class);
-
-    private LoadBalancerStatsReader statsReader;
-    private final LoadBalancerStatsPublisher statsPublisher;
-    private long statsPublisherInterval = 15000;
-    private boolean terminated;
-
-    public LoadBalancerStatsNotifier(LoadBalancerStatsReader statsReader) {
-        this.statsReader = statsReader;
-        this.statsPublisher = new WSO2CEPStatsPublisher();
-
-        String interval = System.getProperty("stats.notifier.interval");
-        if (interval != null) {
-            statsPublisherInterval = Long.getLong(interval);
-        }
-    }
-
-    @Override
-    public void run() {
-        while (!terminated) {
-            try {
-                try {
-                    Thread.sleep(statsPublisherInterval);
-                } catch (InterruptedException ignore) {
-                }
-                Map<String, Integer> stats = new HashMap<String, Integer>();
-                for (Service service : 
TopologyManager.getTopology().getServices()) {
-                    for (Cluster cluster : service.getClusters()) {
-                        stats.put(cluster.getClusterId(), 
statsReader.getInFlightRequestCount(cluster.getClusterId()));
-                    }
-                }
-                if (stats.size() > 0) {
-                    if(statsPublisher.isEnabled()) {
-                        statsPublisher.publish(stats);
-                    }
-                    else if (log.isWarnEnabled()) {
-                        log.warn("Load balancer statistics publisher is 
disabled");
-                    }
-                }
-            } catch (Exception e) {
-                if (log.isErrorEnabled()) {
-                    log.error("Could not publish load balancer stats", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Terminate load balancer statistics notifier thread.
-     */
-    public void terminate() {
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index f516a52..7b8c816 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -24,7 +24,7 @@ import org.apache.axis2.description.TransportInDescription;
 import org.apache.http.protocol.HTTP;
 import org.apache.stratos.load.balancer.RequestDelegator;
 import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
-import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector;
+import 
org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Port;
@@ -411,7 +411,7 @@ public class TenantAwareLoadBalanceEndpoint extends 
org.apache.synapse.endpoints
         setupLoadBalancerContextProperties(synCtx);
 
         // Update health stats
-        
LoadBalancerStatsCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain());
+        
LoadBalancerInFlightRequestCountCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain());
         // Set the cluster id in the message context
         synCtx.setProperty(Constants.CLUSTER_ID, currentMember.getDomain());
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index d75b460..a21a83b 100644
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.stratos.load.balancer.mediators;
 
-import org.apache.stratos.load.balancer.statistics.LoadBalancerStatsCollector;
+import 
org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
@@ -36,7 +36,7 @@ public class ResponseInterceptor extends AbstractMediator 
implements ManagedLife
             log.debug("Mediation started " + 
ResponseInterceptor.class.getName());
         }
         String clusterId = (String) synCtx.getProperty(Constants.CLUSTER_ID);
-        
LoadBalancerStatsCollector.getInstance().decrementRequestInflightCount(clusterId);
+        
LoadBalancerInFlightRequestCountCollector.getInstance().decrementRequestInflightCount(clusterId);
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
new file mode 100644
index 0000000..0ac8e0c
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver;
+
+/**
+ * Collects the number of in-flight requests per cluster and periodically
+ * hands a snapshot of those counts to the registered observers.
+ * <p>
+ * This is a singleton; obtain it through {@link #getInstance()}. The
+ * constructor starts a background thread that calls
+ * {@link #notifyObservers(Object)} every 15 seconds with a copy of the
+ * cluster id to count map. As with any {@link Observable}, observers are
+ * only invoked when a count has been modified since the last notification.
+ * <p>
+ * Count updates are serialized on this instance so that concurrent
+ * increments and decrements are not lost.
+ *
+ * @author nirmal
+ */
+public class LoadBalancerInFlightRequestCountCollector extends Observable {
+    private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountCollector.class);
+
+    /** Pause between two observer notifications, in milliseconds. */
+    private static final long NOTIFY_INTERVAL_MS = 15000;
+
+    // Must be volatile: getInstance() reads it outside the lock first.
+    private static volatile LoadBalancerInFlightRequestCountCollector collector;
+    private final Map<String, Integer> clusterIdToRequestInflightCountMap;
+    private final Thread notifier;
+
+    /**
+     * Creates the count map and starts the notifier thread.
+     */
+    private LoadBalancerInFlightRequestCountCollector() {
+        clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
+        notifier = new Thread(new ObserverNotifier());
+        notifier.start();
+    }
+
+    /**
+     * Returns the single collector instance, creating it on first use.
+     *
+     * @return the shared collector, with the CEP in-flight request count
+     *         observer already registered
+     */
+    public static LoadBalancerInFlightRequestCountCollector getInstance() {
+        if (collector == null) {
+            synchronized (LoadBalancerInFlightRequestCountCollector.class) {
+                if (collector == null) {
+                    // Build the observer first so that a failure here does not
+                    // leave a collector (and its thread) behind.
+                    WSO2CEPInFlightRequestCountObserver observer = new WSO2CEPInFlightRequestCountObserver();
+                    LoadBalancerInFlightRequestCountCollector instance = new LoadBalancerInFlightRequestCountCollector();
+                    // add observers before the instance becomes visible to other threads
+                    instance.addObserver(observer);
+                    collector = instance;
+                }
+            }
+        }
+        return collector;
+    }
+
+    /**
+     * Overwrites the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     * @param value     new count
+     */
+    public synchronized void setRequestInflightCount(String clusterId, int value) {
+        if (clusterId == null) {
+            return;
+        }
+
+        clusterIdToRequestInflightCountMap.put(clusterId, value);
+        setChanged();
+    }
+
+    /**
+     * Increases the in-flight request count of a cluster by one.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     */
+    public void incrementRequestInflightCount(String clusterId) {
+        incrementRequestInflightCount(clusterId, 1);
+    }
+
+    /**
+     * Increases the in-flight request count of a cluster.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     * @param value     amount to add to the current count (0 when the cluster is unknown)
+     */
+    public void incrementRequestInflightCount(String clusterId, int value) {
+        if (clusterId == null) {
+            return;
+        }
+        adjustRequestInflightCount(clusterId, value);
+    }
+
+    /**
+     * Decreases the in-flight request count of a cluster by one.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     */
+    public void decrementRequestInflightCount(String clusterId) {
+        decrementRequestInflightCount(clusterId, 1);
+    }
+
+    /**
+     * Decreases the in-flight request count of a cluster. The stored count
+     * never drops below zero.
+     *
+     * @param clusterId cluster to update; the call is ignored when {@code null}
+     * @param value     amount to subtract from the current count (0 when the cluster is unknown)
+     */
+    public void decrementRequestInflightCount(String clusterId, int value) {
+        if (clusterId == null) {
+            return;
+        }
+        // The previous implementation added the value here, so responses
+        // increased the count instead of reducing it.
+        adjustRequestInflightCount(clusterId, -value);
+    }
+
+    /**
+     * Applies a signed delta to a cluster's count as one atomic step and
+     * marks this observable as changed. A negative result is stored as zero
+     * because an in-flight count below zero has no meaning.
+     */
+    private synchronized void adjustRequestInflightCount(String clusterId, int delta) {
+        Integer current = clusterIdToRequestInflightCountMap.get(clusterId);
+        int updated = (current == null ? 0 : current.intValue()) + delta;
+        clusterIdToRequestInflightCountMap.put(clusterId, Math.max(updated, 0));
+        setChanged();
+    }
+
+    /**
+     * This thread will notify all the observers of this subject.
+     * It stops when it is interrupted.
+     *
+     * @author nirmal
+     */
+    private class ObserverNotifier implements Runnable {
+
+        @Override
+        public void run() {
+            if (log.isInfoEnabled()) {
+                log.info("Load balancing statistics notifier thread started");
+            }
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    Thread.sleep(NOTIFY_INTERVAL_MS);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt status visible and leave the loop so
+                    // the thread can actually be shut down.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+                // Observers receive a copy, so later updates do not affect them.
+                LoadBalancerInFlightRequestCountCollector.this.notifyObservers(
+                        new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
deleted file mode 100644
index 895657f..0000000
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatsCollector.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.statistics;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.load.balancer.statistics.observers.WSO2CEPStatsObserver;
-
-/**
- * This is the load balancing stats collector and any observer can get 
registered here 
- * and receive notifications periodically.
- * This is a Singleton object.
- * @author nirmal
- *
- */
-public class LoadBalancerStatsCollector extends Observable{
-    private static final Log log = 
LogFactory.getLog(LoadBalancerStatsCollector.class);
-
-       private static LoadBalancerStatsCollector collector;
-       private Map<String, Integer> clusterIdToRequestInflightCountMap;
-       private Thread notifier;
-       
-       private LoadBalancerStatsCollector() {
-               clusterIdToRequestInflightCountMap = new 
ConcurrentHashMap<String, Integer>();
-               if (notifier == null || (notifier != null && 
!notifier.isAlive())) {
-                       notifier = new Thread(new ObserverNotifier());
-                       notifier.start();
-               }
-    }
-       
-       public static LoadBalancerStatsCollector getInstance() {
-               if (collector == null) {
-                       synchronized (LoadBalancerStatsCollector.class) {
-                               if (collector == null) {
-                                       collector = new 
LoadBalancerStatsCollector();
-                                       // add observers
-                                       collector.addObserver(new 
WSO2CEPStatsObserver());
-                               }
-                       }
-               }
-               return collector;
-       }
-
-    public void setRequestInflightCount(String clusterId, int value) {
-        if(clusterId == null) {
-            return;
-        }
-
-        clusterIdToRequestInflightCountMap.put(clusterId, value);
-        setChanged();
-    }
-
-    public void incrementRequestInflightCount(String clusterId) {
-        incrementRequestInflightCount(clusterId, 1);
-    }
-       
-       public void incrementRequestInflightCount(String clusterId, int value) {
-               if(clusterId == null) {
-                       return;
-               }
-
-               if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-                       value += 
clusterIdToRequestInflightCountMap.get(clusterId);
-               }
-               clusterIdToRequestInflightCountMap.put(clusterId, value);
-               setChanged();
-       }
-
-    public void decrementRequestInflightCount(String clusterId) {
-        decrementRequestInflightCount(clusterId , 1);
-    }
-       
-       public void decrementRequestInflightCount(String clusterId, int value) {
-               if(clusterId == null) {
-                       return;
-               }
-
-               if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-                       value += 
clusterIdToRequestInflightCountMap.get(clusterId);
-               }
-               clusterIdToRequestInflightCountMap.put(clusterId, value);
-               setChanged();
-       }
-
-
-       /**
-        * This thread will notify all the observers of this subject.
-        * @author nirmal
-        *
-        */
-       private class ObserverNotifier implements Runnable {
-
-               @Override
-        public void run() {
-            if(log.isInfoEnabled()) {
-                log.info("Load balancing statistics notifier thread started");
-            }
-                       while(true) {
-                               try {
-                       Thread.sleep(15000);
-                } catch (InterruptedException ignore) {
-                }
-                               
LoadBalancerStatsCollector.getInstance().notifyObservers(new HashMap<String, 
Integer>(clusterIdToRequestInflightCountMap));
-                       }
-        }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
new file mode 100644
index 0000000..9a77778
--- /dev/null
+++ 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics.observers;
+
+import java.util.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+
+/**
+ * Observer that forwards in-flight request counts to the WSO2 CEP publisher.
+ * It expects the notification argument to be a map of cluster id to count
+ * and publishes one event per map entry.
+ */
+public class WSO2CEPInFlightRequestCountObserver implements Observer {
+    private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class);
+    private final WSO2CEPInFlightRequestPublisher publisher;
+
+    public WSO2CEPInFlightRequestCountObserver() {
+        publisher = new WSO2CEPInFlightRequestPublisher();
+    }
+
+    /**
+     * Publishes the received counts when the publisher is enabled.
+     * Arguments that are not a {@link Map} (including {@code null}) are
+     * ignored; any failure while publishing is logged and not rethrown.
+     *
+     * @param observable the subject that sent the notification (unused)
+     * @param object     the notification argument, a cluster id to count map
+     */
+    @Override
+    public void update(Observable observable, Object object) {
+        // instanceof is false for null, so this also covers the null case
+        if (!(object instanceof Map<?, ?>)) {
+            return;
+        }
+        try {
+            if (publisher.isEnabled()) {
+                publishAll(object);
+            } else if (log.isWarnEnabled()) {
+                log.warn("CEP statistics publisher is disabled");
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Could not publish in-flight request count", e);
+            }
+        }
+    }
+
+    /**
+     * Publishes one event per cluster id found in the given map.
+     */
+    @SuppressWarnings("unchecked")
+    private void publishAll(Object object) {
+        Map<String, Integer> countsByCluster = (Map<String, Integer>) object;
+        for (Map.Entry<String, Integer> entry : countsByCluster.entrySet()) {
+            publisher.publish(entry.getKey(), entry.getValue());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/9ced6550/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
 
b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
deleted file mode 100644
index ebc793b..0000000
--- 
a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPStatsObserver.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.statistics.observers;
-
-import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.stratos.load.balancer.common.statistics.WSO2CEPStatsPublisher;
-
-public class WSO2CEPStatsObserver implements Observer{
-    private static final Log log = 
LogFactory.getLog(WSO2CEPStatsObserver.class);
-    private WSO2CEPStatsPublisher statsPublisher;
-
-    public WSO2CEPStatsObserver() {
-        this.statsPublisher = new WSO2CEPStatsPublisher();
-    }
-
-    public void update(Observable arg0, Object arg1) {
-        if(arg1 != null && arg1 instanceof Map<?, ?>) {
-            Map<String, Integer> stats = (Map<String, Integer>)arg1;
-            if(statsPublisher.isEnabled()) {
-                statsPublisher.publish(stats);
-            }
-            else if (log.isWarnEnabled()) {
-                log.warn("Load balancer statistics publisher is disabled");
-            }
-        }
-    }
-}

Reply via email to