Repository: stratos
Updated Branches:
  refs/heads/master 7c52406fb -> 92eb63b52


PCA - Enable health stats reader plugins, refactor health stats publisher module


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/81d5d34c
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/81d5d34c
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/81d5d34c

Branch: refs/heads/master
Commit: 81d5d34ce1920b2dcca8e6e4ab592233f9a792d0
Parents: 7c52406
Author: Chamila de Alwis <[email protected]>
Authored: Fri May 1 00:33:19 2015 +0530
Committer: Chamila de Alwis <[email protected]>
Committed: Thu Jul 30 00:16:58 2015 -0400

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.py    |   2 +-
 .../cartridge.agent/exception.py                |  11 +-
 .../cartridge.agent/healthstats.py              | 246 +++++++++++++++++
 .../modules/event/eventhandler.py               |  34 ++-
 .../modules/healthstatspublisher/__init__.py    |  16 --
 .../abstracthealthstatisticspublisher.py        |  63 -----
 .../modules/healthstatspublisher/healthstats.py | 261 -------------------
 .../publisher/cartridgeagentpublisher.py        |  37 ++-
 .../cartridge.agent/plugins/contracts.py        |  15 ++
 9 files changed, 325 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 3051775..806f6c9 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -99,7 +99,7 @@ class CartridgeAgent(threading.Thread):
         if repo_url is None or str(repo_url).strip() == "":
             self.__log.info("No artifact repository found")
             self.__event_handler.on_instance_activated_event()
-            cartridgeagentpublisher.publish_instance_activated_event()
+            
cartridgeagentpublisher.publish_instance_activated_event(self.__event_handler.health_stat_plugin)
         else:
             self.__log.info(
                 "Artifact repository found, waiting for artifact updated event 
to checkout artifacts: [repo_url] %s",

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
index 5262d2a..1586c4d 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py
@@ -77,4 +77,13 @@ class 
ThriftReceiverOfflineException(CartridgeAgentException):
     """
 
     def __init__(self, message):
-        super(ThriftReceiverOfflineException, self).__init__(message)
\ No newline at end of file
+        super(ThriftReceiverOfflineException, self).__init__(message)
+
+
+class CEPPublisherException(CartridgeAgentException):
+    """
+    Exception to be used during CEP publishing operations
+    """
+
+    def __init__(self, message):
+        super(CEPPublisherException, self).__init__(message)

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
new file mode 100644
index 0000000..815ec01
--- /dev/null
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from threading import Thread
+import multiprocessing
+
+import psutil
+
+from modules.databridge.agent import *
+from config import CartridgeAgentConfiguration
+from modules.util import cartridgeagentutils
+from exception import ThriftReceiverOfflineException, CEPPublisherException
+import constants
+
+
+class HealthStatisticsPublisherManager(Thread):
+    """
+    Read from provided health stat reader plugin or the default health stat 
reader, the value for memory usage and
+    load average and publishes them as ThriftEvents to a CEP server
+    """
+    STREAM_NAME = "cartridge_agent_health_stats"
+    STREAM_VERSION = "1.0.0"
+    STREAM_NICKNAME = "agent health stats"
+    STREAM_DESCRIPTION = "agent health stats"
+
+    def __init__(self, publish_interval, health_stat_plugin):
+        """
+        Initializes a new HealthStatisticsPublisherManager with a given number 
of seconds as the interval
+        :param int publish_interval: Number of seconds as the interval
+        :return: void
+        """
+        Thread.__init__(self)
+
+        self.log = LogFactory().get_log(__name__)
+
+        self.publish_interval = publish_interval
+        """:type : int"""
+
+        self.terminated = False
+
+        self.publisher = HealthStatisticsPublisher()
+        """:type : HealthStatisticsPublisher"""
+        # If there are no health stat reader plugins, create the default 
reader instance
+        self.stats_reader = health_stat_plugin if health_stat_plugin is not 
None else DefaultHealthStatisticsReader()
+
+    def run(self):
+        while not self.terminated:
+            time.sleep(self.publish_interval)
+
+            try:
+                ca_health_stat = CartridgeHealthStatistics()
+                cartridge_stats = 
self.stats_reader.stat_cartridge_health(ca_health_stat)
+                self.log.debug("Publishing memory consumption: %r" % 
cartridge_stats.memory_usage)
+                
self.publisher.publish_memory_usage(cartridge_stats.memory_usage)
+
+                self.log.debug("Publishing load average: %r" % 
cartridge_stats.load_avg)
+                self.publisher.publish_load_average(cartridge_stats.load_avg)
+            except ThriftReceiverOfflineException:
+                self.log.error("Couldn't publish health statistics to CEP. 
Thrift Receiver offline. Reconnecting...")
+                self.publisher = HealthStatisticsPublisher()
+
+        self.publisher.publisher.disconnect()
+
+
+class HealthStatisticsPublisher:
+    """
+    Publishes memory usage and load average to thrift server
+    """
+    log = LogFactory().get_log(__name__)
+
+    def read_config(self, conf_key):
+        """
+        Read a given key from the cartridge agent configuration
+        :param conf_key: The key to look for in the CA config
+        :return: The value for the key from the CA config
+        :raise: RuntimeError if the given key is not found in the CA config
+        """
+        if self.cartridge_agent_config is None:
+            self.cartridge_agent_config = CartridgeAgentConfiguration()
+
+        conf_value = self.cartridge_agent_config.read_property(conf_key, False)
+
+        if conf_value is None or conf_value.strip() == "":
+            raise RuntimeError("System property not found: " + conf_key)
+
+        return conf_value
+
+    def __init__(self):
+        self.cartridge_agent_config = CartridgeAgentConfiguration()
+
+        self.ports = []
+        cep_port = self.read_config(constants.CEP_RECEIVER_PORT)
+        self.ports.append(cep_port)
+
+        cep_ip = self.read_config(constants.CEP_RECEIVER_IP)
+
+        cartridgeagentutils.wait_until_ports_active(
+            cep_ip,
+            self.ports,
+            
int(self.cartridge_agent_config.read_property("port.check.timeout", 
critical=False)))
+
+        cep_active = cartridgeagentutils.check_ports_active(
+            cep_ip,
+            self.ports)
+
+        if not cep_active:
+            raise CEPPublisherException("CEP server not active. Health 
statistics publishing aborted.")
+
+        cep_admin_username = 
self.read_config(constants.CEP_SERVER_ADMIN_USERNAME)
+        cep_admin_password = 
self.read_config(constants.CEP_SERVER_ADMIN_PASSWORD)
+
+        self.stream_definition = 
HealthStatisticsPublisher.create_stream_definition()
+        HealthStatisticsPublisher.log.debug("Stream definition created: %r" % 
str(self.stream_definition))
+
+        self.publisher = ThriftPublisher(
+            cep_ip,
+            cep_port,
+            cep_admin_username,
+            cep_admin_password,
+            self.stream_definition)
+
+        HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher 
initialized")
+
+    @staticmethod
+    def create_stream_definition():
+        """
+        Create a StreamDefinition for publishing to CEP
+        """
+        stream_def = StreamDefinition()
+        stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME
+        stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION
+        stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME
+        stream_def.description = 
HealthStatisticsPublisherManager.STREAM_DESCRIPTION
+
+        # stream_def.add_payloaddata_attribute()
+        stream_def.add_payloaddata_attribute("cluster_id", 
StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("cluster_instance_id", 
StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("network_partition_id", 
StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("member_id", 
StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("partition_id", 
StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("health_description", 
StreamDefinition.STRING)
+        stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE)
+
+        return stream_def
+
+    def publish_memory_usage(self, memory_usage):
+        """
+        Publishes the given memory usage value to the thrift server as a 
ThriftEvent
+        :param float memory_usage: memory usage
+        """
+
+        event = ThriftEvent()
+        event.payloadData.append(self.cartridge_agent_config.cluster_id)
+        
event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
+        
event.payloadData.append(self.cartridge_agent_config.network_partition_id)
+        event.payloadData.append(self.cartridge_agent_config.member_id)
+        event.payloadData.append(self.cartridge_agent_config.partition_id)
+        event.payloadData.append(constants.MEMORY_CONSUMPTION)
+        event.payloadData.append(float(memory_usage))
+        # event.payloadData.append(str(memory_usage))
+
+        HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r 
[payload_data] %r [version] %r"
+                                            % (
+                                                self.stream_definition.name,
+                                                event.payloadData,
+                                                
self.stream_definition.version))
+
+        self.publisher.publish(event)
+
+    def publish_load_average(self, load_avg):
+        """
+        Publishes the given load average value to the thrift server as a 
ThriftEvent
+        :param float load_avg: load average value
+        """
+
+        event = ThriftEvent()
+        event.payloadData.append(self.cartridge_agent_config.cluster_id)
+        
event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
+        
event.payloadData.append(self.cartridge_agent_config.network_partition_id)
+        event.payloadData.append(self.cartridge_agent_config.member_id)
+        event.payloadData.append(self.cartridge_agent_config.partition_id)
+        event.payloadData.append(constants.LOAD_AVERAGE)
+        event.payloadData.append(float(load_avg))
+        # event.payloadData.append(str(load_avg))
+
+        HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r 
[payload_data] %r [version] %r"
+                                            % (
+                                                self.stream_definition.name,
+                                                event.payloadData,
+                                                
self.stream_definition.version))
+
+        self.publisher.publish(event)
+
+
+class DefaultHealthStatisticsReader:
+    """
+    Default implementation for the health statistics reader. If no Health 
Statistics Reader plugins are provided,
+    this will be used to read health stats from the instance.
+    """
+
+    def __init__(self):
+        self.log = LogFactory().get_log(__name__)
+
+    def stat_cartridge_health(self, ca_health_stat):
+        ca_health_stat.memory_usage = 
DefaultHealthStatisticsReader.__read_mem_usage()
+        ca_health_stat.load_avg = 
DefaultHealthStatisticsReader.__read_load_avg()
+
+        self.log.debug("Memory read: %r, CPU read: %r" % 
(ca_health_stat.memory_usage, ca_health_stat.load_avg))
+        return ca_health_stat
+
+    @staticmethod
+    def __read_mem_usage():
+        return psutil.virtual_memory().percent
+
+    @staticmethod
+    def __read_load_avg():
+        (one, five, fifteen) = os.getloadavg()
+        cores = multiprocessing.cpu_count()
+
+        return (one/cores) * 100
+
+
+class CartridgeHealthStatistics:
+    """
+    Holds the memory usage and load average reading
+    """
+
+    def __init__(self):
+        self.memory_usage = None
+        """:type : float"""
+        self.load_avg = None
+        """:type : float"""

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index 2cc9fba..af66b81 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -20,7 +20,7 @@ from threading import Thread
 
 from ..util import cartridgeagentutils
 from yapsy.PluginManager import PluginManager
-from plugins.contracts import ICartridgeAgentPlugin, IArtifactManagementPlugin
+from plugins.contracts import ICartridgeAgentPlugin, 
IArtifactManagementPlugin, IHealthStatReaderPlugin
 from ..artifactmgt.git.agentgithandler import *
 from ..artifactmgt.repository import Repository
 from config import CartridgeAgentConfiguration
@@ -35,6 +35,7 @@ from exception import ParameterNotFoundException
 AGENT_PLUGIN_EXT = "agent-plugin"
 ARTIFACT_MGT_PLUGIN = "ArtifactManagementPlugin"
 CARTRIDGE_AGENT_PLUGIN = "CartridgeAgentPlugin"
+HEALTH_STAT_PLUGIN = "HealthStatReaderPlugin"
 SUPER_TENANT_ID = -1234
 SUPER_TENANT_REPO_PATH = "/repository/deployment/server/"
 TENANT_REPO_PATH = "/repository/tenants/"
@@ -51,7 +52,7 @@ class EventHandler:
         self.__plugins = {}
         """ :type dict{str: [PluginInfo]} : """
         self.__artifact_mgt_plugins = []
-        self.__plugins, self.__artifact_mgt_plugins = self.initialize_plugins()
+        self.__plugins, self.__artifact_mgt_plugins, self.health_stat_plugin = 
self.initialize_plugins()
         self.__extension_executor = self.initialize_extensions()
 
     def on_instance_started_event(self):
@@ -117,7 +118,7 @@ class EventHandler:
 
             if subscribe_run:
                 # publish instanceActivated
-                cartridgeagentpublisher.publish_instance_activated_event()
+                
cartridgeagentpublisher.publish_instance_activated_event(self.health_stat_plugin)
             elif updated:
                 # updated on pull
                 self.on_artifact_update_scheduler_event(tenant_id)
@@ -384,13 +385,15 @@ class EventHandler:
         try:
             # TODO: change plugin descriptor ext, 
plugin_manager.setPluginInfoExtension(AGENT_PLUGIN_EXT)
             plugins_dir = self.__config.read_property(constants.PLUGINS_DIR)
-            category_filter = {CARTRIDGE_AGENT_PLUGIN: ICartridgeAgentPlugin, 
ARTIFACT_MGT_PLUGIN: IArtifactManagementPlugin}
+            category_filter = {CARTRIDGE_AGENT_PLUGIN: ICartridgeAgentPlugin,
+                               ARTIFACT_MGT_PLUGIN: IArtifactManagementPlugin,
+                               HEALTH_STAT_PLUGIN: IHealthStatReaderPlugin}
 
             plugin_manager = 
EventHandler.create_plugin_manager(category_filter, plugins_dir)
 
             # activate cartridge agent plugins
             plugins = 
plugin_manager.getPluginsOfCategory(CARTRIDGE_AGENT_PLUGIN)
-            grouped_plugins = {}
+            grouped_ca_plugins = {}
             for plugin_info in plugins:
                 self.__log.debug("Found plugin [%s] at [%s]" % 
(plugin_info.name, plugin_info.path))
                 plugin_manager.activatePluginByName(plugin_info.name)
@@ -399,19 +402,32 @@ class EventHandler:
                 mapped_events = plugin_info.description.split(",")
                 for mapped_event in mapped_events:
                     if mapped_event.strip() != "":
-                        if grouped_plugins.get(mapped_event) is None:
-                            grouped_plugins[mapped_event] = []
+                        if grouped_ca_plugins.get(mapped_event) is None:
+                            grouped_ca_plugins[mapped_event] = []
 
-                        grouped_plugins[mapped_event].append(plugin_info)
+                        grouped_ca_plugins[mapped_event].append(plugin_info)
 
             # activate artifact management plugins
             artifact_mgt_plugins = 
plugin_manager.getPluginsOfCategory(ARTIFACT_MGT_PLUGIN)
             for plugin_info in artifact_mgt_plugins:
+                # TODO: Fix this to only load the first plugin
                 self.__log.debug("Found artifact management plugin [%s] at 
[%s]" % (plugin_info.name, plugin_info.path))
                 plugin_manager.activatePluginByName(plugin_info.name)
                 self.__log.info("Activated artifact management plugin [%s]" % 
plugin_info.name)
 
-            return grouped_plugins, artifact_mgt_plugins
+            health_stat_plugins = 
plugin_manager.getPluginsOfCategory(HEALTH_STAT_PLUGIN)
+            health_stat_plugin = None
+
+            # If there are any health stat reader plugins, load the first one 
and ignore the rest
+            if len(health_stat_plugins) > 0:
+                plugin_info = health_stat_plugins[0]
+                self.__log.debug("Found health statistics reader plugin [%s] 
at [%s]" %
+                                 (plugin_info.name, plugin_info.path))
+                plugin_manager.activatePluginByName(plugin_info.name)
+                self.__log.info("Activated health statistics reader plugin 
[%s]" % plugin_info.name)
+                health_stat_plugin = plugin_info
+
+            return grouped_ca_plugins, artifact_mgt_plugins, health_stat_plugin
         except ParameterNotFoundException as e:
             self.__log.exception("Could not load plugins. Plugins directory 
not set: %s" % e)
             return None, None

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py
deleted file mode 100644
index 13a8339..0000000
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
deleted file mode 100644
index 4839aa8..0000000
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-
-class AbstractHealthStatisticsReader:
-    """
-    Abstract class to implement to create a custom health stat reader
-    """
-
-    def stat_cartridge_health(self):
-        """
-        Abstract method that when implemented reads the memory usage and the 
load average
-        of the instance running the agent and returns a 
CartridgeHealthStatistics object
-        with the information
-
-        :return: CartridgeHealthStatistics object with memory usage and load 
average values
-        :rtype : CartridgeHealthStatistics
-        """
-        raise NotImplementedError
-
-
-class CartridgeHealthStatistics:
-    """
-    Holds the memory usage and load average reading
-    """
-
-    def __init__(self):
-        self.memory_usage = None
-        """:type : float"""
-        self.load_avg = None
-        """:type : float"""
-
-
-class CEPPublisherException(Exception):
-    """
-    Exception to be used during CEP publishing operations
-    """
-
-    def __init__(self, msg):
-        Exception.__init__(self,  msg)
-        self.message = msg
-
-    def get_message(self):
-        """
-        The message provided when the exception is raised
-        :return: message
-        :rtype: str
-        """
-        return self.message

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
deleted file mode 100644
index 9753c3e..0000000
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py
+++ /dev/null
@@ -1,261 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-from threading import Thread
-import multiprocessing
-
-import psutil
-
-from abstracthealthstatisticspublisher import *
-from ..databridge.agent import *
-from config import CartridgeAgentConfiguration
-from ..util import cartridgeagentutils
-from exception import ThriftReceiverOfflineException
-import constants
-
-
-class HealthStatisticsPublisherManager(Thread):
-    """
-    Read from an implementation of AbstractHealthStatisticsPublisher the value 
for memory usage and
-    load average and publishes them as ThriftEvents to a CEP server
-    """
-    STREAM_NAME = "cartridge_agent_health_stats"
-    STREAM_VERSION = "1.0.0"
-    STREAM_NICKNAME = "agent health stats"
-    STREAM_DESCRIPTION = "agent health stats"
-
-    def __init__(self, publish_interval):
-        """
-        Initializes a new HealthStatistsPublisherManager with a given number 
of seconds as the interval
-        :param int publish_interval: Number of seconds as the interval
-        :return: void
-        """
-        Thread.__init__(self)
-
-        self.log = LogFactory().get_log(__name__)
-
-        self.publish_interval = publish_interval
-        """:type : int"""
-
-        self.terminated = False
-
-        self.publisher = HealthStatisticsPublisher()
-        """:type : HealthStatisticsPublisher"""
-        # TODO: load plugins for the reader
-        self.stats_reader = DefaultHealthStatisticsReader()
-        """:type : AbstractHealthStatisticsReader"""
-
-    def run(self):
-        while not self.terminated:
-            time.sleep(self.publish_interval)
-
-            try:
-                cartridge_stats = self.stats_reader.stat_cartridge_health()
-                self.log.debug("Publishing memory consumption: %r" % 
cartridge_stats.memory_usage)
-                
self.publisher.publish_memory_usage(cartridge_stats.memory_usage)
-
-                self.log.debug("Publishing load average: %r" % 
cartridge_stats.load_avg)
-                self.publisher.publish_load_average(cartridge_stats.load_avg)
-            except ThriftReceiverOfflineException:
-                self.log.error("Couldn't publish health statistics to CEP. 
Thrift Receiver offline. Reconnecting...")
-                try:
-                    self.publisher = HealthStatisticsPublisher()
-                except:
-                    self.log.error("Couldn't connect. CEP Offline.")
-
-        self.publisher.publisher.disconnect()
-
-
-class HealthStatisticsPublisher:
-    """
-    Publishes memory usage and load average to thrift server
-    """
-    log = LogFactory().get_log(__name__)
-
-    def __init__(self):
-
-        self.ports = []
-        self.ports.append(CEPPublisherConfiguration.get_instance().server_port)
-
-        self.cartridge_agent_config = CartridgeAgentConfiguration()
-
-        cartridgeagentutils.wait_until_ports_active(
-            CEPPublisherConfiguration.get_instance().server_ip,
-            self.ports,
-            
int(self.cartridge_agent_config.read_property("port.check.timeout", 
critical=False)))
-        cep_active = 
cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip,
 self.ports)
-        if not cep_active:
-            raise CEPPublisherException("CEP server not active. Health 
statistics publishing aborted.")
-
-        self.stream_definition = 
HealthStatisticsPublisher.create_stream_definition()
-        HealthStatisticsPublisher.log.debug("Stream definition created: %r" % 
str(self.stream_definition))
-        self.publisher = ThriftPublisher(
-            CEPPublisherConfiguration.get_instance().server_ip,
-            CEPPublisherConfiguration.get_instance().server_port,
-            CEPPublisherConfiguration.get_instance().admin_username,
-            CEPPublisherConfiguration.get_instance().admin_password,
-            self.stream_definition)
-
-        HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher 
initialized")
-
-    @staticmethod
-    def create_stream_definition():
-        """
-        Create a StreamDefinition for publishing to CEP
-        """
-        stream_def = StreamDefinition()
-        stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME
-        stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION
-        stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME
-        stream_def.description = 
HealthStatisticsPublisherManager.STREAM_DESCRIPTION
-
-        # stream_def.add_payloaddata_attribute()
-        stream_def.add_payloaddata_attribute("cluster_id", 
StreamDefinition.STRING)
-        stream_def.add_payloaddata_attribute("cluster_instance_id", 
StreamDefinition.STRING)
-        stream_def.add_payloaddata_attribute("network_partition_id", 
StreamDefinition.STRING)
-        stream_def.add_payloaddata_attribute("member_id", 
StreamDefinition.STRING)
-        stream_def.add_payloaddata_attribute("partition_id", 
StreamDefinition.STRING)
-        stream_def.add_payloaddata_attribute("health_description", 
StreamDefinition.STRING)
-        stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE)
-
-        return stream_def
-
-    def publish_memory_usage(self, memory_usage):
-        """
-        Publishes the given memory usage value to the thrift server as a 
ThriftEvent
-        :param float memory_usage: memory usage
-        """
-
-        event = ThriftEvent()
-        event.payloadData.append(self.cartridge_agent_config.cluster_id)
-        
event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
-        
event.payloadData.append(self.cartridge_agent_config.network_partition_id)
-        event.payloadData.append(self.cartridge_agent_config.member_id)
-        event.payloadData.append(self.cartridge_agent_config.partition_id)
-        event.payloadData.append(constants.MEMORY_CONSUMPTION)
-        event.payloadData.append(float(memory_usage))
-
-        HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r 
[payload_data} %r [version] %r" % 
(self.stream_definition.name,event.payloadData, self.stream_definition.version))
-        self.publisher.publish(event)
-
-    def publish_load_average(self, load_avg):
-        """
-        Publishes the given load average value to the thrift server as a 
ThriftEvent
-        :param float load_avg: load average value
-        """
-
-        event = ThriftEvent()
-        event.payloadData.append(self.cartridge_agent_config.cluster_id)
-        
event.payloadData.append(self.cartridge_agent_config.cluster_instance_id)
-        
event.payloadData.append(self.cartridge_agent_config.network_partition_id)
-        event.payloadData.append(self.cartridge_agent_config.member_id)
-        event.payloadData.append(self.cartridge_agent_config.partition_id)
-        event.payloadData.append(constants.LOAD_AVERAGE)
-        event.payloadData.append(float(load_avg))
-
-        HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r 
[version] %r" % (self.stream_definition.name, self.stream_definition.version))
-        self.publisher.publish(event)
-
-
-class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader):
-    """
-    Default implementation of the AbstractHealthStatisticsReader
-    """
-
-    def __init__(self):
-        self.log = LogFactory().get_log(__name__)
-
-    def stat_cartridge_health(self):
-        cartridge_stats = CartridgeHealthStatistics()
-        cartridge_stats.memory_usage = 
DefaultHealthStatisticsReader.__read_mem_usage()
-        cartridge_stats.load_avg = 
DefaultHealthStatisticsReader.__read_load_avg()
-
-        self.log.debug("Memory read: %r, CPU read: %r" % 
(cartridge_stats.memory_usage, cartridge_stats.load_avg))
-        return cartridge_stats
-
-    @staticmethod
-    def __read_mem_usage():
-        return psutil.virtual_memory().percent
-
-    @staticmethod
-    def __read_load_avg():
-        (one, five, fifteen) = os.getloadavg()
-        cores = multiprocessing.cpu_count()
-
-        return (one/cores) * 100
-
-
-class CEPPublisherConfiguration:
-    """
-    TODO: Extract common functionality
-    """
-
-    __instance = None
-    log = LogFactory().get_log(__name__)
-
-    @staticmethod
-    def get_instance():
-        """
-        Singleton instance retriever
-        :return: Instance
-        :rtype : CEPPublisherConfiguration
-        """
-        if CEPPublisherConfiguration.__instance is None:
-            CEPPublisherConfiguration.__instance = CEPPublisherConfiguration()
-
-        return CEPPublisherConfiguration.__instance
-
-    def __init__(self):
-        self.enabled = False
-        self.server_ip = None
-        self.server_port = None
-        self.admin_username = None
-        self.admin_password = None
-        self.cartridge_agent_config = CartridgeAgentConfiguration()
-
-        self.read_config()
-
-    def read_config(self):
-        self.enabled = True if self.cartridge_agent_config.read_property(
-            constants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" 
else False
-        if not self.enabled:
-            CEPPublisherConfiguration.log.info("CEP Publisher disabled")
-            return
-
-        CEPPublisherConfiguration.log.info("CEP Publisher enabled")
-
-        self.server_ip = self.cartridge_agent_config.read_property(
-            constants.CEP_RECEIVER_IP, False)
-        if self.server_ip is None or self.server_ip.strip() == "":
-            raise RuntimeError("System property not found: " + 
constants.CEP_RECEIVER_IP)
-
-        self.server_port = self.cartridge_agent_config.read_property(
-            constants.CEP_RECEIVER_PORT, False)
-        if self.server_port is None or self.server_port.strip() == "":
-            raise RuntimeError("System property not found: " + 
constants.CEP_RECEIVER_PORT)
-
-        self.admin_username = self.cartridge_agent_config.read_property(
-            constants.CEP_SERVER_ADMIN_USERNAME, False)
-        if self.admin_username is None or self.admin_username.strip() == "":
-            raise RuntimeError("System property not found: " + 
constants.CEP_SERVER_ADMIN_USERNAME)
-
-        self.admin_password = self.cartridge_agent_config.read_property(
-            constants.CEP_SERVER_ADMIN_PASSWORD, False)
-        if self.admin_password is None or self.admin_password.strip() == "":
-            raise RuntimeError("System property not found: " + 
constants.CEP_SERVER_ADMIN_PASSWORD)
-
-        CEPPublisherConfiguration.log.info("CEP Publisher configuration 
initialized")

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py
index aea0e58..f94ae1d 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py
@@ -18,7 +18,9 @@
 import paho.mqtt.publish as publish
 
 from .. event.instance.status.events import *
-from .. healthstatspublisher.healthstats import *
+from .. util.log import *
+from .. util import cartridgeagentutils
+import healthstats
 import constants
 from config import CartridgeAgentConfiguration
 
@@ -48,8 +50,16 @@ def publish_instance_started_event():
         network_partition_id = 
CartridgeAgentConfiguration().network_partition_id
         partition_id = CartridgeAgentConfiguration().partition_id
 
-        instance_started_event = InstanceStartedEvent(application_id, 
service_name, cluster_id, cluster_instance_id, member_id,
-                                                      instance_id, 
network_partition_id, partition_id)
+        instance_started_event = InstanceStartedEvent(
+            application_id,
+            service_name,
+            cluster_id,
+            cluster_instance_id,
+            member_id,
+            instance_id,
+            network_partition_id,
+            partition_id)
+
         publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + 
constants.INSTANCE_STARTED_EVENT)
         publisher.publish(instance_started_event)
         started = True
@@ -58,7 +68,7 @@ def publish_instance_started_event():
         log.warn("Instance already started")
 
 
-def publish_instance_activated_event():
+def publish_instance_activated_event(health_stat_plugin):
     global activated, log
     if not activated:
         # Wait for all ports to be active
@@ -90,7 +100,10 @@ def publish_instance_activated_event():
             log.info("Instance activated event published")
             log.info("Starting health statistics notifier")
 
-            if CEPPublisherConfiguration.get_instance().enabled:
+            health_stat_publishing_enabled = True if 
CartridgeAgentConfiguration().read_property(
+                constants.CEP_PUBLISHER_ENABLED, False).strip().lower() == 
"true" else False
+
+            if health_stat_publishing_enabled:
                 interval_default = 15  # seconds
                 interval = 
CartridgeAgentConfiguration().read_property("stats.notifier.interval", False)
                 if interval is not None and len(interval) > 0:
@@ -101,7 +114,7 @@ def publish_instance_activated_event():
                 else:
                     interval = interval_default
 
-                health_stats_publisher = 
HealthStatisticsPublisherManager(interval)
+                health_stats_publisher = 
healthstats.HealthStatisticsPublisherManager(interval, health_stat_plugin)
                 log.info("Starting Health statistics publisher with interval 
%r" % interval)
                 health_stats_publisher.start()
             else:
@@ -110,7 +123,7 @@ def publish_instance_activated_event():
             activated = True
             log.info("Health statistics notifier started")
         else:
-            log.error("Ports activation timed out. Aborting 
InstanceActivatedEvent publishing. [IPAddress] %s [Ports] %s"
+            log.error("Ports activation timed out. Aborting 
InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s"
                       % (listen_address, configuration__ports))
     else:
         log.warn("Instance already activated")
@@ -129,8 +142,14 @@ def publish_maintenance_mode_event():
         network_partition_id = 
CartridgeAgentConfiguration().network_partition_id
         partition_id = CartridgeAgentConfiguration().partition_id
 
-        instance_maintenance_mode_event = 
InstanceMaintenanceModeEvent(service_name, cluster_id, cluster_instance_id, 
member_id,
-                                                                       
instance_id, network_partition_id, partition_id)
+        instance_maintenance_mode_event = InstanceMaintenanceModeEvent(
+            service_name,
+            cluster_id,
+            cluster_instance_id,
+            member_id,
+            instance_id,
+            network_partition_id,
+            partition_id)
 
         publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + 
constants.INSTANCE_MAINTENANCE_MODE_EVENT)
         publisher.publish(instance_maintenance_mode_event)

http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py
index 040401b..39baddc 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py
@@ -19,15 +19,30 @@ from yapsy.IPlugin import IPlugin
 
 
 class ICartridgeAgentPlugin(IPlugin):
+    """
+    To implement a Cartridge Agent plugin to be executed on a MB event
+    """
 
     def run_plugin(self, values):
         raise NotImplementedError
 
 
 class IArtifactManagementPlugin(IPlugin):
+    """
+    To implement an artifact management plugin to manage artifact distribution 
using a custom version control tool
+    """
 
     def checkout(self):
         raise NotImplementedError
 
     def push(self):
+        raise NotImplementedError
+
+
+class IHealthStatReaderPlugin(IPlugin):
+    """
+    To implement a health statistics reader plugin to read health statistics 
using a custom factor
+    """
+
+    def stat_cartridge_health(self, health_stat):
         raise NotImplementedError
\ No newline at end of file

Reply via email to