Repository: stratos Updated Branches: refs/heads/master 7c52406fb -> 92eb63b52
PCA - Enable health stats reader plugins, refactor health stats publisher module Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/81d5d34c Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/81d5d34c Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/81d5d34c Branch: refs/heads/master Commit: 81d5d34ce1920b2dcca8e6e4ab592233f9a792d0 Parents: 7c52406 Author: Chamila de Alwis <[email protected]> Authored: Fri May 1 00:33:19 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Thu Jul 30 00:16:58 2015 -0400 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 2 +- .../cartridge.agent/exception.py | 11 +- .../cartridge.agent/healthstats.py | 246 +++++++++++++++++ .../modules/event/eventhandler.py | 34 ++- .../modules/healthstatspublisher/__init__.py | 16 -- .../abstracthealthstatisticspublisher.py | 63 ----- .../modules/healthstatspublisher/healthstats.py | 261 ------------------- .../publisher/cartridgeagentpublisher.py | 37 ++- .../cartridge.agent/plugins/contracts.py | 15 ++ 9 files changed, 325 insertions(+), 360 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index 3051775..806f6c9 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -99,7 +99,7 @@ class CartridgeAgent(threading.Thread): if repo_url is None or str(repo_url).strip() == "": self.__log.info("No artifact repository found") self.__event_handler.on_instance_activated_event() - cartridgeagentpublisher.publish_instance_activated_event() + cartridgeagentpublisher.publish_instance_activated_event(self.__event_handler.health_stat_plugin) else: self.__log.info( "Artifact repository found, waiting for artifact updated event to checkout artifacts: [repo_url] %s", http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py index 5262d2a..1586c4d 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py @@ -77,4 +77,13 @@ class ThriftReceiverOfflineException(CartridgeAgentException): """ def __init__(self, message): - super(ThriftReceiverOfflineException, self).__init__(message) \ No newline at end of file + super(ThriftReceiverOfflineException, self).__init__(message) + + +class CEPPublisherException(CartridgeAgentException): + """ + Exception to be used during CEP publishing operations + """ + + def __init__(self, message): + super(CEPPublisherException, self).__init__(message) http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py new file mode 100644 index 0000000..815ec01 --- /dev/null +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -0,0 +1,246 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from threading import Thread +import multiprocessing + +import psutil + +from modules.databridge.agent import * +from config import CartridgeAgentConfiguration +from modules.util import cartridgeagentutils +from exception import ThriftReceiverOfflineException, CEPPublisherException +import constants + + +class HealthStatisticsPublisherManager(Thread): + """ + Read from provided health stat reader plugin or the default health stat reader, the value for memory usage and + load average and publishes them as ThriftEvents to a CEP server + """ + STREAM_NAME = "cartridge_agent_health_stats" + STREAM_VERSION = "1.0.0" + STREAM_NICKNAME = "agent health stats" + STREAM_DESCRIPTION = "agent health stats" + + def __init__(self, publish_interval, health_stat_plugin): + """ + Initializes a new HealthStatisticsPublisherManager with a given number of seconds as the interval + :param int publish_interval: Number of seconds as the interval + :return: void + """ + Thread.__init__(self) + + self.log = LogFactory().get_log(__name__) + + self.publish_interval = publish_interval + """:type : int""" + + self.terminated = False + + self.publisher = HealthStatisticsPublisher() + """:type : HealthStatisticsPublisher""" + # If there are no health stat reader plugins, create the default reader instance + self.stats_reader = health_stat_plugin if health_stat_plugin is not None else DefaultHealthStatisticsReader() + + def run(self): + while not self.terminated: + time.sleep(self.publish_interval) + + try: + ca_health_stat = CartridgeHealthStatistics() + cartridge_stats = self.stats_reader.stat_cartridge_health(ca_health_stat) + self.log.debug("Publishing memory consumption: %r" % cartridge_stats.memory_usage) + self.publisher.publish_memory_usage(cartridge_stats.memory_usage) + + self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg) + self.publisher.publish_load_average(cartridge_stats.load_avg) + except ThriftReceiverOfflineException: + self.log.error("Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...") + self.publisher = HealthStatisticsPublisher() + + self.publisher.publisher.disconnect() + + +class HealthStatisticsPublisher: + """ + Publishes memory usage and load average to thrift server + """ + log = LogFactory().get_log(__name__) + + def read_config(self, conf_key): + """ + Read a given key from the cartridge agent configuration + :param conf_key: The key to look for in the CA config + :return: The value for the key from the CA config + :raise: RuntimeError if the given key is not found in the CA config + """ + if self.cartridge_agent_config is None: + self.cartridge_agent_config = CartridgeAgentConfiguration() + + conf_value = self.cartridge_agent_config.read_property(conf_key, False) + + if conf_value is None or conf_value.strip() == "": + raise RuntimeError("System property not found: " + conf_key) + + return conf_value + + def __init__(self): + self.cartridge_agent_config = CartridgeAgentConfiguration() + + self.ports = [] + cep_port = self.read_config(constants.CEP_RECEIVER_PORT) + self.ports.append(cep_port) + + cep_ip = self.read_config(constants.CEP_RECEIVER_IP) + + cartridgeagentutils.wait_until_ports_active( + cep_ip, + self.ports, + int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) + + cep_active = cartridgeagentutils.check_ports_active( + cep_ip, + self.ports) + + if not cep_active: + raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") + + cep_admin_username = self.read_config(constants.CEP_SERVER_ADMIN_USERNAME) + cep_admin_password = self.read_config(constants.CEP_SERVER_ADMIN_PASSWORD) + + self.stream_definition = HealthStatisticsPublisher.create_stream_definition() + HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) + + self.publisher = ThriftPublisher( + cep_ip, + cep_port, + cep_admin_username, + cep_admin_password, + self.stream_definition) + + HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") + + @staticmethod + def create_stream_definition(): + """ + Create a StreamDefinition for publishing to CEP + """ + stream_def = StreamDefinition() + stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME + stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION + stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME + stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION + + # stream_def.add_payloaddata_attribute() + stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE) + + return stream_def + + def publish_memory_usage(self, memory_usage): + """ + Publishes the given memory usage value to the thrift server as a ThriftEvent + :param float memory_usage: memory usage + """ + + event = ThriftEvent() + event.payloadData.append(self.cartridge_agent_config.cluster_id) + event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) + event.payloadData.append(self.cartridge_agent_config.network_partition_id) + event.payloadData.append(self.cartridge_agent_config.member_id) + event.payloadData.append(self.cartridge_agent_config.partition_id) + event.payloadData.append(constants.MEMORY_CONSUMPTION) + event.payloadData.append(float(memory_usage)) + # event.payloadData.append(str(memory_usage)) + + HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [payload_data] %r [version] %r" + % ( + self.stream_definition.name, + event.payloadData, + self.stream_definition.version)) + + self.publisher.publish(event) + + def publish_load_average(self, load_avg): + """ + Publishes the given load average value to the thrift server as a ThriftEvent + :param float load_avg: load average value + """ + + event = ThriftEvent() + event.payloadData.append(self.cartridge_agent_config.cluster_id) + event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) + event.payloadData.append(self.cartridge_agent_config.network_partition_id) + event.payloadData.append(self.cartridge_agent_config.member_id) + event.payloadData.append(self.cartridge_agent_config.partition_id) + event.payloadData.append(constants.LOAD_AVERAGE) + event.payloadData.append(float(load_avg)) + # event.payloadData.append(str(load_avg)) + + HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [payload_data] %r [version] %r" + % ( + self.stream_definition.name, + event.payloadData, + self.stream_definition.version)) + + self.publisher.publish(event) + + +class DefaultHealthStatisticsReader: + """ + Default implementation for the health statistics reader. If no Health Statistics Reader plugins are provided, + this will be used to read health stats from the instance. + """ + + def __init__(self): + self.log = LogFactory().get_log(__name__) + + def stat_cartridge_health(self, ca_health_stat): + ca_health_stat.memory_usage = DefaultHealthStatisticsReader.__read_mem_usage() + ca_health_stat.load_avg = DefaultHealthStatisticsReader.__read_load_avg() + + self.log.debug("Memory read: %r, CPU read: %r" % (ca_health_stat.memory_usage, ca_health_stat.load_avg)) + return ca_health_stat + + @staticmethod + def __read_mem_usage(): + return psutil.virtual_memory().percent + + @staticmethod + def __read_load_avg(): + (one, five, fifteen) = os.getloadavg() + cores = multiprocessing.cpu_count() + + return (one/cores) * 100 + + +class CartridgeHealthStatistics: + """ + Holds the memory usage and load average reading + """ + + def __init__(self): + self.memory_usage = None + """:type : float""" + self.load_avg = None + """:type : float""" http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index 2cc9fba..af66b81 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -20,7 +20,7 @@ from threading import Thread from ..util import cartridgeagentutils from yapsy.PluginManager import PluginManager -from plugins.contracts import ICartridgeAgentPlugin, IArtifactManagementPlugin +from plugins.contracts import ICartridgeAgentPlugin, IArtifactManagementPlugin, IHealthStatReaderPlugin from ..artifactmgt.git.agentgithandler import * from ..artifactmgt.repository import Repository from config import CartridgeAgentConfiguration @@ -35,6 +35,7 @@ from exception import ParameterNotFoundException AGENT_PLUGIN_EXT = "agent-plugin" ARTIFACT_MGT_PLUGIN = "ArtifactManagementPlugin" CARTRIDGE_AGENT_PLUGIN = "CartridgeAgentPlugin" +HEALTH_STAT_PLUGIN = "HealthStatReaderPlugin" SUPER_TENANT_ID = -1234 SUPER_TENANT_REPO_PATH = "/repository/deployment/server/" TENANT_REPO_PATH = "/repository/tenants/" @@ -51,7 +52,7 @@ class EventHandler: self.__plugins = {} """ :type dict{str: [PluginInfo]} : """ self.__artifact_mgt_plugins = [] - self.__plugins, self.__artifact_mgt_plugins = self.initialize_plugins() + self.__plugins, self.__artifact_mgt_plugins, self.health_stat_plugin = self.initialize_plugins() self.__extension_executor = self.initialize_extensions() def on_instance_started_event(self): @@ -117,7 +118,7 @@ class EventHandler: if subscribe_run: # publish instanceActivated - cartridgeagentpublisher.publish_instance_activated_event() + cartridgeagentpublisher.publish_instance_activated_event(self.health_stat_plugin) elif updated: # updated on pull self.on_artifact_update_scheduler_event(tenant_id) @@ -384,13 +385,15 @@ class EventHandler: try: # TODO: change plugin descriptor ext, plugin_manager.setPluginInfoExtension(AGENT_PLUGIN_EXT) plugins_dir = self.__config.read_property(constants.PLUGINS_DIR) - category_filter = {CARTRIDGE_AGENT_PLUGIN: ICartridgeAgentPlugin, ARTIFACT_MGT_PLUGIN: IArtifactManagementPlugin} + category_filter = {CARTRIDGE_AGENT_PLUGIN: ICartridgeAgentPlugin, + ARTIFACT_MGT_PLUGIN: IArtifactManagementPlugin, + HEALTH_STAT_PLUGIN: IHealthStatReaderPlugin} plugin_manager = EventHandler.create_plugin_manager(category_filter, plugins_dir) # activate cartridge agent plugins plugins = plugin_manager.getPluginsOfCategory(CARTRIDGE_AGENT_PLUGIN) - grouped_plugins = {} + grouped_ca_plugins = {} for plugin_info in plugins: self.__log.debug("Found plugin [%s] at [%s]" % (plugin_info.name, plugin_info.path)) plugin_manager.activatePluginByName(plugin_info.name) @@ -399,19 +402,32 @@ class EventHandler: mapped_events = plugin_info.description.split(",") for mapped_event in mapped_events: if mapped_event.strip() != "": - if grouped_plugins.get(mapped_event) is None: - grouped_plugins[mapped_event] = [] + if grouped_ca_plugins.get(mapped_event) is None: + grouped_ca_plugins[mapped_event] = [] - grouped_plugins[mapped_event].append(plugin_info) + grouped_ca_plugins[mapped_event].append(plugin_info) # activate artifact management plugins artifact_mgt_plugins = plugin_manager.getPluginsOfCategory(ARTIFACT_MGT_PLUGIN) for plugin_info in artifact_mgt_plugins: + # TODO: Fix this to only load the first plugin self.__log.debug("Found artifact management plugin [%s] at [%s]" % (plugin_info.name, plugin_info.path)) plugin_manager.activatePluginByName(plugin_info.name) self.__log.info("Activated artifact management plugin [%s]" % plugin_info.name) - return grouped_plugins, artifact_mgt_plugins + health_stat_plugins = plugin_manager.getPluginsOfCategory(HEALTH_STAT_PLUGIN) + health_stat_plugin = None + + # If there are any health stat reader plugins, load the first one and ignore the rest + if len(health_stat_plugins) > 0: + plugin_info = health_stat_plugins[0] + self.__log.debug("Found health statistics reader plugin [%s] at [%s]" % + (plugin_info.name, plugin_info.path)) + plugin_manager.activatePluginByName(plugin_info.name) + self.__log.info("Activated health statistics reader plugin [%s]" % plugin_info.name) + health_stat_plugin = plugin_info + + return grouped_ca_plugins, artifact_mgt_plugins, health_stat_plugin except ParameterNotFoundException as e: self.__log.exception("Could not load plugins. Plugins directory not set: %s" % e) return None, None http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py deleted file mode 100644 index 13a8339..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/__init__.py +++ /dev/null @@ -1,16 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py deleted file mode 100644 index 4839aa8..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/abstracthealthstatisticspublisher.py +++ /dev/null @@ -1,63 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - - -class AbstractHealthStatisticsReader: - """ - Abstract class to implement to create a custom health stat reader - """ - - def stat_cartridge_health(self): - """ - Abstract method that when implemented reads the memory usage and the load average - of the instance running the agent and returns a CartridgeHealthStatistics object - with the information - - :return: CartridgeHealthStatistics object with memory usage and load average values - :rtype : CartridgeHealthStatistics - """ - raise NotImplementedError - - -class CartridgeHealthStatistics: - """ - Holds the memory usage and load average reading - """ - - def __init__(self): - self.memory_usage = None - """:type : float""" - self.load_avg = None - """:type : float""" - - -class CEPPublisherException(Exception): - """ - Exception to be used during CEP publishing operations - """ - - def __init__(self, msg): - Exception.__init__(self, msg) - self.message = msg - - def get_message(self): - """ - The message provided when the exception is raised - :return: message - :rtype: str - """ - return self.message http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py deleted file mode 100644 index 9753c3e..0000000 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/healthstatspublisher/healthstats.py +++ /dev/null @@ -1,261 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from threading import Thread -import multiprocessing - -import psutil - -from abstracthealthstatisticspublisher import * -from ..databridge.agent import * -from config import CartridgeAgentConfiguration -from ..util import cartridgeagentutils -from exception import ThriftReceiverOfflineException -import constants - - -class HealthStatisticsPublisherManager(Thread): - """ - Read from an implementation of AbstractHealthStatisticsPublisher the value for memory usage and - load average and publishes them as ThriftEvents to a CEP server - """ - STREAM_NAME = "cartridge_agent_health_stats" - STREAM_VERSION = "1.0.0" - STREAM_NICKNAME = "agent health stats" - STREAM_DESCRIPTION = "agent health stats" - - def __init__(self, publish_interval): - """ - Initializes a new HealthStatistsPublisherManager with a given number of seconds as the interval - :param int publish_interval: Number of seconds as the interval - :return: void - """ - Thread.__init__(self) - - self.log = LogFactory().get_log(__name__) - - self.publish_interval = publish_interval - """:type : int""" - - self.terminated = False - - self.publisher = HealthStatisticsPublisher() - """:type : HealthStatisticsPublisher""" - # TODO: load plugins for the reader - self.stats_reader = DefaultHealthStatisticsReader() - """:type : AbstractHealthStatisticsReader""" - - def run(self): - while not self.terminated: - time.sleep(self.publish_interval) - - try: - cartridge_stats = self.stats_reader.stat_cartridge_health() - self.log.debug("Publishing memory consumption: %r" % cartridge_stats.memory_usage) - self.publisher.publish_memory_usage(cartridge_stats.memory_usage) - - self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg) - self.publisher.publish_load_average(cartridge_stats.load_avg) - except ThriftReceiverOfflineException: - self.log.error("Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...") - try: - self.publisher = HealthStatisticsPublisher() - except: - self.log.error("Couldn't connect. CEP Offline.") - - self.publisher.publisher.disconnect() - - -class HealthStatisticsPublisher: - """ - Publishes memory usage and load average to thrift server - """ - log = LogFactory().get_log(__name__) - - def __init__(self): - - self.ports = [] - self.ports.append(CEPPublisherConfiguration.get_instance().server_port) - - self.cartridge_agent_config = CartridgeAgentConfiguration() - - cartridgeagentutils.wait_until_ports_active( - CEPPublisherConfiguration.get_instance().server_ip, - self.ports, - int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) - cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) - if not cep_active: - raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") - - self.stream_definition = HealthStatisticsPublisher.create_stream_definition() - HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) - self.publisher = ThriftPublisher( - CEPPublisherConfiguration.get_instance().server_ip, - CEPPublisherConfiguration.get_instance().server_port, - CEPPublisherConfiguration.get_instance().admin_username, - CEPPublisherConfiguration.get_instance().admin_password, - self.stream_definition) - - HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") - - @staticmethod - def create_stream_definition(): - """ - Create a StreamDefinition for publishing to CEP - """ - stream_def = StreamDefinition() - stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME - stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION - stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME - stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION - - # stream_def.add_payloaddata_attribute() - stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("cluster_instance_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING) - stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE) - - return stream_def - - def publish_memory_usage(self, memory_usage): - """ - Publishes the given memory usage value to the thrift server as a ThriftEvent - :param float memory_usage: memory usage - """ - - event = ThriftEvent() - event.payloadData.append(self.cartridge_agent_config.cluster_id) - event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) - event.payloadData.append(self.cartridge_agent_config.network_partition_id) - event.payloadData.append(self.cartridge_agent_config.member_id) - event.payloadData.append(self.cartridge_agent_config.partition_id) - event.payloadData.append(constants.MEMORY_CONSUMPTION) - event.payloadData.append(float(memory_usage)) - - HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [payload_data} %r [version] %r" % (self.stream_definition.name,event.payloadData, self.stream_definition.version)) - self.publisher.publish(event) - - def publish_load_average(self, load_avg): - """ - Publishes the given load average value to the thrift server as a ThriftEvent - :param float load_avg: load average value - """ - - event = ThriftEvent() - event.payloadData.append(self.cartridge_agent_config.cluster_id) - event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) - event.payloadData.append(self.cartridge_agent_config.network_partition_id) - event.payloadData.append(self.cartridge_agent_config.member_id) - event.payloadData.append(self.cartridge_agent_config.partition_id) - event.payloadData.append(constants.LOAD_AVERAGE) - event.payloadData.append(float(load_avg)) - - HealthStatisticsPublisher.log.debug("Publishing cep event: [stream] %r [version] %r" % (self.stream_definition.name, self.stream_definition.version)) - self.publisher.publish(event) - - -class DefaultHealthStatisticsReader(AbstractHealthStatisticsReader): - """ - Default implementation of the AbstractHealthStatisticsReader - """ - - def __init__(self): - self.log = LogFactory().get_log(__name__) - - def stat_cartridge_health(self): - cartridge_stats = CartridgeHealthStatistics() - cartridge_stats.memory_usage = DefaultHealthStatisticsReader.__read_mem_usage() - cartridge_stats.load_avg = DefaultHealthStatisticsReader.__read_load_avg() - - self.log.debug("Memory read: %r, CPU read: %r" % (cartridge_stats.memory_usage, cartridge_stats.load_avg)) - return cartridge_stats - - @staticmethod - def __read_mem_usage(): - return psutil.virtual_memory().percent - - @staticmethod - def __read_load_avg(): - (one, five, fifteen) = os.getloadavg() - cores = multiprocessing.cpu_count() - - return (one/cores) * 100 - - -class CEPPublisherConfiguration: - """ - TODO: Extract common functionality - """ - - __instance = None - log = LogFactory().get_log(__name__) - - @staticmethod - def get_instance(): - """ - Singleton instance retriever - :return: Instance - :rtype : CEPPublisherConfiguration - """ - if CEPPublisherConfiguration.__instance is None: - CEPPublisherConfiguration.__instance = CEPPublisherConfiguration() - - return CEPPublisherConfiguration.__instance - - def __init__(self): - self.enabled = False - self.server_ip = None - self.server_port = None - self.admin_username = None - self.admin_password = None - self.cartridge_agent_config = CartridgeAgentConfiguration() - - self.read_config() - - def read_config(self): - self.enabled = True if self.cartridge_agent_config.read_property( - constants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False - if not self.enabled: - CEPPublisherConfiguration.log.info("CEP Publisher disabled") - return - - CEPPublisherConfiguration.log.info("CEP Publisher enabled") - - self.server_ip = self.cartridge_agent_config.read_property( - constants.CEP_RECEIVER_IP, False) - if self.server_ip is None or self.server_ip.strip() == "": - raise RuntimeError("System property not found: " + constants.CEP_RECEIVER_IP) - - self.server_port = self.cartridge_agent_config.read_property( - constants.CEP_RECEIVER_PORT, False) - if self.server_port is None or self.server_port.strip() == "": - raise RuntimeError("System property not found: " + constants.CEP_RECEIVER_PORT) - - self.admin_username = self.cartridge_agent_config.read_property( - constants.CEP_SERVER_ADMIN_USERNAME, False) - if self.admin_username is None or self.admin_username.strip() == "": - raise RuntimeError("System property not found: " + constants.CEP_SERVER_ADMIN_USERNAME) - - self.admin_password = self.cartridge_agent_config.read_property( - constants.CEP_SERVER_ADMIN_PASSWORD, False) - if self.admin_password is None or self.admin_password.strip() == "": - raise RuntimeError("System property not found: " + constants.CEP_SERVER_ADMIN_PASSWORD) - - CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py index aea0e58..f94ae1d 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py @@ -18,7 +18,9 @@ import paho.mqtt.publish as publish from .. event.instance.status.events import * -from .. healthstatspublisher.healthstats import * +from .. util.log import * +from .. util import cartridgeagentutils +import healthstats import constants from config import CartridgeAgentConfiguration @@ -48,8 +50,16 @@ def publish_instance_started_event(): network_partition_id = CartridgeAgentConfiguration().network_partition_id partition_id = CartridgeAgentConfiguration().partition_id - instance_started_event = InstanceStartedEvent(application_id, service_name, cluster_id, cluster_instance_id, member_id, - instance_id, network_partition_id, partition_id) + instance_started_event = InstanceStartedEvent( + application_id, + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) + publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_STARTED_EVENT) publisher.publish(instance_started_event) started = True @@ -58,7 +68,7 @@ def publish_instance_started_event(): log.warn("Instance already started") -def publish_instance_activated_event(): +def publish_instance_activated_event(health_stat_plugin): global activated, log if not activated: # Wait for all ports to be active @@ -90,7 +100,10 @@ def publish_instance_activated_event(): log.info("Instance activated event published") log.info("Starting health statistics notifier") - if CEPPublisherConfiguration.get_instance().enabled: + health_stat_publishing_enabled = True if CartridgeAgentConfiguration().read_property( + constants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False + + if health_stat_publishing_enabled: interval_default = 15 # seconds interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval", False) if interval is not None and len(interval) > 0: @@ -101,7 +114,7 @@ def publish_instance_activated_event(): else: interval = interval_default - health_stats_publisher = HealthStatisticsPublisherManager(interval) + health_stats_publisher = healthstats.HealthStatisticsPublisherManager(interval, health_stat_plugin) log.info("Starting Health statistics publisher with interval %r" % interval) health_stats_publisher.start() else: @@ -110,7 +123,7 @@ def publish_instance_activated_event(): activated = True log.info("Health statistics notifier started") else: - log.error("Ports activation timed out. Aborting InstanceActivatedEvent publishing. [IPAddress] %s [Ports] %s" + log.error("Ports activation timed out. Aborting InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s" % (listen_address, configuration__ports)) else: log.warn("Instance already activated") @@ -129,8 +142,14 @@ def publish_maintenance_mode_event(): network_partition_id = CartridgeAgentConfiguration().network_partition_id partition_id = CartridgeAgentConfiguration().partition_id - instance_maintenance_mode_event = InstanceMaintenanceModeEvent(service_name, cluster_id, cluster_instance_id, member_id, - instance_id, network_partition_id, partition_id) + instance_maintenance_mode_event = InstanceMaintenanceModeEvent( + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_MAINTENANCE_MODE_EVENT) publisher.publish(instance_maintenance_mode_event) http://git-wip-us.apache.org/repos/asf/stratos/blob/81d5d34c/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py index 040401b..39baddc 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/contracts.py @@ -19,15 +19,30 @@ from yapsy.IPlugin import IPlugin class ICartridgeAgentPlugin(IPlugin): + """ + To implement a Cartridge Agent plugin to be executed on a MB event + """ def run_plugin(self, values): raise NotImplementedError class IArtifactManagementPlugin(IPlugin): + """ + To implement an artifact management plugin to manage artifact distribution using a custom version control tool + """ def checkout(self): raise NotImplementedError def push(self): + raise NotImplementedError + + +class IHealthStatReaderPlugin(IPlugin): + """ + To implement a health statistics reader plugin to read health statistics using a custom factor + """ + + def stat_cartridge_health(self, health_stat): raise NotImplementedError \ No newline at end of file
