Added new properties to agent.conf. Refactored modules and dependencies to avoid circular dependencies. Made CartridgeAgentConfiguration a singleton by overriding __new__. Moved hard-coded strings into the constants file. Cleaned up files. Changed the log file handler level from ERROR to INFO.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/511aedfa Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/511aedfa Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/511aedfa Branch: refs/heads/master Commit: 511aedfa7bf93c83ec1eebcc39fe544504bbdb04 Parents: b5ed6f0 Author: Chamila de Alwis <[email protected]> Authored: Fri Oct 3 03:06:50 2014 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Thu Oct 9 15:41:09 2014 +0530 ---------------------------------------------------------------------- .../cartridge-agent/agent.conf | 9 +- .../cartridge-agent/agent.py | 62 +- .../cartridge-agent/logging.ini | 2 +- .../modules/artifactmgt/git/agentgithandler.py | 33 +- .../config/cartridgeagentconfiguration.py | 571 +++++++++---------- .../cartridge-agent/modules/databridge/agent.py | 7 + .../modules/datapublisher/logpublisher.py | 74 ++- .../extensions/abstractextensionhandler.py | 6 +- .../extensions/defaultextensionhandler.py | 175 +++--- .../modules/healthstatspublisher/healthstats.py | 76 ++- .../publisher/cartridgeagentpublisher.py | 60 +- .../modules/subscriber/eventsubscriber.py | 20 +- .../modules/topology/topologycontext.py | 6 +- .../modules/util/cartridgeagentconstants.py | 49 +- .../modules/util/cartridgeagentutils.py | 30 +- .../modules/util/extensionutils.py | 127 +++-- .../cartridge-agent/modules/util/log.py | 2 +- 17 files changed, 711 insertions(+), 598 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/agent.conf ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/agent.conf b/tools/python-cartridge-agent/cartridge-agent/agent.conf index f9724fc..0161b7d 100644 --- a/tools/python-cartridge-agent/cartridge-agent/agent.conf +++ 
b/tools/python-cartridge-agent/cartridge-agent/agent.conf @@ -4,11 +4,9 @@ mb.port =MB-PORT listen.address =localhost thrift.receiver.ip =CEP-IP thrift.receiver.port =CEP-PORT -jndi.properties.template.file.path =${script_path}/../conf/templates/jndi.properties.template -jndi.properties.dir =${script_path}/../conf -log4j.configuration =file://${script_path}/../conf/log4j.properties -param.file.path =${script_path}/../payload/launch-params -extensions.dir =${script_path}/../extensions +agent.root = +param.file.path =/tmp/payload/launch-params +extensions.dir =/extensions cep.stats.publisher.enabled =true lb.private.ip = lb.public.ip = @@ -18,6 +16,7 @@ enable.artifact.update =true auto.commit =false auto.checkout =true artifact.update.interval =15 +port.check.timeout =600 enable.data.publisher =ENABLE-DATA-PUBLISHER monitoring.server.ip =MONITORING-SERVER-IP monitoring.server.port =MONITORING-SERVER-PORT http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/agent.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/agent.py b/tools/python-cartridge-agent/cartridge-agent/agent.py index ecc8c45..4858b9d 100644 --- a/tools/python-cartridge-agent/cartridge-agent/agent.py +++ b/tools/python-cartridge-agent/cartridge-agent/agent.py @@ -1,12 +1,8 @@ #!/usr/bin/env python -import logging import threading -import time -from modules.config.cartridgeagentconfiguration import CartridgeAgentConfiguration from modules.exception.parameternotfoundexception import ParameterNotFoundException -from modules.subscriber.eventsubscriber import EventSubscriber -from modules.extensions.defaultextensionhandler import DefaultExtensionHandler +from modules.subscriber import eventsubscriber from modules.publisher import cartridgeagentpublisher from modules.event.instance.notifier.events import * from modules.event.tenant.events import * @@ -14,7 +10,7 @@ from 
modules.event.topology.events import * from modules.tenant.tenantcontext import * from modules.topology.topologycontext import * from modules.datapublisher.logpublisher import * -from modules.extensions.abstractextensionhandler import * +from modules.config import cartridgeagentconfiguration class CartridgeAgent(threading.Thread): @@ -22,11 +18,22 @@ class CartridgeAgent(threading.Thread): def __init__(self): threading.Thread.__init__(self) - CartridgeAgentConfiguration.initialize_configuration() - self.__instance_event_subscriber = EventSubscriber(cartridgeagentconstants.INSTANCE_NOTIFIER_TOPIC) - self.__tenant_event_subscriber = EventSubscriber(cartridgeagentconstants.TENANT_TOPIC) - self.__topology_event_subscriber = EventSubscriber(cartridgeagentconstants.TOPOLOGY_TOPIC) + mb_ip = cartridgeagentconfiguration.CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_IP) + mb_port = cartridgeagentconfiguration.CartridgeAgentConfiguration().read_property(cartridgeagentconstants.MB_PORT) + + self.__instance_event_subscriber = eventsubscriber.EventSubscriber( + cartridgeagentconstants.INSTANCE_NOTIFIER_TOPIC, + mb_ip, + mb_port) + self.__tenant_event_subscriber = eventsubscriber.EventSubscriber( + cartridgeagentconstants.TENANT_TOPIC, + mb_ip, + mb_port) + self.__topology_event_subscriber = eventsubscriber.EventSubscriber( + cartridgeagentconstants.TOPOLOGY_TOPIC, + mb_ip, + mb_port) self.__tenant_context_initialized = False self.__topology_context_initialized = False @@ -37,6 +44,8 @@ class CartridgeAgent(threading.Thread): self.log = LogFactory().get_log(__name__) + self.cartridge_agent_config = CartridgeAgentConfiguration() + def run(self): self.log.info("Starting Cartridge Agent...") @@ -66,25 +75,26 @@ class CartridgeAgent(threading.Thread): #Wait for all ports to be active cartridgeagentutils.wait_until_ports_active( - CartridgeAgentConfiguration.listen_address, - CartridgeAgentConfiguration.ports + self.cartridge_agent_config.listen_address, + 
self.cartridge_agent_config.ports, + int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False)) ) # check if artifact management is required before publishing instance activated event - repo_url = CartridgeAgentConfiguration.repo_url + repo_url = self.cartridge_agent_config.repo_url if repo_url is None or str(repo_url).strip() == "": self.log.info("No artifact repository found") CartridgeAgent.extension_handler.on_instance_activated_event() cartridgeagentpublisher.publish_instance_activated_event() - persistence_mappping_payload = CartridgeAgentConfiguration.persistence_mappings + persistence_mappping_payload = self.cartridge_agent_config.persistence_mappings if persistence_mappping_payload is not None: CartridgeAgent.extension_handler.volume_mount_extension(persistence_mappping_payload) # start log publishing thread if DataPublisherConfiguration.get_instance().enabled: - log_file_paths = CartridgeAgentConfiguration.log_file_paths + log_file_paths = self.cartridge_agent_config.log_file_paths if log_file_paths is None: self.log.exception("No valid log file paths found, no logs will be published") else: @@ -112,21 +122,21 @@ class CartridgeAgent(threading.Thread): """ # JNDI_PROPERTIES_DIR try: - CartridgeAgentConfiguration.read_property(cartridgeagentconstants.JNDI_PROPERTIES_DIR) + self.cartridge_agent_config.read_property(cartridgeagentconstants.JNDI_PROPERTIES_DIR) except ParameterNotFoundException: self.log.error("System property not found: %r" % cartridgeagentconstants.JNDI_PROPERTIES_DIR) return #PARAM_FILE_PATH try: - CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH) + self.cartridge_agent_config.read_property(cartridgeagentconstants.PARAM_FILE_PATH) except ParameterNotFoundException: self.log.error("System property not found: %r" % cartridgeagentconstants.PARAM_FILE_PATH) return #EXTENSIONS_DIR try: - CartridgeAgentConfiguration.read_property(cartridgeagentconstants.EXTENSIONS_DIR) + 
self.cartridge_agent_config.read_property(cartridgeagentconstants.EXTENSIONS_DIR) except ParameterNotFoundException: self.log.error("System property not found: %r" % cartridgeagentconstants.EXTENSIONS_DIR) return @@ -150,7 +160,7 @@ class CartridgeAgent(threading.Thread): CartridgeAgent.extension_handler.on_artifact_updated_event(event_obj) def on_instance_cleanup_member(self, msg): - member_in_payload = CartridgeAgentConfiguration.member_id + member_in_payload = self.cartridge_agent_config.member_id event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload) member_in_event = event_obj.member_id if member_in_payload == member_in_event: @@ -158,7 +168,7 @@ class CartridgeAgent(threading.Thread): def on_instance_cleanup_cluster(self, msg): event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload) - cluster_in_payload = CartridgeAgentConfiguration.cluster_id + cluster_in_payload = self.cartridge_agent_config.cluster_id cluster_in_event = event_obj.cluster_id if cluster_in_event == cluster_in_payload: @@ -277,18 +287,6 @@ class CartridgeAgent(threading.Thread): CartridgeAgent.extension_handler.on_tenant_unsubscribed_event(event_obj) except: self.log.exception("Error processing tenant unSubscribed event") - - @staticmethod - def get_extension_handler(): - """ - Returns the Extension handler implementation - :return: An implmentation of AbstractExtensionHandler - :rtype : AbstractExtensionHandler - """ - if CartridgeAgent.extension_handler is None: - CartridgeAgent.extension_handler = DefaultExtensionHandler() - - return CartridgeAgent.extension_handler def main(): http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/logging.ini ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/logging.ini b/tools/python-cartridge-agent/cartridge-agent/logging.ini index cf6bc7d..3fd9381 100644 --- 
a/tools/python-cartridge-agent/cartridge-agent/logging.ini +++ b/tools/python-cartridge-agent/cartridge-agent/logging.ini @@ -15,7 +15,7 @@ args=tuple() [handler_log_file] class=logging.FileHandler -level=ERROR +level=INFO formatter=default args=("agent.log", "w") http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py index d192e75..be46c5d 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/artifactmgt/git/agentgithandler.py @@ -1,17 +1,10 @@ -import logging from threading import current_thread, Thread -import os from git import * from gittle import Gittle, GittleAuth # GitPython and Gittle are both used at the time being for pros and cons of both import urllib2 -from gitrepository import GitRepository -from ... config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from ... util import cartridgeagentutils, extensionutils, cartridgeagentconstants -from ... util.asyncscheduledtask import AsyncScheduledTask -from ... artifactmgt.repositoryinformation import RepositoryInformation from ... util.log import LogFactory -from ....agent import CartridgeAgent +from ... 
util import cartridgeagentutils, extensionutils, cartridgeagentconstants class AgentGitHandler: @@ -25,11 +18,13 @@ class AgentGitHandler: SUPER_TENANT_REPO_PATH = "/repository/deployment/server/" TENANT_REPO_PATH = "/repository/tenants/" - extension_handler = CartridgeAgent.get_extension_handler() + extension_handler = cartridgeagentutils.get_extension_handler() __git_repositories = {} # (tenant_id => gitrepository.GitRepository) + cartridge_agent_config = cartridgeagentconfiguration.CartridgeAgentConfiguration() + @staticmethod def checkout(repo_info): """ @@ -89,13 +84,15 @@ class AgentGitHandler: repo_context.repo.git.branch("-f", "--track", "master", "origin/master") return True except: - AgentGitHandler.log.exception("Error in adding remote origin %r for local repository %r" % (repo_context.repo_url, repo_context.local_repo_path)) + AgentGitHandler.log.exception("Error in adding remote origin %r for local repository %r" + % (repo_context.repo_url, repo_context.local_repo_path)) return False @staticmethod def init(path): try: repo = Gittle.init(path) + return repo except: AgentGitHandler.log.exception("Initializing local repo at %r failed" % path) raise Exception("Initializing local repo at %r failed" % path) @@ -326,7 +323,7 @@ class AgentGitHandler: if is_multitenant: if tenant_id == AgentGitHandler.SUPER_TENANT_ID: #super tenant, /repository/deploy/server/ - super_tenant_repo_path = CartridgeAgentConfiguration.super_tenant_repository_path + super_tenant_repo_path = AgentGitHandler.cartridge_agent_config.super_tenant_repository_path #"app_path" repo_path += git_local_repo_path @@ -341,7 +338,7 @@ class AgentGitHandler: else: #normal tenant, /repository/tenants/tenant_id - tenant_repo_path = CartridgeAgentConfiguration.tenant_repository_path + tenant_repo_path = AgentGitHandler.cartridge_agent_config.tenant_repository_path #"app_path" repo_path += git_local_repo_path @@ -451,10 +448,10 @@ class AgentGitHandler: AgentGitHandler.remove_repo_context(tenant_id) 
if tenant_id == -1234: - if CartridgeAgentConfiguration.is_multitenant: + if AgentGitHandler.cartridge_agent_config.is_multitenant: extensionutils.execute_copy_artifact_extension( cartridgeagentconstants.SUPERTENANT_TEMP_PATH, - CartridgeAgentConfiguration.app_path + "/repository/deployment/server/" + AgentGitHandler.cartridge_agent_config.app_path + "/repository/deployment/server/" ) AgentGitHandler.log.info("git repository deleted for tenant %r" % repo_context.tenant_id) @@ -476,7 +473,13 @@ class ArtifactUpdateTask(Thread): if self.auto_checkout: AgentGitHandler.checkout(self.repo_info) except: - self.log.exception() + self.log.exception("Auto checkout task failed") if self.auto_commit: AgentGitHandler.commit(self.repo_info) + + +from gitrepository import GitRepository +from ... config import cartridgeagentconfiguration +from ... util.asyncscheduledtask import AsyncScheduledTask +from ... artifactmgt.repositoryinformation import RepositoryInformation \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/config/cartridgeagentconfiguration.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/config/cartridgeagentconfiguration.py b/tools/python-cartridge-agent/cartridge-agent/modules/config/cartridgeagentconfiguration.py index c0e5ea6..27c9a89 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/config/cartridgeagentconfiguration.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/config/cartridgeagentconfiguration.py @@ -2,315 +2,304 @@ import ConfigParser import logging import os -from ..util import cartridgeagentconstants from ..util.log import LogFactory -from ..exception.parameternotfoundexception import ParameterNotFoundException class CartridgeAgentConfiguration: """ Handles the configuration information of the particular Cartridge Agent """ - # set log level - 
log = LogFactory().get_log(__name__) - - payload_params = {} - properties = None - - service_group = None - """ :type : str """ - is_clustered = False - """ :type : bool """ - service_name = None - """ :type : str """ - cluster_id = None - """ :type : str """ - network_partition_id = None - """ :type : str """ - partition_id = None - """ :type : str """ - member_id = None - """ :type : str """ - cartridge_key = None - """ :type : str """ - app_path = None - """ :type : str """ - repo_url = None - """ :type : str """ - ports = [] - """ :type : list[str] """ - log_file_paths = [] - """ :type : list[str] """ - is_multitenant = False - """ :type : bool """ - persistence_mappings = None - """ :type : str """ - is_commits_enabled = False - """ :type : bool """ - is_checkout_enabled = False - """ :type : bool """ - listen_address = None - """ :type : str """ - is_internal_repo = False - """ :type : bool """ - tenant_id = None - """ :type : str """ - lb_cluster_id = None - """ :type : str """ - min_count = None - """ :type : str """ - lb_private_ip = None - """ :type : str """ - lb_public_ip = None - """ :type : str """ - tenant_repository_path = None - """ :type : str """ - super_tenant_repository_path = None - """ :type : str """ - deployment = None - """ :type : str """ - manager_service_name = None - """ :type : str """ - worker_service_name = None - """ :type : str """ - is_primary = False - """ :type : bool """ - - @staticmethod - def initialize_configuration(): - """ - Initializes the configuration by reading and parsing properties - from configuration file and payload parameter file - :return: void - """ - - CartridgeAgentConfiguration.payload_params = {} - CartridgeAgentConfiguration.__read_conf_file() - CartridgeAgentConfiguration.__read_parameter_file() - - try: - CartridgeAgentConfiguration.service_group = CartridgeAgentConfiguration.payload_params[ - cartridgeagentconstants.SERVICE_GROUP] \ - if cartridgeagentconstants.SERVICE_GROUP in 
CartridgeAgentConfiguration.payload_params \ - else None - - if cartridgeagentconstants.CLUSTERING in CartridgeAgentConfiguration.payload_params and \ - str(CartridgeAgentConfiguration.payload_params[ - cartridgeagentconstants.CLUSTERING]).strip().lower() == "true": - CartridgeAgentConfiguration.is_clustered = True - else: - CartridgeAgentConfiguration.is_clustered = False - # CartridgeAgentConfiguration.__isClustered = CartridgeAgentConfiguration.payload_params[ - # cartridgeagentconstants.CLUSTERING] if cartridgeagentconstants.CLUSTERING in CartridgeAgentConfiguration.payload_params else None - - CartridgeAgentConfiguration.service_name = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.SERVICE_NAME) - CartridgeAgentConfiguration.cluster_id = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.CLUSTER_ID) - CartridgeAgentConfiguration.network_partition_id = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.NETWORK_PARTITION_ID) - CartridgeAgentConfiguration.partition_id = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.PARTITION_ID) - CartridgeAgentConfiguration.member_id = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.MEMBER_ID) - CartridgeAgentConfiguration.cartridge_key = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.CARTRIDGE_KEY) - CartridgeAgentConfiguration.app_path = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.APP_PATH) - CartridgeAgentConfiguration.repo_url = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.REPO_URL) - CartridgeAgentConfiguration.ports = str( - CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PORTS)).split("|") + class __CartridgeAgentConfiguration: + def __init__(self): + # set log level + self.log = LogFactory().get_log(__name__) + + self.payload_params = {} + self.properties = None + + self.service_group = None + """ :type : str """ + self.is_clustered 
= False + """ :type : bool """ + self.service_name = None + """ :type : str """ + self.cluster_id = None + """ :type : str """ + self.network_partition_id = None + """ :type : str """ + self.partition_id = None + """ :type : str """ + self.member_id = None + """ :type : str """ + self.cartridge_key = None + """ :type : str """ + self.app_path = None + """ :type : str """ + self.repo_url = None + """ :type : str """ + self.ports = [] + """ :type : list[str] """ + self.log_file_paths = [] + """ :type : list[str] """ + self.is_multitenant = False + """ :type : bool """ + self.persistence_mappings = None + """ :type : str """ + self.is_commits_enabled = False + """ :type : bool """ + self.is_checkout_enabled = False + """ :type : bool """ + self.listen_address = None + """ :type : str """ + self.is_internal_repo = False + """ :type : bool """ + self.tenant_id = None + """ :type : str """ + self.lb_cluster_id = None + """ :type : str """ + self.min_count = None + """ :type : str """ + self.lb_private_ip = None + """ :type : str """ + self.lb_public_ip = None + """ :type : str """ + self.tenant_repository_path = None + """ :type : str """ + self.super_tenant_repository_path = None + """ :type : str """ + self.deployment = None + """ :type : str """ + self.manager_service_name = None + """ :type : str """ + self.worker_service_name = None + """ :type : str """ + self.is_primary = False + """ :type : bool """ + + self.payload_params = {} + self.__read_conf_file() + self.__read_parameter_file() try: - CartridgeAgentConfiguration.log_file_paths = str( - CartridgeAgentConfiguration.read_property(cartridgeagentconstants.CLUSTER_ID)).strip().split("|") - except ParameterNotFoundException as ex: - CartridgeAgentConfiguration.log.debug("Cannot read log file path : %r" % ex.get_message()) - CartridgeAgentConfiguration.log_file_paths = None + service_group = self.payload_params[cartridgeagentconstants.SERVICE_GROUP] \ + if cartridgeagentconstants.SERVICE_GROUP in 
self.payload_params \ + else None + + if cartridgeagentconstants.CLUSTERING in self.payload_params and \ + str(self.payload_params[cartridgeagentconstants.CLUSTERING]).strip().lower() == "true": + self.is_clustered = True + else: + self.is_clustered = False + # self.__isClustered = self.payload_params[ + # cartridgeagentconstants.CLUSTERING] if cartridgeagentconstants.CLUSTERING in self.payload_params else None + + self.service_name = self.read_property(cartridgeagentconstants.SERVICE_NAME) + self.cluster_id = self.read_property(cartridgeagentconstants.CLUSTER_ID) + self.network_partition_id = self.read_property(cartridgeagentconstants.NETWORK_PARTITION_ID) + self.partition_id = self.read_property(cartridgeagentconstants.PARTITION_ID) + self.member_id = self.read_property(cartridgeagentconstants.MEMBER_ID) + self.cartridge_key = self.read_property(cartridgeagentconstants.CARTRIDGE_KEY) + self.app_path = self.read_property(cartridgeagentconstants.APP_PATH) + self.repo_url = self.read_property(cartridgeagentconstants.REPO_URL) + self.ports = str(self.read_property(cartridgeagentconstants.PORTS)).split("|") - is_multi_str = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.CLUSTER_ID) - CartridgeAgentConfiguration.is_multitenant = True if str( - is_multi_str).lower().strip() == "true" else False + try: + self.log_file_paths = str( + self.read_property(cartridgeagentconstants.CLUSTER_ID)).strip().split("|") + except ParameterNotFoundException as ex: + self.log.debug("Cannot read log file path : %r" % ex.get_message()) + self.log_file_paths = None - try: - CartridgeAgentConfiguration.persistence_mappings = CartridgeAgentConfiguration.read_property( - "PERSISTENCE_MAPPING") - except ParameterNotFoundException as ex: - CartridgeAgentConfiguration.log.debug("Cannot read persistence mapping : %r" % ex.get_message()) - CartridgeAgentConfiguration.persistence_mappings = None + is_multi_str = self.read_property(cartridgeagentconstants.CLUSTER_ID) + 
self.is_multitenant = True if str( + is_multi_str).lower().strip() == "true" else False - try: - is_commit_str = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.COMMIT_ENABLED) - CartridgeAgentConfiguration.is_commits_enabled = True if str( - is_commit_str).lower().strip() == "true" else False - except ParameterNotFoundException: try: - is_commit_str = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.AUTO_COMMIT) - CartridgeAgentConfiguration.is_commits_enabled = True if str( + self.persistence_mappings = self.read_property( + cartridgeagentconstants.PERSISTENCE_MAPPING) + except ParameterNotFoundException as ex: + self.log.debug("Cannot read persistence mapping : %r" % ex.get_message()) + self.persistence_mappings = None + + try: + is_commit_str = self.read_property(cartridgeagentconstants.COMMIT_ENABLED) + self.is_commits_enabled = True if str( is_commit_str).lower().strip() == "true" else False except ParameterNotFoundException: - CartridgeAgentConfiguration.log.info( - "%r is not found and setting it to false" % cartridgeagentconstants.COMMIT_ENABLED) - CartridgeAgentConfiguration.is_commits_enabled = False + try: + is_commit_str = self.read_property(cartridgeagentconstants.AUTO_COMMIT) + self.is_commits_enabled = True if str( + is_commit_str).lower().strip() == "true" else False + except ParameterNotFoundException: + self.log.info( + "%r is not found and setting it to false" % cartridgeagentconstants.COMMIT_ENABLED) + self.is_commits_enabled = False + + auto_checkout_str = self.read_property(cartridgeagentconstants.AUTO_CHECKOUT, False) + self.is_checkout_enabled = True if str( + auto_checkout_str).lower().strip() == "true" else False + + self.listen_address = self.read_property( + cartridgeagentconstants.LISTEN_ADDRESS, False) - auto_checkout_str = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.AUTO_CHECKOUT, False) - CartridgeAgentConfiguration.is_checkout_enabled = True if str( - 
auto_checkout_str).lower().strip() == "true" else False + try: + int_repo_str = self.read_property(cartridgeagentconstants.PROVIDER) + self.is_internal_repo = True if str( + int_repo_str).strip().lower() == cartridgeagentconstants.INTERNAL else False + except ParameterNotFoundException: + self.log.info(" INTERNAL payload parameter is not found") + self.is_internal_repo = False + + self.tenant_id = self.read_property( + cartridgeagentconstants.TENANT_ID) + self.lb_cluster_id = self.read_property( + cartridgeagentconstants.LB_CLUSTER_ID) + self.min_count = self.read_property( + cartridgeagentconstants.MIN_INSTANCE_COUNT) + self.lb_private_ip = self.read_property( + cartridgeagentconstants.LB_PRIVATE_IP, False) + self.lb_public_ip = self.read_property( + cartridgeagentconstants.LB_PUBLIC_IP, False) + self.tenant_repository_path = self.read_property( + cartridgeagentconstants.TENANT_REPO_PATH, False) + self.super_tenant_repository_path = self.read_property( + cartridgeagentconstants.SUPER_TENANT_REPO_PATH, False) - CartridgeAgentConfiguration.listen_address = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.LISTEN_ADDRESS, False) + try: + self.deployment = self.read_property( + cartridgeagentconstants.DEPLOYMENT) + except ParameterNotFoundException: + self.deployment = None - try: - int_repo_str = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PROVIDER) - CartridgeAgentConfiguration.is_internal_repo = True if str( - int_repo_str).strip().lower() == cartridgeagentconstants.INTERNAL else False - except ParameterNotFoundException: - CartridgeAgentConfiguration.log.info(" INTERNAL payload parameter is not found") - CartridgeAgentConfiguration.is_internal_repo = False - - CartridgeAgentConfiguration.tenant_id = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.TENANT_ID) - CartridgeAgentConfiguration.lb_cluster_id = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.LB_CLUSTER_ID) - 
CartridgeAgentConfiguration.min_count = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.MIN_INSTANCE_COUNT) - CartridgeAgentConfiguration.lb_private_ip = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.LB_PRIVATE_IP, False) - CartridgeAgentConfiguration.lb_public_ip = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.LB_PUBLIC_IP, False) - CartridgeAgentConfiguration.tenant_repository_path = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.TENANT_REPO_PATH, False) - CartridgeAgentConfiguration.super_tenant_repository_path = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.SUPER_TENANT_REPO_PATH, False) + # Setting worker-manager setup - manager service name + if self.deployment is None: + self.manager_service_name = None - try: - CartridgeAgentConfiguration.deployment = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.DEPLOYMENT) - except ParameterNotFoundException: - CartridgeAgentConfiguration.deployment = None - - # Setting worker-manager setup - manager service name - if CartridgeAgentConfiguration.deployment is None: - CartridgeAgentConfiguration.manager_service_name = None - - if str( - CartridgeAgentConfiguration.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower(): - CartridgeAgentConfiguration.manager_service_name = CartridgeAgentConfiguration.service_name - - elif str( - CartridgeAgentConfiguration.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): - CartridgeAgentConfiguration.deployment = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.MANAGER_SERVICE_TYPE) - - elif str( - CartridgeAgentConfiguration.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower(): - CartridgeAgentConfiguration.deployment = None - else: - CartridgeAgentConfiguration.deployment = None - - # Setting worker-manager setup - worker service name - if 
CartridgeAgentConfiguration.deployment is None: - CartridgeAgentConfiguration.worker_service_name = None - - if str( - CartridgeAgentConfiguration.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): - CartridgeAgentConfiguration.manager_service_name = CartridgeAgentConfiguration.service_name - - elif str( - CartridgeAgentConfiguration.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower(): - CartridgeAgentConfiguration.deployment = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.WORKER_SERVICE_TYPE) - - elif str( - CartridgeAgentConfiguration.deployment).lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower(): - CartridgeAgentConfiguration.deployment = None - else: - CartridgeAgentConfiguration.deployment = None + if self.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower(): + self.manager_service_name = self.service_name + + elif self.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): + self.deployment = self.read_property( + cartridgeagentconstants.MANAGER_SERVICE_TYPE) + + elif self.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower(): + self.deployment = None + else: + self.deployment = None + + # Setting worker-manager setup - worker service name + if self.deployment is None: + self.worker_service_name = None + + if self.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): + self.manager_service_name = self.service_name + + elif self.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower(): + self.deployment = self.read_property( + cartridgeagentconstants.WORKER_SERVICE_TYPE) + + elif self.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_DEFAULT.lower(): + self.deployment = None + else: + self.deployment = None + + try: + self.is_primary = self.read_property( + cartridgeagentconstants.CLUSTERING_PRIMARY_KEY) + except ParameterNotFoundException: + self.is_primary = None + except 
ParameterNotFoundException as ex: + raise RuntimeError(ex) + + self.log.info("Cartridge agent configuration initialized") + + self.log.debug("service-name: %r" % self.service_name) + self.log.debug("cluster-id: %r" % self.cluster_id) + self.log.debug( + "network-partition-id: %r" % self.network_partition_id) + self.log.debug("partition-id: %r" % self.partition_id) + self.log.debug("member-id: %r" % self.member_id) + self.log.debug("cartridge-key: %r" % self.cartridge_key) + self.log.debug("app-path: %r" % self.app_path) + self.log.debug("repo-url: %r" % self.repo_url) + self.log.debug("ports: %r" % str(self.ports)) + self.log.debug("lb-private-ip: %r" % self.lb_private_ip) + self.log.debug("lb-public-ip: %r" % self.lb_public_ip) + + def __read_conf_file(self): + """ + Reads and stores the agent's configuration file + :return: void + """ + + conf_file_path = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "agent.conf" + self.log.debug("Config file path : %r" % conf_file_path) + self.properties = ConfigParser.SafeConfigParser() + self.properties.read(conf_file_path) + + def __read_parameter_file(self): + """ + Reads the payload file of the cartridge and stores the values in a dictionary + :return: void + """ + + param_file = self.read_property(cartridgeagentconstants.PARAM_FILE_PATH, False) try: - CartridgeAgentConfiguration.is_primary = CartridgeAgentConfiguration.read_property( - cartridgeagentconstants.CLUSTERING_PRIMARY_KEY) - except ParameterNotFoundException: - CartridgeAgentConfiguration.is_primary = None - except ParameterNotFoundException as ex: - raise RuntimeError(ex) - - CartridgeAgentConfiguration.log.info("Cartridge agent configuration initialized") - - CartridgeAgentConfiguration.log.debug("service-name: %r" % CartridgeAgentConfiguration.service_name) - CartridgeAgentConfiguration.log.debug("cluster-id: %r" % CartridgeAgentConfiguration.cluster_id) - CartridgeAgentConfiguration.log.debug( - "network-partition-id: %r" % 
CartridgeAgentConfiguration.network_partition_id) - CartridgeAgentConfiguration.log.debug("partition-id: %r" % CartridgeAgentConfiguration.partition_id) - CartridgeAgentConfiguration.log.debug("member-id: %r" % CartridgeAgentConfiguration.member_id) - CartridgeAgentConfiguration.log.debug("cartridge-key: %r" % CartridgeAgentConfiguration.cartridge_key) - CartridgeAgentConfiguration.log.debug("app-path: %r" % CartridgeAgentConfiguration.app_path) - CartridgeAgentConfiguration.log.debug("repo-url: %r" % CartridgeAgentConfiguration.repo_url) - CartridgeAgentConfiguration.log.debug("ports: %r" % str(CartridgeAgentConfiguration.ports)) - CartridgeAgentConfiguration.log.debug("lb-private-ip: %r" % CartridgeAgentConfiguration.lb_private_ip) - CartridgeAgentConfiguration.log.debug("lb-public-ip: %r" % CartridgeAgentConfiguration.lb_public_ip) - - @staticmethod - def __read_conf_file(): - """ - Reads and stores the agent's configuration file - :return: void - """ - - base_working_dir = os.path.abspath(os.path.dirname(__file__)).replace("modules/config", "") - conf_file_path = base_working_dir + "agent.conf" - CartridgeAgentConfiguration.log.debug("Config file path : %r" % conf_file_path) - CartridgeAgentConfiguration.properties = ConfigParser.SafeConfigParser() - CartridgeAgentConfiguration.properties.read(conf_file_path) - - @staticmethod - def __read_parameter_file(): - """ - Reads the payload file of the cartridge and stores the values in a dictionary - :return: void - """ - - param_file = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.PARAM_FILE_PATH, False) - - try: - if param_file is not None: - metadata_file = open(param_file) - metadata_payload_content = metadata_file.read() - for param in metadata_payload_content.split(","): - if param.strip() != "": - param_value = param.strip().split("=") - CartridgeAgentConfiguration.payload_params[param_value[0]] = param_value[1] - - # CartridgeAgentConfiguration.payload_params = dict( - # param.split("=") 
for param in metadata_payload_content.split(",")) - metadata_file.close() - else: - CartridgeAgentConfiguration.log.error("File not found: %r" % param_file) - except: - CartridgeAgentConfiguration.log.exception( - "Could not read launch parameter file, hence trying to read from System properties.") - - @staticmethod - def read_property(property_key, critical=True): - """ - Returns the value of the provided property - :param str property_key: the name of the property to be read - :return: Value of the property, - :rtype: str - :exception: ParameterNotFoundException if the provided property cannot be found - """ - - if CartridgeAgentConfiguration.properties.has_option("agent", property_key): - CartridgeAgentConfiguration.log.debug("Has key: %r" % property_key) - temp_str = CartridgeAgentConfiguration.properties.get("agent", property_key) - if temp_str != "" and temp_str is not None: - return temp_str - - if property_key in CartridgeAgentConfiguration.payload_params: - temp_str = CartridgeAgentConfiguration.payload_params[property_key] - if temp_str != "" and temp_str is not None: - return temp_str - - if critical: - raise ParameterNotFoundException("Cannot find the value of required parameter: %r" % property_key) + if param_file is not None: + metadata_file = open(param_file) + metadata_payload_content = metadata_file.read() + for param in metadata_payload_content.split(","): + if param.strip() != "": + param_value = param.strip().split("=") + self.payload_params[param_value[0]] = param_value[1] + + # self.payload_params = dict( + # param.split("=") for param in metadata_payload_content.split(",")) + metadata_file.close() + else: + self.log.error("File not found: %r" % param_file) + except: + self.log.exception( + "Could not read launch parameter file, hence trying to read from System properties.") + + def read_property(self, property_key, critical=True): + """ + Returns the value of the provided property + :param str property_key: the name of the property to be read 
+ :return: Value of the property, + :rtype: str + :exception: ParameterNotFoundException if the provided property cannot be found + """ + + if self.properties.has_option("agent", property_key): + self.log.debug("Has key: %r" % property_key) + temp_str = self.properties.get("agent", property_key) + if temp_str != "" and temp_str is not None: + return temp_str + + if property_key in self.payload_params: + temp_str = self.payload_params[property_key] + if temp_str != "" and temp_str is not None: + return temp_str + + if critical: + raise ParameterNotFoundException("Cannot find the value of required parameter: %r" % property_key) + + instance = None + + def __new__(cls, *args, **kwargs): + if not CartridgeAgentConfiguration.instance: + CartridgeAgentConfiguration.instance = CartridgeAgentConfiguration.__CartridgeAgentConfiguration() + + return CartridgeAgentConfiguration.instance + + def __getattr__(self, name): + return getattr(self.instance, name) + + def __setattr__(self, name): + return setattr(self.instance, name) + + +from ..exception.parameternotfoundexception import ParameterNotFoundException +from ..util import cartridgeagentconstants http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py index b65a2be..e229af1 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py @@ -6,6 +6,13 @@ class StreamDefinition: """ Represents a BAM/CEP stream definition """ + + STRING = 'STRING' + DOUBLE = 'DOUBLE' + INT = 'INT' + LONG = 'LONG' + BOOL = 'BOOL' + def __init__(self): self.name = None """:type : str""" 
http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py index d5cdc01..ea30c85 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/datapublisher/logpublisher.py @@ -4,7 +4,7 @@ from threading import Thread, current_thread from ..databridge.agent import * from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from ..util import cartridgeagentutils +from ..util import cartridgeagentutils, cartridgeagentconstants from exception.datapublisherexception import DataPublisherException @@ -87,25 +87,26 @@ class LogPublisherManager(Thread): """ # stream definition stream_definition = StreamDefinition() - valid_tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration.tenant_id) - alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration.cluster_id) + valid_tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration().tenant_id) + alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration().cluster_id) stream_name = "logs." + valid_tenant_id + "." \ + alias + "." 
+ LogPublisherManager.get_current_date() stream_version = "1.0.0" + stream_definition.name = stream_name stream_definition.version = stream_version stream_definition.description = "Apache Stratos Instance Log Publisher" - stream_definition.add_metadata_attribute("memberId", 'STRING') - stream_definition.add_payloaddata_attribute("tenantID", "STRING") - stream_definition.add_payloaddata_attribute("serverName", "STRING") - stream_definition.add_payloaddata_attribute("appName", "STRING") - stream_definition.add_payloaddata_attribute("logTime", "STRING") - stream_definition.add_payloaddata_attribute("priority", "STRING") - stream_definition.add_payloaddata_attribute("message", "STRING") - stream_definition.add_payloaddata_attribute("logger", "STRING") - stream_definition.add_payloaddata_attribute("ip", "STRING") - stream_definition.add_payloaddata_attribute("instance", "STRING") - stream_definition.add_payloaddata_attribute("stacktrace", "STRING") + stream_definition.add_metadata_attribute("memberId", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("tenantID", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("serverName", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("appName", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("logTime", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("priority", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("message", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("logger", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("ip", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("instance", StreamDefinition.STRING) + stream_definition.add_payloaddata_attribute("stacktrace", StreamDefinition.STRING) return stream_definition @@ -117,8 +118,17 @@ class LogPublisherManager(Thread): 
self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_port) self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_secure_port) - cartridgeagentutils.wait_until_ports_active(DataPublisherConfiguration.get_instance().monitoring_server_ip, self.ports) - ports_active = cartridgeagentutils.check_ports_active(DataPublisherConfiguration.get_instance().monitoring_server_ip, self.ports) + self.cartridge_agent_config = CartridgeAgentConfiguration() + + cartridgeagentutils.wait_until_ports_active( + DataPublisherConfiguration.get_instance().monitoring_server_ip, + self.ports, + int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) + + ports_active = cartridgeagentutils.check_ports_active( + DataPublisherConfiguration.get_instance().monitoring_server_ip, + self.ports) + if not ports_active: raise DataPublisherException("Monitoring server not active, data publishing is aborted") @@ -158,14 +168,14 @@ class LogPublisherManager(Thread): @staticmethod def get_valid_tenant_id(tenant_id): - if tenant_id == "-1" or tenant_id == "-1234": + if tenant_id == cartridgeagentconstants.INVALID_TENANT_ID \ + or tenant_id == cartridgeagentconstants.SUPER_TENANT_ID: return "0" return tenant_id @staticmethod def get_alias(cluster_id): - alias = "" try: alias = cluster_id.split("\\.")[0] except: @@ -180,7 +190,7 @@ class LogPublisherManager(Thread): :return: Formatted date string :rtype : str """ - return datetime.date.today().strftime("%Y.%m.%d") + return datetime.date.today().strftime(cartridgeagentconstants.DATE_FORMAT) class DataPublisherConfiguration: @@ -213,32 +223,34 @@ class DataPublisherConfiguration: self.admin_password = None self.read_config() + self.cartridge_agent_config = CartridgeAgentConfiguration() + def read_config(self): - self.enabled = True if CartridgeAgentConfiguration.read_property("enable.data.publisher", False).strip().lower() == "true" else False + self.enabled = True if 
self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_PUBLISHER_ENABLED, False).strip().lower() == "true" else False if not self.enabled: DataPublisherConfiguration.log.info("Data Publisher disabled") return DataPublisherConfiguration.log.info("Data Publisher enabled") - self.monitoring_server_ip = CartridgeAgentConfiguration.read_property("monitoring.server.ip", False) + self.monitoring_server_ip = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_RECEIVER_IP, False) if self.monitoring_server_ip.strip() == "": - raise RuntimeError("System property not found: monitoring.server.ip") + raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_RECEIVER_IP) - self.monitoring_server_port = CartridgeAgentConfiguration.read_property("monitoring.server.port", False) + self.monitoring_server_port = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_RECEIVER_PORT, False) if self.monitoring_server_port.strip() == "": - raise RuntimeError("System property not found: monitoring.server.port") + raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_RECEIVER_PORT) - self.monitoring_server_secure_port = CartridgeAgentConfiguration.read_property("monitoring.server.secure.port", False) - if self.monitoring_server_secure_port.strip() == "": - raise RuntimeError("System property not found: monitoring.server.secure.port") + # self.monitoring_server_secure_port = self.cartridge_agent_config.read_property("monitoring.server.secure.port", False) + # if self.monitoring_server_secure_port.strip() == "": + # raise RuntimeError("System property not found: monitoring.server.secure.port") - self.admin_username = CartridgeAgentConfiguration.read_property("monitoring.server.admin.username", False) + self.admin_username = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_SERVER_ADMIN_USERNAME, False) if self.admin_username.strip() == "": 
- raise RuntimeError("System property not found: monitoring.server.admin.username") + raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_SERVER_ADMIN_USERNAME) - self.admin_password = CartridgeAgentConfiguration.read_property("monitoring.server.admin.password", False) + self.admin_password = self.cartridge_agent_config.read_property(cartridgeagentconstants.MONITORING_SERVER_ADMIN_PASSWORD, False) if self.admin_password.strip() == "": - raise RuntimeError("System property not found: monitoring.server.admin.password") + raise RuntimeError("System property not found: " + cartridgeagentconstants.MONITORING_SERVER_ADMIN_PASSWORD) DataPublisherConfiguration.log.info("Data Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/extensions/abstractextensionhandler.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/abstractextensionhandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/abstractextensionhandler.py index f57a1cf..765d3bc 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/abstractextensionhandler.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/abstractextensionhandler.py @@ -12,10 +12,10 @@ class AbstractExtensionHandler: def on_artifact_update_scheduler_event(self, tenant_id): raise NotImplementedError - def on_instance_cleanup_cluster_event(self, instanceCleanupClusterEvent): + def on_instance_cleanup_cluster_event(self, instance_cleanup_cluster_event): raise NotImplementedError - def on_instance_cleanup_member_event(self, instanceCleanupMemberEvent): + def on_instance_cleanup_member_event(self, instance_cleanup_member_event): raise NotImplementedError def on_member_activated_event(self, member_activated_event): @@ -45,7 +45,7 @@ class 
AbstractExtensionHandler: def on_subscription_domain_added_event(self, subscription_domain_added_event): raise NotImplementedError - def on_subscription_domain_removed_event(self, subscriptionDomainRemovedEvent): + def on_subscription_domain_removed_event(self, subscription_domain_removed_event): raise NotImplementedError def on_copy_artifacts_extension(self, src, des): http://git-wip-us.apache.org/repos/asf/stratos/blob/511aedfa/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py index 8005a7a..58a5aa7 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/extensions/defaultextensionhandler.py @@ -1,15 +1,5 @@ -import logging import time -from ..artifactmgt.git.agentgithandler import AgentGitHandler -from ..artifactmgt.repositoryinformation import RepositoryInformation -from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration -from ..util import extensionutils -from ..publisher import cartridgeagentpublisher -from ..exception.parameternotfoundexception import ParameterNotFoundException -from ..topology.topologycontext import * -from ..tenant.tenantcontext import * -from ..util.log import LogFactory from abstractextensionhandler import AbstractExtensionHandler @@ -22,12 +12,13 @@ class DefaultExtensionHandler(AbstractExtensionHandler): def __init__(self): self.log = LogFactory().get_log(__name__) self.wk_members = [] + self.cartridge_agent_config = CartridgeAgentConfiguration() def on_instance_started_event(self): try: self.log.debug("Processing instance started event...") - if CartridgeAgentConfiguration.is_multitenant: - artifact_source 
= "%r/repository/deployment/server/" % CartridgeAgentConfiguration.app_path + if self.cartridge_agent_config.is_multitenant: + artifact_source = "%r/repository/deployment/server/" % self.cartridge_agent_config.app_path artifact_dest = cartridgeagentconstants.SUPERTENANT_TEMP_PATH extensionutils.execute_copy_artifact_extension(artifact_source, artifact_dest) @@ -45,18 +36,18 @@ class DefaultExtensionHandler(AbstractExtensionHandler): artifacts_updated_event.status)) cluster_id_event = str(artifacts_updated_event.cluster_id).strip() - cluster_id_payload = CartridgeAgentConfiguration.cluster_id + cluster_id_payload = self.cartridge_agent_config.cluster_id repo_url = str(artifacts_updated_event.repo_url).strip() if (repo_url != "") and (cluster_id_payload is not None) and (cluster_id_payload == cluster_id_event): - local_repo_path = CartridgeAgentConfiguration.app_path + local_repo_path = self.cartridge_agent_config.app_path - secret = CartridgeAgentConfiguration.cartridge_key + secret = self.cartridge_agent_config.cartridge_key repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret) repo_username = artifacts_updated_event.repo_username tenant_id = artifacts_updated_event.tenant_id - is_multitenant = CartridgeAgentConfiguration.is_multitenant + is_multitenant = self.cartridge_agent_config.is_multitenant commit_enabled = artifacts_updated_event.commit_enabled self.log.info("Executing git checkout") @@ -66,7 +57,7 @@ class DefaultExtensionHandler(AbstractExtensionHandler): is_multitenant, commit_enabled) # checkout code - subscribe_run, repo_context = AgentGitHandler.checkout(repo_info) + subscribe_run, repo_context = agentgithandler.AgentGitHandler.checkout(repo_info) # repo_context = checkout_result["repo_context"] # execute artifact updated extension env_params = {"STRATOS_ARTIFACT_UPDATED_CLUSTER_ID": artifacts_updated_event.cluster_id, @@ -82,15 +73,15 @@ class DefaultExtensionHandler(AbstractExtensionHandler): # publish 
instanceActivated cartridgeagentpublisher.publish_instance_activated_event() - update_artifacts = CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ENABLE_ARTIFACT_UPDATE, False) + update_artifacts = self.cartridge_agent_config.read_property(cartridgeagentconstants.ENABLE_ARTIFACT_UPDATE, False) update_artifacts = True if str(update_artifacts).strip().lower() == "true" else False if update_artifacts: - auto_commit = CartridgeAgentConfiguration.is_commits_enabled - auto_checkout = CartridgeAgentConfiguration.is_checkout_enabled + auto_commit = self.cartridge_agent_config.is_commits_enabled + auto_checkout = self.cartridge_agent_config.is_checkout_enabled try: update_interval = len( - CartridgeAgentConfiguration.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL, False)) + self.cartridge_agent_config.read_property(cartridgeagentconstants.ARTIFACT_UPDATE_INTERVAL, False)) except ParameterNotFoundException: self.log.exception("Invalid artifact sync interval specified ") update_interval = 10 @@ -103,7 +94,7 @@ class DefaultExtensionHandler(AbstractExtensionHandler): self.log.info("Auto Commit is turned %r " % "on" if auto_commit else "off") self.log.info("Auto Checkout is turned %r " % "on" if auto_checkout else "off") - AgentGitHandler.schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit, + agentgithandler.AgentGitHandler.schedule_artifact_update_scheduled_task(repo_info, auto_checkout, auto_commit, update_interval) def on_artifact_update_scheduler_event(self, tenant_id): @@ -111,19 +102,21 @@ class DefaultExtensionHandler(AbstractExtensionHandler): extensionutils.execute_artifacts_updated_extension(env_params) - def on_instance_cleanup_cluster_event(self, instanceCleanupClusterEvent): + def on_instance_cleanup_cluster_event(self, instance_cleanup_cluster_event): self.cleanup() - def on_instance_cleanup_member_event(self, instanceCleanupMemberEvent): + def on_instance_cleanup_member_event(self, 
instance_cleanup_member_event): self.cleanup() def on_member_activated_event(self, member_activated_event): self.log.info("Member activated event received: [service] %r [cluster] %r [member] %r" % (member_activated_event.service_name, member_activated_event.cluster_id, member_activated_event.member_id)) - topology_consistent = extensionutils.check_topology_consistency(member_activated_event.service_name, - member_activated_event.cluster_id, - member_activated_event.member_id) + topology_consistent = extensionutils.check_topology_consistency( + member_activated_event.service_name, + member_activated_event.cluster_id, + member_activated_event.member_id) + if not topology_consistent: self.log.error("Topology is inconsistent...failed to execute member activated event") return @@ -164,7 +157,7 @@ class DefaultExtensionHandler(AbstractExtensionHandler): extensionutils.add_properties(cluster.properties, env_params, "MEMBER_ACTIVATED_CLUSTER_PROPERTY") extensionutils.add_properties(member.properties, env_params, "MEMBER_ACTIVATED_MEMBER_PROPERTY") - clustered = CartridgeAgentConfiguration.is_clustered + clustered = self.cartridge_agent_config.is_clustered if member.properties is not None and member.properties[ cartridgeagentconstants.CLUSTERING_PRIMARY_KEY] == "true" and clustered is not None and clustered: @@ -178,7 +171,7 @@ class DefaultExtensionHandler(AbstractExtensionHandler): self.log.debug(" hasWKIpChanged %r" + has_wk_ip_changed) - min_count = int(CartridgeAgentConfiguration.min_count) + min_count = int(self.cartridge_agent_config.min_count) is_wk_member_grp_ready = self.is_wk_member_group_ready(env_params, min_count) self.log.debug("MinCount: %r" % min_count) self.log.debug("is_wk_member_grp_ready : %r" % is_wk_member_grp_ready) @@ -189,7 +182,7 @@ class DefaultExtensionHandler(AbstractExtensionHandler): self.log.debug("Setting env var STRATOS_CLUSTERING to %r" % clustered) env_params["STRATOS_CLUSTERING"] = clustered - env_params["STRATOS_WK_MEMBER_COUNT"] = 
CartridgeAgentConfiguration.min_count + env_params["STRATOS_WK_MEMBER_COUNT"] = self.cartridge_agent_config.min_count extensionutils.execute_member_activated_extension(env_params) else: @@ -198,9 +191,9 @@ class DefaultExtensionHandler(AbstractExtensionHandler): def on_complete_topology_event(self, complete_topology_event): self.log.debug("Complete topology event received") - service_name_in_payload = CartridgeAgentConfiguration.service_name - cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id - member_id_in_payload = CartridgeAgentConfiguration.member_id + service_name_in_payload = self.cartridge_agent_config.service_name + cluster_id_in_payload = self.cartridge_agent_config.cluster_id + member_id_in_payload = self.cartridge_agent_config.member_id extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) @@ -224,7 +217,8 @@ class DefaultExtensionHandler(AbstractExtensionHandler): def on_member_terminated_event(self, member_terminated_event): self.log.info("Member terminated event received: [service] " + member_terminated_event.service_name + - " [cluster] " + member_terminated_event.cluster_id + " [member] " + member_terminated_event.member_id) + " [cluster] " + member_terminated_event.cluster_id + + " [member] " + member_terminated_event.member_id) topology_consistent = extensionutils.check_topology_consistency( member_terminated_event.service_name, @@ -374,9 +368,9 @@ class DefaultExtensionHandler(AbstractExtensionHandler): extensionutils.wait_for_complete_topology() self.log.info("[start server extension] complete topology event received") - service_name_in_payload = CartridgeAgentConfiguration.service_name - cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id - member_id_in_payload = CartridgeAgentConfiguration.member_id + service_name_in_payload = self.cartridge_agent_config.service_name + cluster_id_in_payload = self.cartridge_agent_config.cluster_id + member_id_in_payload = 
self.cartridge_agent_config.member_id topology_consistant = extensionutils.check_topology_consistency(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) @@ -393,12 +387,12 @@ class DefaultExtensionHandler(AbstractExtensionHandler): env_params = {} # if clustering is enabled wait until all well known members have started - clustering_enabled = CartridgeAgentConfiguration.is_clustered + clustering_enabled = self.cartridge_agent_config.is_clustered if clustering_enabled: env_params["STRATOS_CLUSTERING"] = "true" - env_params["STRATOS_WK_MEMBER_COUNT"] = CartridgeAgentConfiguration.min_count + env_params["STRATOS_WK_MEMBER_COUNT"] = self.cartridge_agent_config.min_count - env_params["STRATOS_PRIMARY"] = "true" if CartridgeAgentConfiguration.is_primary else "false" + env_params["STRATOS_PRIMARY"] = "true" if self.cartridge_agent_config.is_primary else "false" self.wait_for_wk_members(env_params) self.log.info("All well known members have started! Resuming start server extension...") @@ -430,16 +424,16 @@ class DefaultExtensionHandler(AbstractExtensionHandler): extensionutils.execute_subscription_domain_added_extension(env_params) - def on_subscription_domain_removed_event(self, subscriptionDomainRemovedEvent): - tenant_domain = self.find_tenant_domain(subscriptionDomainRemovedEvent.tenant_id) + def on_subscription_domain_removed_event(self, subscription_domain_removed_event): + tenant_domain = self.find_tenant_domain(subscription_domain_removed_event.tenant_id) self.log.info( - "Subscription domain removed event received: [tenant-id] " + subscriptionDomainRemovedEvent.tenant_id + - " [tenant-domain] " + tenant_domain + " [domain-name] " + subscriptionDomainRemovedEvent.domain_name + "Subscription domain removed event received: [tenant-id] " + subscription_domain_removed_event.tenant_id + + " [tenant-domain] " + tenant_domain + " [domain-name] " + subscription_domain_removed_event.domain_name ) - env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": 
subscriptionDomainRemovedEvent.service_name, - "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscriptionDomainRemovedEvent.domain_name, - "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscriptionDomainRemovedEvent.tenant_id), + env_params = {"STRATOS_SUBSCRIPTION_SERVICE_NAME": subscription_domain_removed_event.service_name, + "STRATOS_SUBSCRIPTION_DOMAIN_NAME": subscription_domain_removed_event.domain_name, + "STRATOS_SUBSCRIPTION_TENANT_ID": int(subscription_domain_removed_event.tenant_id), "STRATOS_SUBSCRIPTION_TENANT_DOMAIN": tenant_domain} extensionutils.execute_subscription_domain_removed_extension(env_params) @@ -458,14 +452,15 @@ class DefaultExtensionHandler(AbstractExtensionHandler): def on_tenant_unsubscribed_event(self, tenant_unsubscribed_event): self.log.info( "Tenant unsubscribed event received: [tenant] " + tenant_unsubscribed_event.tenant_id + - " [service] " + tenant_unsubscribed_event.service_name + " [cluster] " + tenant_unsubscribed_event.cluster_ids + " [service] " + tenant_unsubscribed_event.service_name + + " [cluster] " + tenant_unsubscribed_event.cluster_ids ) try: - if CartridgeAgentConfiguration.service_name == tenant_unsubscribed_event.service_name: - AgentGitHandler.remove_repo(tenant_unsubscribed_event.tenant_id) + if self.cartridge_agent_config.service_name == tenant_unsubscribed_event.service_name: + agentgithandler.AgentGitHandler.remove_repo(tenant_unsubscribed_event.tenant_id) except: - self.log.exception() + self.log.exception("Removing git repository failed: ") extensionutils.execute_tenant_unsubscribed_extension({}) def cleanup(self): @@ -484,18 +479,20 @@ class DefaultExtensionHandler(AbstractExtensionHandler): if topology is None or not topology.initialized: return False - service_group_in_payload = CartridgeAgentConfiguration.service_group + service_group_in_payload = self.cartridge_agent_config.service_group if service_group_in_payload is not None: env_params["STRATOS_SERVICE_GROUP"] = service_group_in_payload # clustering logic for 
apimanager if service_group_in_payload is not None and service_group_in_payload == "apim": # handle apistore and publisher case - if CartridgeAgentConfiguration.service_name == "apistore" or \ - CartridgeAgentConfiguration.service_name == "publisher": + if self.cartridge_agent_config.service_name == cartridgeagentconstants.APIMANAGER_SERVICE_NAME or \ + self.cartridge_agent_config.service_name == cartridgeagentconstants.PUBLISHER_SERVICE_NAME: - apistore_cluster_collection = topology.get_service("apistore").get_clusters() - publisher_cluster_collection = topology.get_service("publisher").get_clusters() + apistore_cluster_collection = topology.get_service(cartridgeagentconstants.APIMANAGER_SERVICE_NAME)\ + .get_clusters() + publisher_cluster_collection = topology.get_service(cartridgeagentconstants.PUBLISHER_SERVICE_NAME)\ + .get_clusters() apistore_member_list = [] for member in apistore_cluster_collection[0].get_members(): @@ -526,42 +523,43 @@ class DefaultExtensionHandler(AbstractExtensionHandler): return True - elif CartridgeAgentConfiguration.service_name == "gatewaymgt" or \ - CartridgeAgentConfiguration.service_name == "gateway": + elif self.cartridge_agent_config.service_name == cartridgeagentconstants.GATEWAY_MGT_SERVICE_NAME or \ + self.cartridge_agent_config.service_name == cartridgeagentconstants.GATEWAY_SERVICE_NAME: - if CartridgeAgentConfiguration.deployment is not None: + if self.cartridge_agent_config.deployment is not None: # check if deployment is Manager Worker separated - if CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \ - CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): + if self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \ + self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): - self.log.info("Deployment pattern for the node: %r" % 
CartridgeAgentConfiguration.deployment) - env_params["DEPLOYMENT"] = CartridgeAgentConfiguration.deployment + self.log.info("Deployment pattern for the node: %r" % self.cartridge_agent_config.deployment) + env_params["DEPLOYMENT"] = self.cartridge_agent_config.deployment # check if WKA members of Manager Worker separated deployment is ready return self.is_manager_worker_WKA_group_ready(env_params) - elif CartridgeAgentConfiguration.service_name == "keymanager": + elif self.cartridge_agent_config.service_name == cartridgeagentconstants.KEY_MANAGER_SERVICE_NAME: return True else: - if CartridgeAgentConfiguration.deployment is not None: + if self.cartridge_agent_config.deployment is not None: # check if deployment is Manager Worker separated - if CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \ - CartridgeAgentConfiguration.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): + if self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_MANAGER.lower() or \ + self.cartridge_agent_config.deployment.lower() == cartridgeagentconstants.DEPLOYMENT_WORKER.lower(): - self.log.info("Deployment pattern for the node: %r" % CartridgeAgentConfiguration.deployment) - env_params["DEPLOYMENT"] = CartridgeAgentConfiguration.deployment + self.log.info("Deployment pattern for the node: %r" % self.cartridge_agent_config.deployment) + env_params["DEPLOYMENT"] = self.cartridge_agent_config.deployment # check if WKA members of Manager Worker separated deployment is ready return self.is_manager_worker_WKA_group_ready(env_params) - service_name_in_payload = CartridgeAgentConfiguration.service_name - cluster_id_in_payload = CartridgeAgentConfiguration.cluster_id + service_name_in_payload = self.cartridge_agent_config.service_name + cluster_id_in_payload = self.cartridge_agent_config.cluster_id service = topology.get_service(service_name_in_payload) cluster = 
service.get_cluster(cluster_id_in_payload) wk_members = [] for member in cluster.get_members(): if member.properties is not None and \ - "PRIMARY" in member.properties and member.properties["PRIMARY"].lower() == "true" and \ + cartridgeagentconstants.PRIMARY in member.properties \ + and member.properties[cartridgeagentconstants.PRIMARY].lower() == "true" and \ (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated): wk_members.append(member) @@ -584,8 +582,8 @@ class DefaultExtensionHandler(AbstractExtensionHandler): def is_manager_worker_WKA_group_ready(self, env_params): # for this, we need both manager cluster service name and worker cluster service name - manager_service_name = CartridgeAgentConfiguration.manager_service_name - worker_service_name = CartridgeAgentConfiguration.worker_service_name + manager_service_name = self.cartridge_agent_config.manager_service_name + worker_service_name = self.cartridge_agent_config.worker_service_name # managerServiceName and workerServiceName both should not be null /empty if manager_service_name is None or manager_service_name.strip() == "": @@ -623,7 +621,8 @@ class DefaultExtensionHandler(AbstractExtensionHandler): manager_wka_members = [] for member in manager_clusters[0].get_members(): if member.properties is not None and \ - "PRIMARY" in member.properties and member.properties["PRIMARY"].lower() == "true" and \ + cartridgeagentconstants.PRIMARY in member.properties \ + and member.properties[cartridgeagentconstants.PRIMARY].lower() == "true" and \ (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated): manager_wka_members.append(member) @@ -656,8 +655,8 @@ class DefaultExtensionHandler(AbstractExtensionHandler): self.log.info( "Manager min instance count when allManagersNonPrimary true : " + manager_min_instance_count) - if member.properties is not None and "PRIMARY" in member.properties and \ - member.properties["PRIMARY"].lower() == "true": + if 
member.properties is not None and cartridgeagentconstants.PRIMARY in member.properties and \ + member.properties[cartridgeagentconstants.PRIMARY].lower() == "true": all_managers_non_primary = False break @@ -681,8 +680,8 @@ class DefaultExtensionHandler(AbstractExtensionHandler): for member in worker_clusters[0].get_members(): self.log.debug("Checking member : " + member.member_id) - if member.properties is not None and "PRIMARY" in member.properties and \ - member.properties["PRIMARY"].lower() == "true" and \ + if member.properties is not None and cartridgeagentconstants.PRIMARY in member.properties and \ + member.properties[cartridgeagentconstants.PRIMARY].lower() == "true" and \ (member.status == MemberStatus.Starting or member.status == MemberStatus.Activated): self.log.debug("Added worker member " + member.member_id) @@ -713,8 +712,8 @@ class DefaultExtensionHandler(AbstractExtensionHandler): return min_manager_instances_available and min_worker_instances_available def get_min_instance_count_from_member(self, member): - if "MIN_COUNT" in member.properties: - return int(member.properties["MIN_COUNT"]) + if cartridgeagentconstants.MIN_COUNT in member.properties: + return int(member.properties[cartridgeagentconstants.MIN_COUNT]) return 1 @@ -726,11 +725,21 @@ class DefaultExtensionHandler(AbstractExtensionHandler): return tenant.tenant_domain def wait_for_wk_members(self, env_params): - min_count = int(CartridgeAgentConfiguration.min_count) + min_count = int(self.cartridge_agent_config.min_count) is_wk_member_group_ready = False while not is_wk_member_group_ready: self.log.info("Waiting for %r well known members..." 
% min_count) time.sleep(5) - is_wk_member_group_ready = self.is_wk_member_group_ready(env_params, min_count) \ No newline at end of file + is_wk_member_group_ready = self.is_wk_member_group_ready(env_params, min_count) + +from ..artifactmgt.git import agentgithandler +from ..artifactmgt.repositoryinformation import RepositoryInformation +from ..config.cartridgeagentconfiguration import CartridgeAgentConfiguration +from ..util import extensionutils +from ..publisher import cartridgeagentpublisher +from ..exception.parameternotfoundexception import ParameterNotFoundException +from ..topology.topologycontext import * +from ..tenant.tenantcontext import * +from ..util.log import LogFactory \ No newline at end of file
