Added health statistic publishing classes
Refactored stream definition creation in datapublishing


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1c2f0462
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1c2f0462
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1c2f0462

Branch: refs/heads/master
Commit: 1c2f0462963464b0958af81206939c8bcfa40492
Parents: 700a6d2
Author: Chamila de Alwis <[email protected]>
Authored: Wed Oct 1 16:06:26 2014 +0530
Committer: Chamila de Alwis <[email protected]>
Committed: Thu Oct 9 15:40:20 2014 +0530

----------------------------------------------------------------------
 .../cartridge-agent/modules/databridge/agent.py |   4 +-
 .../exception/datapublisherexception.py         |   3 +
 .../modules/datapublisher/logpublisher.py       |  59 +++---
 .../modules/healthstatspublisher/__init__.py    |   0
 .../abstracthealthstatisticspublisher.py        |  45 +++++
 .../modules/healthstatspublisher/healthstats.py | 202 +++++++++++++++++++
 .../publisher/cartridgeagentpublisher.py        |  19 +-
 7 files changed, 304 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py 
b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
index 03d5f31..7a8a0dc 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
@@ -71,7 +71,7 @@ class StreamDefinition:
         return json_str
 
 
-class LogEvent:
+class ThriftEvent:
     """
     Represents an event to be published to a BAM/CEP monitoring server
     """
@@ -114,7 +114,7 @@ class ThriftPublisher:
         """
         Publishes the given event by creating the event bundle from the log 
event
 
-        :param LogEvent event: The log event to be published
+        :param ThriftEvent event: The log event to be published
         :return: void
         """
         event_bundler = EventBundle()

http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py
 
b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py
index 59e103d..444c2c1 100644
--- 
a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py
+++ 
b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py
@@ -1,4 +1,7 @@
 class DataPublisherException(Exception):
+    """
+    Exception to be used during log publishing operations
+    """
 
     def __init__(self, msg):
         super(self,  msg)

http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py
 
b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py
index cf00b0b..d2769d9 100644
--- 
a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py
+++ 
b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py
@@ -47,7 +47,7 @@ class LogPublisher(Thread):
                     read_file.seek(where)  # set seeker
                 else:
                     # new line detected, create event object
-                    event = LogEvent()
+                    event = ThriftEvent()
                     event.metaData.append(self.member_id)
                     event.payloadData.append(self.tenant_id)
                     event.payloadData.append(self.alias)
@@ -80,6 +80,37 @@ class LogPublisherManager(Thread):
     definition and the BAM/CEP server information for a single publishing 
context.
     """
 
+    @staticmethod
+    def define_stream():
+        """
+        Creates a stream definition for Log Publishing
+        :return: A StreamDefinition object with the required attributes added
+        :rtype : StreamDefinition
+        """
+        # stream definition
+        stream_definition = StreamDefinition()
+        valid_tenant_id = 
LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration.tenant_id)
+        alias = 
LogPublisherManager.get_alias(CartridgeAgentConfiguration.cluster_id)
+        stream_name = "logs." + valid_tenant_id + "." \
+                      + alias + "." + LogPublisherManager.get_current_date()
+        stream_version = "1.0.0"
+        stream_definition.name = stream_name
+        stream_definition.version = stream_version
+        stream_definition.description = "Apache Stratos Instance Log Publisher"
+        stream_definition.add_metadata_attribute("memberId", 'STRING')
+        stream_definition.add_payloaddata_attribute("tenantID", "STRING")
+        stream_definition.add_payloaddata_attribute("serverName", "STRING")
+        stream_definition.add_payloaddata_attribute("appName", "STRING")
+        stream_definition.add_payloaddata_attribute("logTime", "STRING")
+        stream_definition.add_payloaddata_attribute("priority", "STRING")
+        stream_definition.add_payloaddata_attribute("message", "STRING")
+        stream_definition.add_payloaddata_attribute("logger", "STRING")
+        stream_definition.add_payloaddata_attribute("ip", "STRING")
+        stream_definition.add_payloaddata_attribute("instance", "STRING")
+        stream_definition.add_payloaddata_attribute("stacktrace", "STRING")
+
+        return stream_definition
+
     def __init__(self, logfile_paths):
         Thread.__init__(self)
         self.logfile_paths = logfile_paths
@@ -93,29 +124,7 @@ class LogPublisherManager(Thread):
         if not ports_active:
             raise DataPublisherException("Monitoring server not active, data 
publishing is aborted")
 
-        #stream definition
-        self.stream_definition = StreamDefinition()
-        valid_tenant_id = 
LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration.tenant_id)
-        alias = 
LogPublisherManager.get_alias(CartridgeAgentConfiguration.cluster_id)
-        stream_name = "logs." + valid_tenant_id + "." \
-                      + alias + "." + LogPublisherManager.get_current_date()
-
-        stream_version = "1.0.0"
-        self.stream_definition.name = stream_name
-        self.stream_definition.version = stream_version
-        self.stream_definition.description = "Apache Stratos Instance Log 
Publisher"
-        self.stream_definition.add_metadata_attribute("memberId", 'STRING')
-
-        self.stream_definition.add_payloaddata_attribute("tenantID", "STRING")
-        self.stream_definition.add_payloaddata_attribute("serverName", 
"STRING")
-        self.stream_definition.add_payloaddata_attribute("appName", "STRING")
-        self.stream_definition.add_payloaddata_attribute("logTime", "STRING")
-        self.stream_definition.add_payloaddata_attribute("priority", "STRING")
-        self.stream_definition.add_payloaddata_attribute("message", "STRING")
-        self.stream_definition.add_payloaddata_attribute("logger", "STRING")
-        self.stream_definition.add_payloaddata_attribute("ip", "STRING")
-        self.stream_definition.add_payloaddata_attribute("instance", "STRING")
-        self.stream_definition.add_payloaddata_attribute("stacktrace", 
"STRING")
+        self.stream_definition = self.define_stream()
 
     def run(self):
         if self.logfile_paths is not None and len(self.logfile_paths):
@@ -190,7 +199,7 @@ class DataPublisherConfiguration:
     def get_instance():
         """
         Singleton instance retriever
-        :return: Instnace
+        :return: Instance
         :rtype : DataPublisherConfiguration
         """
         if DataPublisherConfiguration.__instance is None:

http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/__init__.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/__init__.py
 
b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/__init__.py
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
 
b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
new file mode 100644
index 0000000..e2121b6
--- /dev/null
+++ 
b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py
@@ -0,0 +1,45 @@
+class AbstractHealthStatisticsReader:
+    """
+    TODO:
+    """
+
+    def stat_cartridge_health(self):
+        """
+        Abstract method that when implemented reads the memory usage and the 
load average
+        of the instance running the agent and returns a 
CartridgeHealthStatistics object
+        with the information
+
+        :return: CartridgeHealthStatistics object with memory usage and load 
average values
+        :rtype : CartridgeHealthStatistics
+        """
+        raise NotImplementedError
+
+
+class CartridgeHealthStatistics:
+    """
+    Holds the memory usage and load average reading
+    """
+
+    def __init__(self):
+        self.memory_usage = None
+        """:type : float"""
+        self.load_avg = None
+        """:type : float"""
+
+
+class CEPPublisherException(Exception):
+    """
+    Exception to be used during CEP publishing operations
+    """
+
+    def __init__(self, msg):
+        super(self,  msg)
+        self.message = msg
+
+    def get_message(self):
+        """
+        The message provided when the exception is raised
+        :return: message
+        :rtype: str
+        """
+        return self.message

http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
 
b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
new file mode 100644
index 0000000..f647106
--- /dev/null
+++ 
b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
@@ -0,0 +1,202 @@
+from threading import Thread
+import time
+import logging
+
+from abstracthealthstatisticspublisher import *
+from ..databridge.agent import *
+from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration
+from ..util import cartridgeagentutils, cartridgeagentconstants
+
+
+class HealthStatisticsPublisherManager(Thread):
+    """
+    Reads the memory usage and load average values from an implementation of
+    AbstractHealthStatisticsReader and publishes them as ThriftEvents to a CEP server
+    """
+    def __init__(self, publish_interval):
+        """
+        Initializes a new HealthStatisticsPublisherManager with a given number of seconds as the interval
+        :param int publish_interval: Number of seconds as the interval
+        :return: void
+        """
+        Thread.__init__(self)
+
+        logging.basicConfig(level=logging.DEBUG)
+        self.log = logging.getLogger(__name__)
+
+        self.publish_interval = publish_interval
+        """:type : int"""
+        self.terminated = False
+
+        self.publisher = HealthStatisticsPublisher()
+        """:type : HealthStatisticsPublisher"""
+        # TODO: load plugins for the reader
+        self.stats_reader = DefaultHealthStatisticsReader()
+        """:type : AbstractHealthStatisticsReader"""
+
+    def run(self):
+        while not self.terminated:
+            time.sleep(self.publish_interval)
+
+            cartridge_stats = self.stats_reader.stat_cartridge_health()
+            self.log.debug("Publishing memory consumption: %r" % 
cartridge_stats.memory_usage)
+            self.publisher.publish_memory_usage(cartridge_stats.memory_usage)
+
+            self.log.debug("Publishing load average: %r" % 
cartridge_stats.load_avg)
+            self.publisher.publish_load_average(cartridge_stats.load_avg)
+
+
+class HealthStatisticsPublisher:
+    """
+    Publishes memory usage and load average to thrift server
+    """
+    def __init__(self):
+        logging.basicConfig(level=logging.DEBUG)
+        self.log = logging.getLogger(__name__)
+        self.ports = []
+        self.ports.append(CEPPublisherConfiguration.get_instance().server_port)
+        
cartridgeagentutils.wait_until_ports_active(CEPPublisherConfiguration.get_instance().server_ip,
 self.ports)
+        cep_active = 
cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip,
 self.ports)
+        if not cep_active:
+            raise CEPPublisherException("CEP server not active. Health 
statistics publishing aborted.")
+
+        self.stream_definition = 
HealthStatisticsPublisher.create_stream_definition()
+        self.publisher = ThriftPublisher(
+            CEPPublisherConfiguration.get_instance().server_ip,
+            CEPPublisherConfiguration.get_instance().server_port,
+            CEPPublisherConfiguration.get_instance().admin_username,
+            CEPPublisherConfiguration.get_instance().admin_password,
+            self.stream_definition)
+
+
+    @staticmethod
+    def create_stream_definition():
+        """
+        Create a StreamDefinition for publishing to CEP
+        """
+        stream_def = StreamDefinition()
+        stream_def.name = "cartridge_agent_health_stats"
+        stream_def.version = "1.0.0"
+        stream_def.nickname = "agent health stats"
+        stream_def.description = "agent health stats"
+
+        stream_def.add_payloaddata_attribute("cluster_id", "STRING")
+        stream_def.add_payloaddata_attribute("network_partition_id", "STRING")
+        stream_def.add_payloaddata_attribute("member_id", "STRING")
+        stream_def.add_payloaddata_attribute("partition_id", "STRING")
+        stream_def.add_payloaddata_attribute("health_description", "STRING")
+        stream_def.add_payloaddata_attribute("value", "DOUBLE")
+
+        return stream_def
+
+    def publish_memory_usage(self, memory_usage):
+        """
+        Publishes the given memory usage value to the thrift server as a 
ThriftEvent
+        :param float memory_usage: memory usage
+        """
+
+        event = ThriftEvent()
+        event.payloadData.append(CartridgeAgentConfiguration.cluster_id)
+        
event.payloadData.append(CartridgeAgentConfiguration.network_partition_id)
+        event.payloadData.append(CartridgeAgentConfiguration.member_id)
+        event.payloadData.append(CartridgeAgentConfiguration.partition_id)
+        event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION)
+        event.payloadData.append(memory_usage)
+
+        self.log.debug("Publishing cep event: [stream] %r [version] %r" % 
(self.stream_definition.name, self.stream_definition.version))
+        self.publisher.publish(event)
+
+    def publish_load_average(self, load_avg):
+        """
+        Publishes the given load average value to the thrift server as a 
ThriftEvent
+        :param float load_avg: load average value
+        """
+
+        event = ThriftEvent()
+        event.payloadData.append(CartridgeAgentConfiguration.cluster_id)
+        
event.payloadData.append(CartridgeAgentConfiguration.network_partition_id)
+        event.payloadData.append(CartridgeAgentConfiguration.member_id)
+        event.payloadData.append(CartridgeAgentConfiguration.partition_id)
+        event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE)
+        event.payloadData.append(load_avg)
+
+        self.log.debug("Publishing cep event: [stream] %r [version] %r" % 
(self.stream_definition.name, self.stream_definition.version))
+        self.publisher.publish(event)
+
+
+class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader):
+    """
+    Default implementation of the AbstractHealthStatisticsReader
+    """
+
+    def stat_cartridge_health(self):
+        cartridge_stats = CartridgeHealthStatistics()
+        cartridge_stats.memory_usage = 
DefaultHealthStatisticsReader.__read_mem_usage()
+        cartridge_stats.load_avg = 
DefaultHealthStatisticsReader.__read_load_avg()
+
+        return cartridge_stats
+
+    @staticmethod
+    def __read_mem_usage():
+        raise NotImplementedError
+
+    @staticmethod
+    def __read_load_avg():
+        raise NotImplementedError
+
+
+class CEPPublisherConfiguration:
+    """
+    A singleton implementation to access configuration information for data 
publishing to BAM/CEP
+    TODO: perfect singleton impl ex: Borg
+    """
+
+    __instance = None
+    logging.basicConfig(level=logging.DEBUG)
+    log = logging.getLogger(__name__)
+
+    @staticmethod
+    def get_instance():
+        """
+        Singleton instance retriever
+        :return: Instance
+        :rtype : CEPPublisherConfiguration
+        """
+        if CEPPublisherConfiguration.__instance is None:
+            CEPPublisherConfiguration.__instance = CEPPublisherConfiguration()
+
+        return CEPPublisherConfiguration.__instance
+
+    def __init__(self):
+        self.enabled = False
+        self.server_ip = None
+        self.server_port = None
+        self.admin_username = None
+        self.admin_password = None
+        self.read_config()
+
+    def read_config(self):
+        self.enabled = True if 
CartridgeAgentConfiguration.read_property("cep.stats.publisher.enabled", 
False).strip().lower() == "true" else False
+        if not self.enabled:
+            CEPPublisherConfiguration.log.info("CEP Publisher disabled")
+            return
+
+        CEPPublisherConfiguration.log.info("CEP Publisher enabled")
+
+        self.server_ip = 
CartridgeAgentConfiguration.read_property("thrift.receiver.ip", False)
+        if self.server_ip.strip() == "":
+            raise RuntimeError("System property not found: thrift.receiver.ip")
+
+        self.server_port = 
CartridgeAgentConfiguration.read_property("thrift.receiver.port", False)
+        if self.server_port.strip() == "":
+            raise RuntimeError("System property not found: 
thrift.receiver.port")
+
+        self.admin_username = 
CartridgeAgentConfiguration.read_property("thrift.server.admin.username", False)
+        if self.admin_username.strip() == "":
+            raise RuntimeError("System property not found: 
thrift.server.admin.username")
+
+        self.admin_password = 
CartridgeAgentConfiguration.read_property("thrift.server.admin.password", False)
+        if self.admin_password.strip() == "":
+            raise RuntimeError("System property not found: 
thrift.server.admin.password")
+
+        CEPPublisherConfiguration.log.info("CEP Publisher configuration 
initialized")

http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
----------------------------------------------------------------------
diff --git 
a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
 
b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
index a862967..fcbe5f1 100644
--- 
a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
+++ 
b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
@@ -5,6 +5,8 @@ import paho.mqtt.publish as publish
 from .. event.instance.status.events import *
 from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration
 from .. util import cartridgeagentconstants
+from .. healthstatspublisher.healthstats import *
+from .. healthstatspublisher.abstracthealthstatisticspublisher import *
 
 
 logging.basicConfig(level=logging.DEBUG, filename='/tmp/cartridge-agent.log')
@@ -57,7 +59,22 @@ def publish_instance_activated_event():
         log.info("Instance activated event published")
         log.info("Starting health statistics notifier")
 
-        # TODO: health stat publisher start()
+        if CEPPublisherConfiguration.get_instance().enabled:
+            interval_default = 15  # seconds
+            interval = 
CartridgeAgentConfiguration.read_property("stats.notifier.interval")
+            if interval is not None and len(interval) > 0:
+                try:
+                    interval = int(interval)
+                except ValueError:
+                    interval = interval_default
+            else:
+                interval = interval_default
+
+            health_stats_publisher = HealthStatisticsPublisherManager(interval)
+            health_stats_publisher.start()
+        else:
+            log.warn("Statistics publisher is disabled")
+
         activated = True
         log.info("Health statistics notifier started")
     else:

Reply via email to