http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py index ea85d44..64021a3 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py @@ -14,6 +14,11 @@ class HealthStatisticsPublisherManager(Thread): Read from an implementation of AbstractHealthStatisticsPublisher the value for memory usage and load average and publishes them as ThriftEvents to a CEP server """ + STREAM_NAME = "cartridge_agent_health_stats" + STREAM_VERSION = "1.0.0" + STREAM_NICKNAME = "agent health stats" + STREAM_DESCRIPTION = "agent health stats" + def __init__(self, publish_interval): """ Initializes a new HealthStatistsPublisherManager with a given number of seconds as the interval @@ -54,7 +59,13 @@ class HealthStatisticsPublisher: self.log = LogFactory().get_log(__name__) self.ports = [] self.ports.append(CEPPublisherConfiguration.get_instance().server_port) - cartridgeagentutils.wait_until_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) + + self.cartridge_agent_config = CartridgeAgentConfiguration() + + cartridgeagentutils.wait_until_ports_active( + CEPPublisherConfiguration.get_instance().server_ip, + self.ports, + int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) if not cep_active: raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") @@ -74,17 +85,17 @@ class HealthStatisticsPublisher: Create a StreamDefinition for publishing to CEP """ stream_def = StreamDefinition() - stream_def.name = "cartridge_agent_health_stats" - stream_def.version = "1.0.0" - stream_def.nickname = "agent health stats" - stream_def.description = "agent health stats" - - stream_def.add_payloaddata_attribute("cluster_id", "STRING") - stream_def.add_payloaddata_attribute("network_partition_id", "STRING") - stream_def.add_payloaddata_attribute("member_id", "STRING") - stream_def.add_payloaddata_attribute("partition_id", "STRING") - stream_def.add_payloaddata_attribute("health_description", "STRING") - stream_def.add_payloaddata_attribute("value", "DOUBLE") + stream_def.name = HealthStatisticsPublisherManager.STREAM_NAME + stream_def.version = HealthStatisticsPublisherManager.STREAM_VERSION + stream_def.nickname = HealthStatisticsPublisherManager.STREAM_NICKNAME + stream_def.description = HealthStatisticsPublisherManager.STREAM_DESCRIPTION + + stream_def.add_payloaddata_attribute("cluster_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("network_partition_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("member_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("partition_id", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("health_description", StreamDefinition.STRING) + stream_def.add_payloaddata_attribute("value", StreamDefinition.DOUBLE) return stream_def @@ -95,10 +106,10 @@ class HealthStatisticsPublisher: """ event = ThriftEvent() - event.payloadData.append(CartridgeAgentConfiguration.cluster_id) - event.payloadData.append(CartridgeAgentConfiguration.network_partition_id) - event.payloadData.append(CartridgeAgentConfiguration.member_id) - event.payloadData.append(CartridgeAgentConfiguration.partition_id) + event.payloadData.append(self.cartridge_agent_config.cluster_id) + event.payloadData.append(self.cartridge_agent_config.network_partition_id) + event.payloadData.append(self.cartridge_agent_config.member_id) + event.payloadData.append(self.cartridge_agent_config.partition_id) event.payloadData.append(cartridgeagentconstants.MEMORY_CONSUMPTION) event.payloadData.append(memory_usage) @@ -112,10 +123,10 @@ class HealthStatisticsPublisher: """ event = ThriftEvent() - event.payloadData.append(CartridgeAgentConfiguration.cluster_id) - event.payloadData.append(CartridgeAgentConfiguration.network_partition_id) - event.payloadData.append(CartridgeAgentConfiguration.member_id) - event.payloadData.append(CartridgeAgentConfiguration.partition_id) + event.payloadData.append(self.cartridge_agent_config.cluster_id) + event.payloadData.append(self.cartridge_agent_config.network_partition_id) + event.payloadData.append(self.cartridge_agent_config.member_id) + event.payloadData.append(self.cartridge_agent_config.partition_id) event.payloadData.append(cartridgeagentconstants.LOAD_AVERAGE) event.payloadData.append(load_avg) @@ -173,28 +184,35 @@ class CEPPublisherConfiguration: self.admin_password = None self.read_config() + self.cartridge_agent_config = CartridgeAgentConfiguration() + def read_config(self): - self.enabled = True if CartridgeAgentConfiguration.read_property("cep.stats.publisher.enabled", False).strip().lower() == "true" else False + self.enabled = True if self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False if not self.enabled: CEPPublisherConfiguration.log.info("CEP Publisher disabled") return CEPPublisherConfiguration.log.info("CEP Publisher enabled") - self.server_ip = CartridgeAgentConfiguration.read_property("thrift.receiver.ip", False) + self.server_ip = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_RECEIVER_IP, False) if self.server_ip.strip() == "": - raise RuntimeError("System property not found: thrift.receiver.ip") + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_IP) - self.server_port = CartridgeAgentConfiguration.read_property("thrift.receiver.port", False) + self.server_port = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_RECEIVER_PORT, False) if self.server_port.strip() == "": - raise RuntimeError("System property not found: thrift.receiver.port") + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_RECEIVER_PORT) - self.admin_username = CartridgeAgentConfiguration.read_property("thrift.server.admin.username", False) + self.admin_username = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME, False) if self.admin_username.strip() == "": - raise RuntimeError("System property not found: thrift.server.admin.username") + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_USERNAME) - self.admin_password = CartridgeAgentConfiguration.read_property("thrift.server.admin.password", False) + self.admin_password = self.cartridge_agent_config.read_property( + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD, False) if self.admin_password.strip() == "": - raise RuntimeError("System property not found: thrift.server.admin.password") + raise RuntimeError("System property not found: " + cartridgeagentconstants.CEP_SERVER_ADMIN_PASSWORD) CEPPublisherConfiguration.log.info("CEP Publisher configuration initialized")
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py index 260d67d..9b4d819 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py @@ -24,15 +24,15 @@ def publish_instance_started_event(): global started, log if not started: log.info("Publishing instance started event") - service_name = CartridgeAgentConfiguration.service_name - cluster_id = CartridgeAgentConfiguration.cluster_id - network_partition_id = CartridgeAgentConfiguration.network_partition_id - parition_id = CartridgeAgentConfiguration.partition_id - member_id = CartridgeAgentConfiguration.member_id + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id instance_started_event = InstanceStartedEvent(service_name, cluster_id, network_partition_id, parition_id, member_id) - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceStartedEvent") + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_STARTED_EVENT) publisher.publish(instance_started_event) started = True log.info("Instance started event published") @@ -44,15 +44,15 @@ def publish_instance_activated_event(): global activated, log if not activated: log.info("Publishing instance activated event") - service_name = CartridgeAgentConfiguration.service_name - cluster_id = CartridgeAgentConfiguration.cluster_id - network_partition_id = CartridgeAgentConfiguration.network_partition_id - parition_id = CartridgeAgentConfiguration.partition_id - member_id = CartridgeAgentConfiguration.member_id + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id instance_activated_event = InstanceActivatedEvent(service_name, cluster_id, network_partition_id, parition_id, member_id) - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceActivatedEvent") + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_ACTIVATED_EVENT) publisher.publish(instance_activated_event) log.info("Instance activated event published") @@ -60,7 +60,7 @@ def publish_instance_activated_event(): if CEPPublisherConfiguration.get_instance().enabled: interval_default = 15 # seconds - interval = CartridgeAgentConfiguration.read_property("stats.notifier.interval") + interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval") if interval is not None and len(interval) > 0: try: interval = int(interval) @@ -85,16 +85,16 @@ def publish_maintenance_mode_event(): if not maintenance: log.info("Publishing instance maintenance mode event") - service_name = CartridgeAgentConfiguration.service_name - cluster_id = CartridgeAgentConfiguration.cluster_id - network_partition_id = CartridgeAgentConfiguration.network_partition_id - parition_id = CartridgeAgentConfiguration.partition_id - member_id = CartridgeAgentConfiguration.member_id + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id instance_maintenance_mode_event = InstanceMaintenanceModeEvent(service_name, cluster_id, network_partition_id, parition_id, member_id) - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceMaintenanceModeEvent") + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_MAINTENANCE_MODE_EVENT) publisher.publish(instance_maintenance_mode_event) maintenance = True @@ -108,16 +108,16 @@ def publish_instance_ready_to_shutdown_event(): if not ready_to_shutdown: log.info("Publishing instance activated event") - service_name = CartridgeAgentConfiguration.service_name - cluster_id = CartridgeAgentConfiguration.cluster_id - network_partition_id = CartridgeAgentConfiguration.network_partition_id - parition_id = CartridgeAgentConfiguration.partition_id - member_id = CartridgeAgentConfiguration.member_id + service_name = CartridgeAgentConfiguration().service_name + cluster_id = CartridgeAgentConfiguration().cluster_id + network_partition_id = CartridgeAgentConfiguration().network_partition_id + parition_id = CartridgeAgentConfiguration().partition_id + member_id = CartridgeAgentConfiguration().member_id instance_shutdown_event = InstanceReadyToShutdownEvent(service_name, cluster_id, network_partition_id, parition_id, member_id) - publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + "InstanceReadyToShutdownEvent") + publisher = get_publisher(cartridgeagentconstants.INSTANCE_STATUS_TOPIC + cartridgeagentconstants.INSTANCE_READY_TO_SHUTDOWN_EVENT) publisher.publish(instance_shutdown_event) ready_to_shutdown = True @@ -140,14 +140,8 @@ class EventPublisher: def __init__(self, topic): self.__topic = topic - """ - msgs = [{'topic': "instance/status/InstanceStartedEvent", 'payload': instance_started_event.to_JSON()}] - #publish.single("instance", instance_started_event.to_JSON(), hostname="localhost", port=1883) - publish.multiple(msgs, "localhost", 1883) - """ - def publish(self, event): - mb_ip = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_IP) - mb_port = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_PORT) + mb_ip = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP) + mb_port = CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT) payload = event.to_json() publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py b/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py index 46fe18a..da60d7d 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/subscriber/eventsubscriber.py @@ -1,11 +1,6 @@ -import logging import threading import paho.mqtt.client as mqtt -from .. util import cartridgeagentconstants -from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from .. util.log import LogFactory - class EventSubscriber(threading.Thread): """ @@ -13,7 +8,7 @@ class EventSubscriber(threading.Thread): register event handlers for various events. """ - def __init__(self, topic): + def __init__(self, topic, ip, port): threading.Thread.__init__(self) #{"ArtifactUpdateEvent" : onArtifactUpdateEvent()} @@ -27,16 +22,16 @@ class EventSubscriber(threading.Thread): self.__subscribed = False + self.__ip = ip + self.__port = port + def run(self): self.__mb_client = mqtt.Client() self.__mb_client.on_connect = self.on_connect self.__mb_client.on_message = self.on_message - mb_ip = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_IP) - mb_port = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MB_PORT) - - self.log.debug("Connecting to the message broker with address %r:%r" % (mb_ip, mb_port)) - self.__mb_client.connect(mb_ip, mb_port, 60) + self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port)) + self.__mb_client.connect(self.__ip, self.__port, 60) self.__subscribed = True self.__mb_client.loop_forever() @@ -79,3 +74,6 @@ class EventSubscriber(threading.Thread): :rtype: bool """ return self.__subscribed + + +from .. util.log import LogFactory \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py index 5d92306..4a13765 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/topology/topologycontext.py @@ -247,8 +247,10 @@ class Cluster: return member_id in self.member_map def __str__(self): - return "Cluster [serviceName=%r, clusterId=%r, autoscalePolicyName=%r, deploymentPolicyName=%r, hostNames=%r, tenantRange=%r, isLbCluster=%r, properties=%r]" % \ - (self.service_name, self.cluster_id, self.autoscale_policy_name, self.deployment_policy_name, self.hostnames, self.tenant_range, self.is_lb_cluster, self.properties) + return "Cluster [serviceName=" + self.service_name + ", clusterId=" + self.cluster_id \ + + ", autoscalePolicyName=" + self.autoscale_policy_name + ", deploymentPolicyName=" \ + + self.deployment_policy_name + ", hostNames=" + self.hostnames + ", tenantRange=" + self.tenant_range \ + + ", isLbCluster=" + self.is_lb_cluster + ", properties=" + self.properties + "]" def tenant_id_in_range(self, tenant_id): """ http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py index 3d6dea1..7265c15 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentconstants.py @@ -5,15 +5,6 @@ EXTENSIONS_DIR = "extensions.dir" MB_IP = "mb.ip" MB_PORT = "mb.port" -INSTANCE_STARTED_SH = "instance-started.sh" -START_SERVERS_SH = "start-servers.sh" -INSTANCE_ACTIVATED_SH = "instance-activated.sh" -ARTIFACTS_UPDATED_SH = "artifacts-updated.sh" -CLEAN_UP_SH = "clean.sh" -MOUNT_VOLUMES_SH = "mount_volumes.sh" -SUBSCRIPTION_DOMAIN_ADDED_SH = "subscription-domain-added.sh" -SUBSCRIPTION_DOMAIN_REMOVED_SH = "subscription-domain-removed.sh" - CARTRIDGE_KEY = "CARTRIDGE_KEY" APP_PATH = "APP_PATH" SERVICE_GROUP = "SERIVCE_GROUP" @@ -29,6 +20,7 @@ PORTS = "PORTS" DEPLOYMENT = "DEPLOYMENT" MANAGER_SERVICE_TYPE = "MANAGER_SERVICE_TYPE" WORKER_SERVICE_TYPE = "WORKER_SERVICE_TYPE" +PERSISTENCE_MAPPING = "PERSISTENCE_MAPPING" # stratos.sh environment variables keys LOG_FILE_PATHS = "LOG_FILE_PATHS" @@ -87,6 +79,41 @@ TOPOLOGY_TOPIC = "topology/#" TENANT_TOPIC = "tenant/#" INSTANCE_STATUS_TOPIC = "instance/status/" - #Messaging Model -TENANT_RANGE_DELIMITER = "-" \ No newline at end of file +TENANT_RANGE_DELIMITER = "-" + +INSTANCE_STARTED_EVENT = "InstanceStartedEvent" +INSTANCE_ACTIVATED_EVENT = "InstanceActivatedEvent" +INSTANCE_MAINTENANCE_MODE_EVENT = "InstanceMaintenanceModeEvent" +INSTANCE_READY_TO_SHUTDOWN_EVENT = "InstanceReadyToShutdownEvent" + +PUBLISHER_SERVICE_NAME = "publisher" +APISTORE_SERVICE_NAME = "apistore" +APIMANAGER_SERVICE_NAME = "apim" +GATEWAY_SERVICE_NAME = "gatewaymgt" +GATEWAY_MGT_SERVICE_NAME = "gateway" +KEY_MANAGER_SERVICE_NAME = "keymanager" + +PRIMARY = "PRIMARY" +MIN_COUNT = "MIN_COUNT" + +#multi tenant constants +INVALID_TENANT_ID = "-1" +SUPER_TENANT_ID = "-1234" + +DATE_FORMAT = "%Y.%m.%d" + +PORT_CHECK_TIMEOUT = "port.check.timeout" + +CEP_PUBLISHER_ENABLED = "cep.stats.publisher.enabled" +CEP_RECEIVER_IP = "thrift.receiver.ip" +CEP_RECEIVER_PORT = "thrift.receiver.port" +CEP_SERVER_ADMIN_USERNAME = "thrift.server.admin.username" +CEP_SERVER_ADMIN_PASSWORD = "thrift.server.admin.password" + +MONITORING_PUBLISHER_ENABLED = "enable.data.publisher" +MONITORING_RECEIVER_IP = "monitoring.server.ip" +MONITORING_RECEIVER_PORT = "monitoring.server.port" +MONITORING_RECEIVER_SECURE_PORT = "monitoring.server.secure.port" +MONITORING_SERVER_ADMIN_USERNAME = "monitoring.server.admin.username" +MONITORING_SERVER_ADMIN_PASSWORD = "monitoring.server.admin.password" \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py index d9916da..81b2b92 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/cartridgeagentutils.py @@ -6,7 +6,6 @@ import time import socket import shutil -from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration import cartridgeagentconstants from log import LogFactory @@ -16,6 +15,16 @@ log = LogFactory().get_log(__name__) current_milli_time = lambda: int(round(time.time() * 1000)) +extension_handler = None + + +def get_extension_handler(): + global extension_handler + if extension_handler is None: + extension_handler = defaultextensionhandler.DefaultExtensionHandler() + + return extension_handler + def decrypt_password(pass_str, secret): """ @@ -75,14 +84,14 @@ def delete_folder_tree(path): log.exception("Deletion of folder path %r failed." % path) -def wait_until_ports_active(ip_address, ports): +def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000): """ Blocks until the given list of ports become active :param str ip_address: Ip address of the member to be checked :param list[str] ports: List of ports to be checked + :param int ports_check_timeout: The timeout in milliseconds, defaults to 1000*60*10 :return: void """ - ports_check_timeout = CartridgeAgentConfiguration.read_property("port.check.timeout", critical=False) if ports_check_timeout is None: ports_check_timeout = 1000 * 60 * 10 @@ -160,4 +169,17 @@ def get_carbon_server_property(property_key): :rtype : str """ - raise NotImplementedError \ No newline at end of file + raise NotImplementedError + + +def get_working_dir(): + """ + Returns the base directory of the cartridge agent. + :return: Base working dir path + :rtype : str + """ + #"/path/to/cartridge-agent/modules/util/".split("modules") returns ["/path/to/cartridge-agent/", "/util"] + return os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + + +from ..extensions import defaultextensionhandler http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py index a694ff1..487def4 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/extensionutils.py @@ -3,17 +3,18 @@ import os import subprocess import time -from .. config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from .. topology.topologycontext import * from log import LogFactory log = LogFactory().get_log(__name__) +cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration() + def execute_copy_artifact_extension(source, destination): try: log.debug("Executing artifacts copy extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACTS_COPY_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.ARTIFACTS_COPY_SCRIPT, False) command = prepare_command(script_name) output, errors = execute_command(command + " " + source + " " + destination) @@ -26,7 +27,8 @@ def execute_instance_started_extension(env_params): try: log.debug("Executing instance started extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.INSTANCE_STARTED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.INSTANCE_STARTED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -40,7 +42,8 @@ def execute_instance_started_extension(env_params): def execute_instance_activated_extension(): try: log.debug("Executing instance activated extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.INSTANCE_ACTIVATED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.INSTANCE_ACTIVATED_SCRIPT, False) command = prepare_command(script_name) output, errors = execute_command(command) @@ -53,7 +56,8 @@ def execute_artifacts_updated_extension(env_params): try: log.debug("Executing artifacts updated extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACTS_UPDATED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.ARTIFACTS_UPDATED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -68,7 +72,8 @@ def execute_subscription_domain_added_extension(env_params): try: log.debug("Executing subscription domain added extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.SUBSCRIPTION_DOMAIN_ADDED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -83,7 +88,8 @@ def execute_subscription_domain_removed_extension(env_params): try: log.debug("Executing subscription domain removed extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.SUBSCRIPTION_DOMAIN_REMOVED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -98,7 +104,8 @@ def execute_start_servers_extension(env_params): try: log.debug("Executing start servers extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.START_SERVERS_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.START_SERVERS_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -113,7 +120,8 @@ def execute_complete_topology_extension(env_params): try: log.debug("Executing complete topology extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.COMPLETE_TOPOLOGY_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -128,7 +136,8 @@ def execute_complete_tenant_extension(env_params): try: log.debug("Executing complete tenant extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.COMPLETE_TENANT_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.COMPLETE_TENANT_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -143,7 +152,8 @@ def execute_tenant_subscribed_extension(env_params): try: log.debug("Executing tenant subscribed extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.TENANT_SUBSCRIBED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -158,7 +168,8 @@ def execute_tenant_unsubscribed_extension(env_params): try: log.debug("Executing tenant unsubscribed extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.TENANT_UNSUBSCRIBED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -173,7 +184,8 @@ def execute_member_terminated_extension(env_params): try: log.debug("Executing member terminated extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_TERMINATED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -188,7 +200,8 @@ def execute_member_suspended_extension(env_params): try: log.debug("Executing member suspended extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_SUSPENDED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -198,11 +211,13 @@ def execute_member_suspended_extension(env_params): except: log.exception("Could not execute member suspended extension") + def execute_member_started_extension(env_params): try: log.debug("Executing member started extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_STARTED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_STARTED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -240,7 +255,7 @@ def check_topology_consistency(service_name, cluster_id, member_id): def is_relevant_member_event(service_name, cluster_id, lb_cluster_id): - cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id + cluster_id_in_payload = cartridge_agent_config.cluster_id if cluster_id_in_payload is None: return False @@ -254,7 +269,7 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id): if cluster_id_in_payload == lb_cluster_id: return True - service_group_in_payload = CartridgeAgentConfiguration.service_group + service_group_in_payload = cartridge_agent_config.service_group if service_group_in_payload is not None: service_properties = topology.get_service(service_name).properties if service_properties is None: @@ -262,19 +277,27 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id): member_service_group = service_properties[cartridgeagentconstants.SERVICE_GROUP_TOPOLOGY_KEY] if member_service_group is not None and member_service_group == service_group_in_payload: - if service_name == CartridgeAgentConfiguration.service_name: + if service_name == cartridge_agent_config.service_name: log.debug("Service names are same") return True - elif "apistore" == CartridgeAgentConfiguration.service_name and service_name == "publisher": + elif cartridgeagentconstants.APISTORE_SERVICE_NAME == \ + cartridge_agent_config.service_name \ + and service_name == cartridgeagentconstants.PUBLISHER_SERVICE_NAME: log.debug("Service name in payload is [store]. Serivce name in event is [%r] " % service_name) return True - elif "publisher" == CartridgeAgentConfiguration.service_name and service_name == "apistore": + elif cartridgeagentconstants.PUBLISHER_SERVICE_NAME == \ + cartridge_agent_config.service_name \ + and service_name == cartridgeagentconstants.APISTORE_SERVICE_NAME: log.debug("Service name in payload is [publisher]. Serivce name in event is [%r] " % service_name) return True - elif cartridgeagentconstants.DEPLOYMENT_WORKER == CartridgeAgentConfiguration.deployment and service_name == CartridgeAgentConfiguration.manager_service_name: + elif cartridgeagentconstants.DEPLOYMENT_WORKER == \ + cartridge_agent_config.deployment \ + and service_name == cartridge_agent_config.manager_service_name: log.debug("Deployment is worker. Worker's manager service name & service name in event are same") return True - elif cartridgeagentconstants.DEPLOYMENT_MANAGER == CartridgeAgentConfiguration.deployment and service_name == CartridgeAgentConfiguration.worker_service_name: + elif cartridgeagentconstants.DEPLOYMENT_MANAGER == \ + cartridge_agent_config.deployment \ + and service_name == cartridge_agent_config.worker_service_name: log.debug("Deployment is manager. Manager's worker service name & service name in event are same") return True @@ -284,7 +307,8 @@ def is_relevant_member_event(service_name, cluster_id, lb_cluster_id): def execute_volume_mount_extension(persistance_mappings_payload): try: log.debug("Executing volume mounting extension: [payload] %r" % persistance_mappings_payload) - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MOUNT_VOLUMES_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MOUNT_VOLUMES_SCRIPT, False) command = prepare_command(script_name) output, errors = execute_command(command + " " + persistance_mappings_payload) @@ -296,7 +320,8 @@ def execute_volume_mount_extension(persistance_mappings_payload): def execute_cleanup_extension(): try: log.debug("Executing cleanup extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.CLEAN_UP_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.CLEAN_UP_SCRIPT, False) command = prepare_command(script_name) output, errors = execute_command(command) @@ -309,7 +334,8 @@ def execute_member_activated_extension(env_params): try: log.debug("Executing member activated extension") - script_name = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.MEMBER_ACTIVATED_SCRIPT, False) + script_name = cartridge_agent_config.read_property( + cartridgeagentconstants.MEMBER_ACTIVATED_SCRIPT, False) command = prepare_command(script_name) env_params = add_payload_parameters(env_params) env_params = clean_process_parameters(env_params) @@ -321,11 +347,13 @@ def execute_member_activated_extension(env_params): def prepare_command(script_name): - extensions_dir = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.EXTENSIONS_DIR, False) + extensions_dir = cartridge_agent_config.read_property( + cartridgeagentconstants.EXTENSIONS_DIR, False) if extensions_dir.strip() == "": raise RuntimeError("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR) - file_path = extensions_dir + script_name if str(extensions_dir).endswith("/") else extensions_dir + "/" + script_name + file_path = extensions_dir + script_name if str(extensions_dir).endswith("/") \ + else extensions_dir + "/" + script_name if os.path.isfile(file_path): return file_path @@ -354,32 +382,35 @@ def add_payload_parameters(env_params): :return: Dictionary with updated parameters :rtype: dict[str, str] """ - env_params["STRATOS_APP_PATH"] = CartridgeAgentConfiguration.app_path - env_params["STRATOS_PARAM_FILE_PATH"] = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH, False) - env_params["STRATOS_SERVICE_NAME"] = CartridgeAgentConfiguration.service_name - env_params["STRATOS_TENANT_ID"] = CartridgeAgentConfiguration.tenant_id - env_params["STRATOS_CARTRIDGE_KEY"] = CartridgeAgentConfiguration.cartridge_key - env_params["STRATOS_LB_CLUSTER_ID"] = CartridgeAgentConfiguration.lb_cluster_id - env_params["STRATOS_CLUSTER_ID"] = CartridgeAgentConfiguration.cluster_id - env_params["STRATOS_NETWORK_PARTITION_ID"] = CartridgeAgentConfiguration.network_partition_id - env_params["STRATOS_PARTITION_ID"] = CartridgeAgentConfiguration.partition_id - env_params["STRATOS_PERSISTENCE_MAPPINGS"] = CartridgeAgentConfiguration.persistence_mappings - env_params["STRATOS_REPO_URL"] = CartridgeAgentConfiguration.repo_url - - lb_cluster_id_in_payload = CartridgeAgentConfiguration.lb_cluster_id + env_params["STRATOS_APP_PATH"] = cartridge_agent_config.app_path + env_params["STRATOS_PARAM_FILE_PATH"] = cartridge_agent_config.read_property( + cartridgeagentconstants.PARAM_FILE_PATH, False) + env_params["STRATOS_SERVICE_NAME"] = cartridge_agent_config.service_name + env_params["STRATOS_TENANT_ID"] = cartridge_agent_config.tenant_id + env_params["STRATOS_CARTRIDGE_KEY"] = cartridge_agent_config.cartridge_key + env_params["STRATOS_LB_CLUSTER_ID"] = cartridge_agent_config.lb_cluster_id + env_params["STRATOS_CLUSTER_ID"] = cartridge_agent_config.cluster_id + env_params["STRATOS_NETWORK_PARTITION_ID"] = \ + cartridge_agent_config.network_partition_id + env_params["STRATOS_PARTITION_ID"] = cartridge_agent_config.partition_id + env_params["STRATOS_PERSISTENCE_MAPPINGS"] = \ + cartridge_agent_config.persistence_mappings + env_params["STRATOS_REPO_URL"] = cartridge_agent_config.repo_url + + lb_cluster_id_in_payload = cartridge_agent_config.lb_cluster_id member_ips = get_lb_member_ip(lb_cluster_id_in_payload) if member_ips is not None: env_params["STRATOS_LB_IP"] = member_ips[0] env_params["STRATOS_LB_PUBLIC_IP"] = member_ips[1] else: - env_params["STRATOS_LB_IP"] = CartridgeAgentConfiguration.lb_private_ip - env_params["STRATOS_LB_PUBLIC_IP"] = CartridgeAgentConfiguration.lb_public_ip + env_params["STRATOS_LB_IP"] = cartridge_agent_config.lb_private_ip + env_params["STRATOS_LB_PUBLIC_IP"] = cartridge_agent_config.lb_public_ip topology = TopologyContext.get_topology() if topology.initialized: - service = topology.get_service(CartridgeAgentConfiguration.service_name) - cluster = service.get_cluster(CartridgeAgentConfiguration.cluster_id) - member_id_in_payload = CartridgeAgentConfiguration.member_id + service = topology.get_service(cartridge_agent_config.service_name) + cluster = service.get_cluster(cartridge_agent_config.cluster_id) + member_id_in_payload = cartridge_agent_config.member_id add_properties(service.properties, env_params, "SERVICE_PROPERTY") add_properties(cluster.properties, env_params, "CLUSTER_PROPERTY") add_properties(cluster.get_member(member_id_in_payload).properties, env_params, "MEMBER_PROPERTY") @@ -439,3 +470,7 @@ def execute_command(command, env_params=None): raise RuntimeError("Command execution failed: \n %r" % errors) return output, errors + + +from .. config import cartridgeagentconfiguration +from .. topology.topologycontext import * \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py b/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py index 83b1f50..b6fec95 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/util/log.py @@ -10,7 +10,7 @@ class LogFactory(object): class __LogFactory: def __init__(self): self.logs = {} - logging_conf = os.path.join(os.path.dirname(__file__), "logging.ini") + logging_conf = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "logging.ini" logging.config.fileConfig(logging_conf) def get_log(self, name):
