Added health statistic publishing classes Refactored stream definition creation in datapublishing
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/1c2f0462 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/1c2f0462 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/1c2f0462 Branch: refs/heads/master Commit: 1c2f0462963464b0958af81206939c8bcfa40492 Parents: 700a6d2 Author: Chamila de Alwis <[email protected]> Authored: Wed Oct 1 16:06:26 2014 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Thu Oct 9 15:40:20 2014 +0530 ---------------------------------------------------------------------- .../cartridge-agent/modules/databridge/agent.py | 4 +- .../exception/datapublisherexception.py | 3 + .../modules/datapublisher/logpublisher.py | 59 +++--- .../modules/healthstatspublisher/__init__.py | 0 .../abstracthealthstatisticspublisher.py | 45 +++++ .../modules/healthstatspublisher/healthstats.py | 202 +++++++++++++++++++ .../publisher/cartridgeagentpublisher.py | 19 +- 7 files changed, 304 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py index 03d5f31..7a8a0dc 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py @@ -71,7 +71,7 @@ class StreamDefinition: return json_str -class LogEvent: +class ThriftEvent: """ Represents an event to be published to a BAM/CEP monitoring server """ @@ -114,7 +114,7 @@ class ThriftPublisher: """ Publishes the given event by creating the event bundle from the log event - :param LogEvent event: The log event to be published + :param ThriftEvent event: The log event to be published :return: void """ event_bundler = EventBundle() http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py index 59e103d..444c2c1 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/exception/datapublisherexception.py @@ -1,4 +1,7 @@ class DataPublisherException(Exception): + """ + Exception to be used during log publishing operations + """ def __init__(self, msg): super(self, msg) http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py index cf00b0b..d2769d9 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py @@ -47,7 +47,7 @@ class LogPublisher(Thread): read_file.seek(where) # set seeker else: # new line detected, create event object - event = LogEvent() + event = ThriftEvent() event.metaData.append(self.member_id) event.payloadData.append(self.tenant_id) event.payloadData.append(self.alias) @@ -80,6 +80,37 @@ class LogPublisherManager(Thread): definition and the BAM/CEP server information for a single publishing context. """ + @staticmethod + def define_stream(): + """ + Creates a stream definition for Log Publishing + :return: A StreamDefinition object with the required attributes added + :rtype : StreamDefinition + """ + # stream definition + stream_definition = StreamDefinition() + valid_tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration.tenant_id) + alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration.cluster_id) + stream_name = "logs." + valid_tenant_id + "." \ + + alias + "." + LogPublisherManager.get_current_date() + stream_version = "1.0.0" + stream_definition.name = stream_name + stream_definition.version = stream_version + stream_definition.description = "Apache Stratos Instance Log Publisher" + stream_definition.add_metadata_attribute("memberId", 'STRING') + stream_definition.add_payloaddata_attribute("tenantID", "STRING") + stream_definition.add_payloaddata_attribute("serverName", "STRING") + stream_definition.add_payloaddata_attribute("appName", "STRING") + stream_definition.add_payloaddata_attribute("logTime", "STRING") + stream_definition.add_payloaddata_attribute("priority", "STRING") + stream_definition.add_payloaddata_attribute("message", "STRING") + stream_definition.add_payloaddata_attribute("logger", "STRING") + stream_definition.add_payloaddata_attribute("ip", "STRING") + stream_definition.add_payloaddata_attribute("instance", "STRING") + stream_definition.add_payloaddata_attribute("stacktrace", "STRING") + + return stream_definition + def __init__(self, logfile_paths): Thread.__init__(self) self.logfile_paths = logfile_paths @@ -93,29 +124,7 @@ class LogPublisherManager(Thread): if not ports_active: raise DataPublisherException("Monitoring server not active, data publishing is aborted") - #stream definition - self.stream_definition = StreamDefinition() - valid_tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration.tenant_id) - alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration.cluster_id) - stream_name = "logs." + valid_tenant_id + "." \ - + alias + "." + LogPublisherManager.get_current_date() - - stream_version = "1.0.0" - self.stream_definition.name = stream_name - self.stream_definition.version = stream_version - self.stream_definition.description = "Apache Stratos Instance Log Publisher" - self.stream_definition.add_metadata_attribute("memberId", 'STRING') - - self.stream_definition.add_payloaddata_attribute("tenantID", "STRING") - self.stream_definition.add_payloaddata_attribute("serverName", "STRING") - self.stream_definition.add_payloaddata_attribute("appName", "STRING") - self.stream_definition.add_payloaddata_attribute("logTime", "STRING") - self.stream_definition.add_payloaddata_attribute("priority", "STRING") - self.stream_definition.add_payloaddata_attribute("message", "STRING") - self.stream_definition.add_payloaddata_attribute("logger", "STRING") - self.stream_definition.add_payloaddata_attribute("ip", "STRING") - self.stream_definition.add_payloaddata_attribute("instance", "STRING") - self.stream_definition.add_payloaddata_attribute("stacktrace", "STRING") + self.stream_definition = self.define_stream() def run(self): if self.logfile_paths is not None and len(self.logfile_paths): @@ -190,7 +199,7 @@ class DataPublisherConfiguration: def get_instance(): """ Singleton instance retriever - :return: Instnace + :return: Instance :rtype : DataPublisherConfiguration """ if DataPublisherConfiguration.__instance is None: http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/__init__.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/__init__.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/__init__.py new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py new file mode 100644 index 0000000..e2121b6 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py @@ -0,0 +1,45 @@ +class AbstractHealthStatisticsReader: + """ + TODO: + """ + + def stat_cartridge_health(self): + """ + Abstract method that when implemented reads the memory usage and the load average + of the instance running the agent and returns a CartridgeHealthStatistics object + with the information + + :return: CartridgeHealthStatistics object with memory usage and load average values + :rtype : CartridgeHealthStatistics + """ + raise NotImplementedError + + +class CartridgeHealthStatistics: + """ + Holds the memory usage and load average reading + """ + + def __init__(self): + self.memory_usage = None + """:type : float""" + self.load_avg = None + """:type : float""" + + +class CEPPublisherException(Exception): + """ + Exception to be used during CEP publishing operations + """ + + def __init__(self, msg): + super(self, msg) + self.message = msg + + def get_message(self): + """ + The message provided when the exception is raised + :return: message + :rtype: str + """ + return self.message http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py new file mode 100644 index 0000000..f647106 --- /dev/null +++ b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py @@ -0,0 +1,202 @@ +from threading import Thread +import time +import logging + +from abstracthealthstatisticspublisher import * +from ..databridge.agent import * +from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration +from ..util import cartridgeagentutils, cartridgeagentconstants + + +class HealthStatisticsPublisherManager(Thread): + """ + Read from an implementation of AbstractHealthStatisticsPublisher the value for memory usage and + load average and publishes them as ThriftEvents to a CEP server + """ + def __init__(self, publish_interval): + """ + Initializes a new HealthStatistsPublisherManager with a given number of seconds as the interval + :param int publish_interval: Number of seconds as the interval + :return: void + """ + Thread.__init__(self) + + logging.basicConfig(level=logging.DEBUG) + self.log = logging.getLogger(__name__) + + self.publish_interval = publish_interval + """:type : int""" + self.terminated = False + + self.publisher = HealthStatisticsPublisher() + """:type : HealthStatisticsPublisher""" + # TODO: load plugins for the reader + self.stats_reader = DefaultHealthStatisticsReader() + """:type : AbstractHealthStatisticsReader""" + + def run(self): + while not self.terminated: + time.sleep(self.publish_interval) + + cartridge_stats = self.stats_reader.stat_cartridge_health() + self.log.debug("Publishing memory consumption: %r" % cartridge_stats.memory_usage) + self.publisher.publish_memory_usage(cartridge_stats.memory_usage) + + self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg) + self.publisher.publish_load_average(cartridge_stats.load_avg) + + +class HealthStatisticsPublisher: + """ + Publishes memory usage and load average to thrift server + """ + def __init__(self): + logging.basicConfig(level=logging.DEBUG) + self.log = logging.getLogger(__name__) + self.ports = [] + self.ports.append(CEPPublisherConfiguration.get_instance().server_port) + cartridgeagentutils.wait_until_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) + cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) + if not cep_active: + raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") + + self.stream_definition = HealthStatisticsPublisher.create_stream_definition() + self.publisher = ThriftPublisher( + CEPPublisherConfiguration.get_instance().server_ip, + CEPPublisherConfiguration.get_instance().server_port, + CEPPublisherConfiguration.get_instance().admin_username, + CEPPublisherConfiguration.get_instance().admin_password, + self.stream_definition) + + + @staticmethod + def create_stream_definition(): + """ + Create a StreamDefinition for publishing to CEP + """ + stream_def = StreamDefinition() + stream_def.name = "cartridge_agent_health_stats" + stream_def.version = "1.0.0" + stream_def.nickname = "agent health stats" + stream_def.description = "agent health stats" + + stream_def.add_payloaddata_attribute("cluster_id", "STRING") + stream_def.add_payloaddata_attribute("network_partition_id", "STRING") + stream_def.add_payloaddata_attribute("member_id", "STRING") + stream_def.add_payloaddata_attribute("partition_id", "STRING") + stream_def.add_payloaddata_attribute("health_description", "STRING") + stream_def.add_payloaddata_attribute("value", "DOUBLE") + + return stream_def + + def publish_memory_usage(self, memory_usage): + """ + Publishes the given memory usage value to the thrift server as a ThriftEvent + :param float memory_usage: memory usage + """ + + event = ThriftEvent() + event.payloadData.append(CartridgeAgentConfiguration.cluster_id) + event.payloadData.append(CartridgeAgentConfiguration.network_partition_id) + event.payloadData.append(CartridgeAgentConfiguration.member_id) + event.payloadData.append(CartridgeAgentConfiguration.partition_id) + event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION) + event.payloadData.append(memory_usage) + + self.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) + self.publisher.publish(event) + + def publish_load_average(self, load_avg): + """ + Publishes the given load average value to the thrift server as a ThriftEvent + :param float load_avg: load average value + """ + + event = ThriftEvent() + event.payloadData.append(CartridgeAgentConfiguration.cluster_id) + event.payloadData.append(CartridgeAgentConfiguration.network_partition_id) + event.payloadData.append(CartridgeAgentConfiguration.member_id) + event.payloadData.append(CartridgeAgentConfiguration.partition_id) + event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE) + event.payloadData.append(load_avg) + + self.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) + self.publisher.publish(event) + + +class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader): + """ + Default implementation of the AbstractHealthStatisticsReader + """ + + def stat_cartridge_health(self): + cartridge_stats = CartridgeHealthStatistics() + cartridge_stats.memory_usage = DefaultHealthStatisticsReader.__read_mem_usage() + cartridge_stats.load_avg = DefaultHealthStatisticsReader.__read_load_avg() + + return cartridge_stats + + @staticmethod + def __read_mem_usage(): + raise NotImplementedError + + @staticmethod + def __read_load_avg(): + raise NotImplementedError + + +class CEPPublisherConfiguration: + """ + A singleton implementation to access configuration information for data publishing to BAM/CEP + TODO: perfect singleton impl ex: Borg + """ + + __instance = None + logging.basicConfig(level=logging.DEBUG) + log = logging.getLogger(__name__) + + @staticmethod + def get_instance(): + """ + Singleton instance retriever + :return: Instance + :rtype : CEPPublisherConfiguration + """ + if CEPPublisherConfiguration.__instance is None: + CEPPublisherConfiguration.__instance = CEPPublisherConfiguration() + + return CEPPublisherConfiguration.__instance + + def __init__(self): + self.enabled = False + self.server_ip = None + self.server_port = None + self.admin_username = None + self.admin_password = None + self.read_config() + + def read_config(self): + self.enabled = True if CartridgeAgentConfiguration.read_property("cep.stats.publisher.enabled", False).strip().lower() == "true" else False + if not self.enabled: + CEPPublisherConfiguration.log.info("CEP Publisher disabled") + return + + CEPPublisherConfiguration.log.info("CEP Publisher enabled") + + self.server_ip = CartridgeAgentConfiguration.read_property("thrift.receiver.ip", False) + if self.server_ip.strip() == "": + raise RuntimeError("System property not found: thrift.receiver.ip") + + self.server_port = CartridgeAgentConfiguration.read_property("thrift.receiver.port", False) + if self.server_port.strip() == "": + raise RuntimeError("System property not found: thrift.receiver.port") + + self.admin_username = CartridgeAgentConfiguration.read_property("thrift.server.admin.username", False) + if self.admin_username.strip() == "": + raise RuntimeError("System property not found: thrift.server.admin.username") + + self.admin_password = CartridgeAgentConfiguration.read_property("thrift.server.admin.password", False) + if self.admin_password.strip() == "": + raise RuntimeError("System property not found: thrift.server.admin.password") + + CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/1c2f0462/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py index a862967..fcbe5f1 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py @@ -5,6 +5,8 @@ import paho.mqtt.publish as publish from .. event.instance.status.events import * from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration from .. util import cartridgeagentconstants +from .. healthstatspublisher.healthstats import * +from .. healthstatspublisher.abstracthealthstatisticspublisher import * logging.basicConfig(level=logging.DEBUG, filename='/tmp/cartridge-agent.log') @@ -57,7 +59,22 @@ def publish_instance_activated_event(): log.info("Instance activated event published") log.info("Starting health statistics notifier") - # TODO: health stat publisher start() + if CEPPublisherConfiguration.get_instance().enabled: + interval_default = 15 # seconds + interval = CartridgeAgentConfiguration.read_property("stats.notifier.interval") + if interval is not None and len(interval) > 0: + try: + interval = int(interval) + except ValueError: + interval = interval_default + else: + interval = interval_default + + health_stats_publisher = HealthStatisticsPublisherManager(interval) + health_stats_publisher.start() + else: + log.warn("Statistics publisher is disabled") + activated = True log.info("Health statistics notifier started") else:
