PCA - Refactored cartridge agent configuration class to be static and dropped singleton pattern.
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bfec88d4 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bfec88d4 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bfec88d4 Branch: refs/heads/master Commit: bfec88d4969caa62e2c098c886014b706cdee0b1 Parents: 81d5d34 Author: Chamila de Alwis <[email protected]> Authored: Fri May 1 03:17:07 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Thu Jul 30 00:17:12 2015 -0400 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 21 +- .../cartridge.agent/cartridge.agent/config.py | 652 +++++++++---------- .../cartridge.agent/constants.py | 12 +- .../cartridge.agent/healthstats.py | 42 +- .../cartridge.agent/mdsclient.py | 15 +- .../modules/datapublisher/logpublisher.py | 54 +- .../modules/event/eventhandler.py | 76 +-- .../publisher/cartridgeagentpublisher.py | 105 +-- .../modules/util/cartridgeagentutils.py | 13 +- 9 files changed, 482 insertions(+), 508 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index 806f6c9..0c66666 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -28,7 +28,7 @@ from modules.event.domain.mapping.events import * from modules.tenant.tenantcontext import * from modules.topology.topologycontext import * from modules.datapublisher.logpublisher import * -from config import CartridgeAgentConfiguration +from config import Config from modules.event.eventhandler import EventHandler import constants @@ -42,10 +42,9 @@ class CartridgeAgent(threading.Thread): self.__log_publish_manager = None self.__terminated = False self.__log = LogFactory().get_log(__name__) - self.__config = CartridgeAgentConfiguration() - mb_ip = self.__config.read_property(constants.MB_IP) - mb_port = self.__config.read_property(constants.MB_PORT) + mb_ip = Config.read_property(constants.MB_IP) + mb_port = Config.read_property(constants.MB_PORT) self.__inst_topic_subscriber = EventSubscriber(constants.INSTANCE_NOTIFIER_TOPIC, mb_ip, mb_port) self.__tenant_topic_subscriber = EventSubscriber(constants.TENANT_TOPIC, mb_ip, mb_port) @@ -69,7 +68,7 @@ class CartridgeAgent(threading.Thread): self.wait_for_complete_topology() # wait for member initialized event - while not self.__config.initialized: + while not Config.initialized: self.__log.debug("Waiting for cartridge agent to be initialized...") time.sleep(1) @@ -95,7 +94,7 @@ class CartridgeAgent(threading.Thread): self.__log.exception("Error processing start servers event: %s" % e) # check if artifact management is required before publishing instance activated event - repo_url = self.__config.repo_url + repo_url = Config.repo_url if repo_url is None or str(repo_url).strip() == "": self.__log.info("No artifact repository found") self.__event_handler.on_instance_activated_event() @@ -105,13 +104,13 @@ class CartridgeAgent(threading.Thread): "Artifact repository found, waiting for artifact updated event to checkout artifacts: [repo_url] %s", repo_url) - persistence_mapping_payload = self.__config.persistence_mappings + persistence_mapping_payload = Config.persistence_mappings if persistence_mapping_payload is not None: self.__event_handler.volume_mount_extension(persistence_mapping_payload) # start log publishing thread if DataPublisherConfiguration.get_instance().enabled: - log_file_paths = self.__config.log_file_paths + log_file_paths = Config.log_file_paths if log_file_paths is None: self.__log.exception("No valid log file paths found, no logs will be published") else: @@ -199,7 +198,7 @@ class CartridgeAgent(threading.Thread): self.__event_handler.on_artifact_updated_event(event_obj) def on_instance_cleanup_member(self, msg): - member_in_payload = self.__config.member_id + member_in_payload = Config.member_id event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload) member_in_event = event_obj.member_id if member_in_payload == member_in_event: @@ -207,9 +206,9 @@ class CartridgeAgent(threading.Thread): def on_instance_cleanup_cluster(self, msg): event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload) - cluster_in_payload = self.__config.cluster_id + cluster_in_payload = Config.cluster_id cluster_in_event = event_obj.cluster_id - instance_in_payload = self.__config.cluster_instance_id + instance_in_payload = Config.cluster_instance_id instance_in_event = event_obj.cluster_instance_id if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event: http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py index 9035b61..9f3ed63 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py @@ -23,342 +23,342 @@ from exception import ParameterNotFoundException import constants -class CartridgeAgentConfiguration: +class Config: """ Handles the configuration information of the particular Cartridge Agent """ - class __CartridgeAgentConfiguration: - def __init__(self): - # set log level - self.log = LogFactory().get_log(__name__) - - self.__payload_params = {} - self.__properties = None - """ :type : ConfigParser.SafeConfigParser """ - - self.__read_conf_file() - self.__read_parameter_file() - - self.application_id = None - """ :type : str """ - self.service_group = None - """ :type : str """ - self.is_clustered = False - """ :type : bool """ - self.service_name = None - """ :type : str """ - self.cluster_id = None - """ :type : str """ - self.cluster_instance_id = None - """ :type : str """ - self.member_id = None - """ :type : str """ - self.instance_id = None - """ :type : str """ - self.network_partition_id = None - """ :type : str """ - self.partition_id = None - """ :type : str """ - self.cartridge_key = None - """ :type : str """ - self.app_path = None - """ :type : str """ - self.repo_url = None - """ :type : str """ - self.ports = [] - """ :type : list[str] """ - self.log_file_paths = [] - """ :type : list[str] """ - self.is_multitenant = False - """ :type : bool """ - self.persistence_mappings = None - """ :type : str """ - self.is_commits_enabled = False - """ :type : bool """ - self.is_checkout_enabled = False - """ :type : bool """ - self.listen_address = None - """ :type : str """ - self.is_internal_repo = False - """ :type : bool """ - self.tenant_id = None - """ :type : str """ - self.lb_cluster_id = None - """ :type : str """ - self.min_count = None - """ :type : str """ - self.lb_private_ip = None - """ :type : str """ - self.lb_public_ip = None - """ :type : str """ - self.tenant_repository_path = None - """ :type : str """ - self.super_tenant_repository_path = None - """ :type : str """ - self.deployment = None - """ :type : str """ - self.manager_service_name = None - """ :type : str """ - self.worker_service_name = None - """ :type : str """ - self.dependant_cluster_id = None - """ :type : str """ - self.export_metadata_keys = None - """ :type : str """ - self.import_metadata_keys = None - """ :type : str """ - self.is_primary = False - """ :type : bool """ - self.artifact_update_interval = None - """ :type : str """ - self.lvs_virtual_ip = None - """ :type : str """ - - self.initialized = False - """ :type : bool """ - try: - self.service_group = self.__payload_params[constants.SERVICE_GROUP] \ - if constants.SERVICE_GROUP in self.__payload_params \ - else None - - if constants.CLUSTERING in self.__payload_params and \ - str(self.__payload_params[constants.CLUSTERING]).strip().lower() == "true": - self.is_clustered = True - else: - self.is_clustered = False - - self.application_id = self.read_property(constants.APPLICATION_ID) - self.service_name = self.read_property(constants.SERVICE_NAME) - self.cluster_id = self.read_property(constants.CLUSTER_ID) - self.cluster_instance_id = self.read_property(constants.CLUSTER_INSTANCE_ID, False) - self.member_id = self.read_property(constants.MEMBER_ID, False) - self.network_partition_id = self.read_property(constants.NETWORK_PARTITION_ID, False) - self.partition_id = self.read_property(constants.PARTITION_ID, False) - self.cartridge_key = self.read_property(constants.CARTRIDGE_KEY) - self.app_path = self.read_property(constants.APPLICATION_PATH, False) - self.repo_url = self.read_property(constants.REPO_URL, False) - self.ports = str(self.read_property(constants.PORTS)).split("|") - self.dependant_cluster_id = self.read_property(constants.DEPENDENCY_CLUSTER_IDS, False) - self.export_metadata_keys = self.read_property(constants.EXPORT_METADATA_KEYS, False) - self.import_metadata_keys = self.read_property(constants.IMPORT_METADATA_KEYS, False) - self.lvs_virtual_ip = self.read_property(constants.LVS_VIRTUAL_IP,False) - - try: - self.log_file_paths = str( - self.read_property(constants.LOG_FILE_PATHS)).strip().split("|") - except ParameterNotFoundException as ex: - self.log.debug("Cannot read log file path : %r" % ex.get_message()) - self.log_file_paths = None - - is_multi_str = self.read_property(constants.MULTITENANT) - self.is_multitenant = True if str(is_multi_str).lower().strip() == "true" else False - - try: - self.persistence_mappings = self.read_property( - constants.PERSISTENCE_MAPPING) - except ParameterNotFoundException as ex: - self.log.debug("Cannot read persistence mapping : %r" % ex.get_message()) - self.persistence_mappings = None - - try: - is_commit_str = self.read_property(constants.COMMIT_ENABLED) - self.is_commits_enabled = True if str(is_commit_str).lower().strip() == "true" else False - except ParameterNotFoundException: - try: - is_commit_str = self.read_property(constants.AUTO_COMMIT) - self.is_commits_enabled = True if str(is_commit_str).lower().strip() == "true" else False - except ParameterNotFoundException: - self.log.info( - "%r is not found and setting it to false" % constants.COMMIT_ENABLED) - self.is_commits_enabled = False - - auto_checkout_str = self.read_property(constants.AUTO_CHECKOUT, False) - self.is_checkout_enabled = True if str(auto_checkout_str).lower().strip() == "true" else False - - self.listen_address = self.read_property( - constants.LISTEN_ADDRESS, False) - - try: - int_repo_str = self.read_property(constants.INTERNAL) - self.is_internal_repo = True if str(int_repo_str).strip().lower() == "true" else False - except ParameterNotFoundException: - self.log.info(" INTERNAL payload parameter is not found") - self.is_internal_repo = False - - self.tenant_id = self.read_property(constants.TENANT_ID) - self.lb_cluster_id = self.read_property(constants.LB_CLUSTER_ID, False) - self.min_count = self.read_property(constants.MIN_INSTANCE_COUNT, False) - self.lb_private_ip = self.read_property(constants.LB_PRIVATE_IP, False) - self.lb_public_ip = self.read_property(constants.LB_PUBLIC_IP, False) - self.tenant_repository_path = self.read_property(constants.TENANT_REPO_PATH, False) - self.super_tenant_repository_path = self.read_property(constants.SUPER_TENANT_REPO_PATH, False) - - try: - self.deployment = self.read_property( - constants.DEPLOYMENT) - except ParameterNotFoundException: - self.deployment = None - - # Setting worker-manager setup - manager service name - if self.deployment is None: - self.manager_service_name = None - - if str(self.deployment).lower() == constants.DEPLOYMENT_MANAGER.lower(): - self.manager_service_name = self.service_name - - elif str(self.deployment).lower() == constants.DEPLOYMENT_WORKER.lower(): - self.deployment = self.read_property( - constants.MANAGER_SERVICE_TYPE) - - elif str(self.deployment).lower() == constants.DEPLOYMENT_DEFAULT.lower(): - self.deployment = None - else: - self.deployment = None - - # Setting worker-manager setup - worker service name - if self.deployment is None: - self.worker_service_name = None - - if str(self.deployment).lower() == constants.DEPLOYMENT_WORKER.lower(): - self.manager_service_name = self.service_name - - elif str(self.deployment).lower() == constants.DEPLOYMENT_MANAGER.lower(): - self.deployment = self.read_property( - constants.WORKER_SERVICE_TYPE) + # set log level + log = LogFactory().get_log(__name__) + + payload_params = {} + properties = None + """ :type : ConfigParser.SafeConfigParser """ + + application_id = None + """ :type : str """ + service_group = None + """ :type : str """ + is_clustered = False + """ :type : bool """ + service_name = None + """ :type : str """ + cluster_id = None + """ :type : str """ + cluster_instance_id = None + """ :type : str """ + member_id = None + """ :type : str """ + instance_id = None + """ :type : str """ + network_partition_id = None + """ :type : str """ + partition_id = None + """ :type : str """ + cartridge_key = None + """ :type : str """ + app_path = None + """ :type : str """ + repo_url = None + """ :type : str """ + ports = [] + """ :type : list[str] """ + log_file_paths = [] + """ :type : list[str] """ + is_multiTenant = False + """ :type : bool """ + persistence_mappings = None + """ :type : str """ + is_commits_enabled = False + """ :type : bool """ + is_checkout_enabled = False + """ :type : bool """ + listen_address = None + """ :type : str """ + is_internal_repo = False + """ :type : bool """ + tenant_id = None + """ :type : str """ + lb_cluster_id = None + """ :type : str """ + min_count = None + """ :type : str """ + lb_private_ip = None + """ :type : str """ + lb_public_ip = None + """ :type : str """ + tenant_repository_path = None + """ :type : str """ + super_tenant_repository_path = None + """ :type : str """ + deployment = None + """ :type : str """ + manager_service_name = None + """ :type : str """ + worker_service_name = None + """ :type : str """ + dependant_cluster_id = None + """ :type : str """ + export_metadata_keys = None + """ :type : str """ + import_metadata_keys = None + """ :type : str """ + is_primary = False + """ :type : bool """ + artifact_update_interval = None + """ :type : str """ + lvs_virtual_ip = None + """ :type : str """ + + initialized = False + """ :type : bool """ + + @staticmethod + def read_conf_file(): + """ + Reads and stores the agent's configuration file + :return: properties object + :rtype: ConfigParser.SafeConfigParser() + """ + + conf_file_path = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/agent.conf" + Config.log.debug("Config file path : %r" % conf_file_path) + + properties = ConfigParser.SafeConfigParser() + properties.read(conf_file_path) + + # set calculated values + param_file = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/payload/launch-params" + properties.set("agent", constants.PARAM_FILE_PATH, param_file) + plugins_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/plugins" + properties.set("agent", constants.PLUGINS_DIR, plugins_dir) + plugins_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/extensions/py" + properties.set("agent", constants.EXTENSIONS_DIR, plugins_dir) + + return properties + + @staticmethod + def read_payload_file(param_file_path): + """ + Reads the payload file of the cartridge and stores the values in a dictionary + :return: Payload parameter dictionary of values + :rtype: dict + """ + Config.log.debug("Param file path : %r" % param_file_path) + + try: + payload_params = {} + if param_file_path is not None: + param_file = open(param_file_path) + payload_content = param_file.read() + for param in payload_content.split(","): + if param.strip() != "": + param_value = param.strip().split("=") + try: + if str(param_value[1]).strip().lower() == "null" or str(param_value[1]).strip() == "": + payload_params[param_value[0]] = None + else: + payload_params[param_value[0]] = param_value[1] + except IndexError: + # If an index error comes when reading values, keep on reading + pass + + param_file.close() + return payload_params + else: + raise RuntimeError("Payload parameter file not found: %r" % param_file_path) + except Exception as e: + Config.log.exception("Could not read payload parameter file: %s" % e) + + @staticmethod + def convert_to_type(value_string): + """ + Determine what type of data to return from the provided string + :param value_string: + :return: + """ + if value_string is None: + return None + + value_string = str(value_string).strip() + + if value_string == "" or value_string.lower() == "null": + # converted as a null value + return None + + if value_string.lower() == "true": + # boolean TRUE + return True + + if value_string.lower() == "false": + # boolean FALSE + return False + # + # value_split = value_string.split("|") + # if len(value_split) > 1: + # # can be split using the delimiter, array returned + # return value_split + + return value_string + + @staticmethod + def read_property(property_key, critical=True): + """ + Returns the value of the provided property + :param str property_key: the name of the property to be read + :return: Value of the property + :exception: ParameterNotFoundException if the provided property cannot be found + """ + + if Config.properties is None or Config.payload_params == {}: + Config.initialize_config() + + if Config.properties.has_option("agent", property_key): + temp_str = Config.properties.get("agent", property_key) + Config.log.debug("Reading property: %s = %s", property_key, temp_str) + real_value = Config.convert_to_type(temp_str) + if real_value is not None: + return real_value + + if property_key in Config.payload_params: + temp_str = Config.payload_params[property_key] + Config.log.debug("Reading payload parameter: %s = %s", property_key, temp_str) + real_value = Config.convert_to_type(temp_str) + if real_value is not None: + return real_value + + # real value is None + if critical: + raise ParameterNotFoundException("Cannot find the value of required parameter: %r" % property_key) + else: + return None + + @staticmethod + def get_payload_params(): + return Config.payload_params + + @staticmethod + def initialize_config(): + """ + Read the two inputs and load values to fields + :return: void + """ + Config.properties = Config.read_conf_file() + param_file_path = Config.properties.get("agent", constants.PARAM_FILE_PATH) + Config.payload_params = Config.read_payload_file(param_file_path) + + try: + Config.application_id = Config.read_property(constants.APPLICATION_ID) + Config.service_name = Config.read_property(constants.SERVICE_NAME) + Config.cluster_id = Config.read_property(constants.CLUSTER_ID) + Config.ports = Config.read_property(constants.PORTS).split("|") + Config.is_multiTenant = Config.read_property(constants.MULTITENANT) + Config.tenant_id = Config.read_property(constants.TENANT_ID) - elif str(self.deployment).lower() == constants.DEPLOYMENT_DEFAULT.lower(): - self.deployment = None - else: - self.deployment = None + try: + Config.is_clustered = Config.read_property(constants.CLUSTERING) + except ParameterNotFoundException: + Config.is_clustered = False + try: + Config.is_commits_enabled = Config.read_property(constants.COMMIT_ENABLED) + except ParameterNotFoundException: try: - self.is_primary = self.read_property( - constants.CLUSTERING_PRIMARY_KEY) + Config.is_commits_enabled = Config.read_property(constants.AUTO_COMMIT) except ParameterNotFoundException: - self.is_primary = None + Config.is_commits_enabled = False - try: - self.artifact_update_interval = self.read_property(constants.ARTIFACT_UPDATE_INTERVAL) - except ParameterNotFoundException: - self.artifact_update_interval = "10" - - except ParameterNotFoundException as ex: - raise RuntimeError(ex) - - self.log.info("Cartridge agent configuration initialized") - - self.log.debug("service-name: %r" % self.service_name) - self.log.debug("cluster-id: %r" % self.cluster_id) - self.log.debug("cluster-instance-id: %r" % self.cluster_instance_id) - self.log.debug("member-id: %r" % self.member_id) - self.log.debug("network-partition-id: %r" % self.network_partition_id) - self.log.debug("partition-id: %r" % self.partition_id) - self.log.debug("cartridge-key: %r" % self.cartridge_key) - self.log.debug("app-path: %r" % self.app_path) - self.log.debug("repo-url: %r" % self.repo_url) - self.log.debug("ports: %r" % str(self.ports)) - self.log.debug("lb-private-ip: %r" % self.lb_private_ip) - self.log.debug("lb-public-ip: %r" % self.lb_public_ip) - self.log.debug("dependant_cluster_id: %r" % self.dependant_cluster_id) - self.log.debug("export_metadata_keys: %r" % self.export_metadata_keys) - self.log.debug("import_metadata_keys: %r" % self.import_metadata_keys) - self.log.debug("artifact.update.interval: %r" % self.artifact_update_interval) - self.log.debug("lvs-virtual-ip: %r" % self.lvs_virtual_ip) - - def __read_conf_file(self): - """ - Reads and stores the agent's configuration file - :return: void - """ - - conf_file_path = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/agent.conf" - self.log.debug("Config file path : %r" % conf_file_path) - self.__properties = ConfigParser.SafeConfigParser() - self.__properties.read(conf_file_path) - - # set calculated values - param_file = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/payload/launch-params" - self.__properties.set("agent", constants.PARAM_FILE_PATH, param_file) - - plugins_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/plugins" - self.__properties.set("agent", constants.PLUGINS_DIR, plugins_dir) - - plugins_dir = os.path.abspath(os.path.dirname(__file__)).split("modules")[0] + "/extensions/py" - self.__properties.set("agent", constants.EXTENSIONS_DIR, plugins_dir) - - def __read_parameter_file(self): - """ - Reads the payload file of the cartridge and stores the values in a dictionary - :return: void - """ - - param_file = self.read_property(constants.PARAM_FILE_PATH, False) - self.log.debug("Param file path : %r" % param_file) + try: + Config.is_internal_repo = Config.read_property(constants.INTERNAL) + except ParameterNotFoundException: + Config.is_internal_repo = False try: - if param_file is not None: - metadata_file = open(param_file) - metadata_payload_content = metadata_file.read() - for param in metadata_payload_content.split(","): - if param.strip() != "": - param_value = param.strip().split("=") - try: - if str(param_value[1]).strip().lower() == "null" or str(param_value[1]).strip() == "": - self.__payload_params[param_value[0]] = None - else: - self.__payload_params[param_value[0]] = param_value[1] - except IndexError: - # If an index error comes when reading values, keep on reading - pass - - # self.payload_params = dict( - # param.split("=") for param in metadata_payload_content.split(",")) - metadata_file.close() - else: - self.log.error("File not found: %r" % param_file) - except Exception as e: - self.log.exception( - "Could not read launch parameter file: %s" % e) - - def read_property(self, property_key, critical=True): - """ - Returns the value of the provided property - :param str property_key: the name of the property to be read - :return: Value of the property, - :rtype: str - :exception: ParameterNotFoundException if the provided property cannot be found - """ - - if self.__properties.has_option("agent", property_key): - temp_str = self.__properties.get("agent", property_key) - self.log.debug("Reading property: %s = %s", property_key, temp_str) - if temp_str is not None and temp_str.strip() != "" and temp_str.strip().lower() != "null": - return str(temp_str).strip() - - if property_key in self.__payload_params: - temp_str = self.__payload_params[property_key] - self.log.debug("Reading payload parameter: %s = %s", property_key, temp_str) - if temp_str is not None and temp_str != "" and temp_str.strip().lower() != "null": - return str(temp_str).strip() - - if critical: - raise ParameterNotFoundException("Cannot find the value of required parameter: %r" % property_key) + Config.artifact_update_interval = Config.read_property(constants.ARTIFACT_UPDATE_INTERVAL) + except ParameterNotFoundException: + Config.artifact_update_interval = 10 + + Config.service_group = Config.read_property(constants.SERVICE_GROUP, False) + Config.cluster_instance_id = Config.read_property(constants.CLUSTER_INSTANCE_ID, False) + Config.member_id = Config.read_property(constants.MEMBER_ID, False) + Config.network_partition_id = Config.read_property(constants.NETWORK_PARTITION_ID, False) + Config.partition_id = Config.read_property(constants.PARTITION_ID, False) + Config.app_path = Config.read_property(constants.APPLICATION_PATH, False) + Config.repo_url = Config.read_property(constants.REPO_URL, False) + + if Config.repo_url is not None: + Config.cartridge_key = Config.read_property(constants.CARTRIDGE_KEY) else: - return None - - def get_payload_params(self): - return self.__payload_params + Config.cartridge_key = Config.read_property(constants.CARTRIDGE_KEY, False) - __instance = None - """ :type : __CartridgeAgentConfiguration""" - - def __init__(self): - if not CartridgeAgentConfiguration.__instance: - CartridgeAgentConfiguration.__instance = CartridgeAgentConfiguration.__CartridgeAgentConfiguration() - - def __getattr__(self, name): - return getattr(self.__instance, name) - - def __setattr__(self, name, value): - return setattr(self.__instance, name, value) + Config.dependant_cluster_id = Config.read_property(constants.DEPENDENCY_CLUSTER_IDS, False) + Config.export_metadata_keys = Config.read_property(constants.EXPORT_METADATA_KEYS, False) + Config.import_metadata_keys = Config.read_property(constants.IMPORT_METADATA_KEYS, False) + Config.lvs_virtual_ip = Config.read_property(constants.LVS_VIRTUAL_IP, False) + try: + Config.log_file_paths = Config.read_property(constants.LOG_FILE_PATHS).split("|") + except ParameterNotFoundException: + Config.log_file_paths = None + + Config.persistence_mappings = Config.read_property(constants.PERSISTENCE_MAPPING, False) + + Config.is_checkout_enabled = Config.read_property(constants.AUTO_CHECKOUT, False) + Config.listen_address = Config.read_property(constants.LISTEN_ADDRESS, False) + Config.lb_cluster_id = Config.read_property(constants.LB_CLUSTER_ID, False) + Config.min_count = Config.read_property(constants.MIN_INSTANCE_COUNT, False) + Config.lb_private_ip = Config.read_property(constants.LB_PRIVATE_IP, False) + Config.lb_public_ip = Config.read_property(constants.LB_PUBLIC_IP, False) + Config.tenant_repository_path = Config.read_property(constants.TENANT_REPO_PATH, False) + Config.super_tenant_repository_path = Config.read_property(constants.SUPER_TENANT_REPO_PATH, False) + # Config.deployment = Config.read_property(constants.DEPLOYMENT, False) + # + # # Setting worker-manager setup - manager service name + # if Config.deployment is None: + # Config.manager_service_name = None + # + # if str(Config.deployment).lower() == constants.DEPLOYMENT_MANAGER.lower(): + # Config.manager_service_name = Config.service_name + # elif str(Config.deployment).lower() == constants.DEPLOYMENT_WORKER.lower(): + # Config.deployment = Config.read_property(constants.MANAGER_SERVICE_TYPE) + # elif str(Config.deployment).lower() == constants.DEPLOYMENT_DEFAULT.lower(): + # Config.deployment = None + # else: + # Config.deployment = None + # + # # Setting worker-manager setup - worker service name + # if Config.deployment is None: + # Config.worker_service_name = None + # + # if str(Config.deployment).lower() == constants.DEPLOYMENT_WORKER.lower(): + # Config.manager_service_name = Config.service_name + # elif str(Config.deployment).lower() == constants.DEPLOYMENT_MANAGER.lower(): + # Config.deployment = Config.read_property(constants.WORKER_SERVICE_TYPE) + # elif str(Config.deployment).lower() == constants.DEPLOYMENT_DEFAULT.lower(): + # Config.deployment = None + # else: + # Config.deployment = None + + Config.is_primary = Config.read_property(constants.CLUSTERING_PRIMARY_KEY, False) + + except ParameterNotFoundException as ex: + raise RuntimeError(ex) + + Config.log.info("Cartridge agent configuration initialized") + Config.log.debug("service-name: %r" % Config.service_name) + Config.log.debug("cluster-id: %r" % Config.cluster_id) + Config.log.debug("cluster-instance-id: %r" % Config.cluster_instance_id) + Config.log.debug("member-id: %r" % Config.member_id) + Config.log.debug("network-partition-id: %r" % Config.network_partition_id) + Config.log.debug("partition-id: %r" % Config.partition_id) + Config.log.debug("cartridge-key: %r" % Config.cartridge_key) + Config.log.debug("app-path: %r" % Config.app_path) + Config.log.debug("repo-url: %r" % Config.repo_url) + Config.log.debug("ports: %r" % str(Config.ports)) + Config.log.debug("lb-private-ip: %r" % Config.lb_private_ip) + Config.log.debug("lb-public-ip: %r" % Config.lb_public_ip) + Config.log.debug("dependant_cluster_id: %r" % Config.dependant_cluster_id) + Config.log.debug("export_metadata_keys: %r" % Config.export_metadata_keys) + Config.log.debug("import_metadata_keys: %r" % Config.import_metadata_keys) + Config.log.debug("artifact.update.interval: %r" % Config.artifact_update_interval) + Config.log.debug("lvs-virtual-ip: %r" % Config.lvs_virtual_ip) + Config.log.debug("log_file_paths: %s" % Config.log_file_paths) http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py index e6ae9c4..87b2e49 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py @@ -37,9 +37,9 @@ PARTITION_ID = "PARTITION_ID" TENANT_ID = "TENANT_ID" REPO_URL = "REPO_URL" PORTS = "PORTS" -DEPLOYMENT = "DEPLOYMENT" -MANAGER_SERVICE_TYPE = "MANAGER_SERVICE_TYPE" -WORKER_SERVICE_TYPE = "WORKER_SERVICE_TYPE" +# DEPLOYMENT = "DEPLOYMENT" +# MANAGER_SERVICE_TYPE = "MANAGER_SERVICE_TYPE" +# WORKER_SERVICE_TYPE = "WORKER_SERVICE_TYPE" PERSISTENCE_MAPPING = "PERSISTENCE_MAPPING" DEPENDENCY_CLUSTER_IDS = "DEPENDENCY_CLUSTER_IDS" EXPORT_METADATA_KEYS = "EXPORT_METADATA_KEYS" @@ -74,9 +74,9 @@ CLUSTERING_PRIMARY_KEY = "PRIMARY" SUPERTENANT_TEMP_PATH = "/tmp/-1234/" -DEPLOYMENT_MANAGER = "manager" -DEPLOYMENT_WORKER = "worker" -DEPLOYMENT_DEFAULT = "default" +# DEPLOYMENT_MANAGER = "manager" +# DEPLOYMENT_WORKER = "worker" +# DEPLOYMENT_DEFAULT = "default" SUPER_TENANT_REPO_PATH = "super.tenant.repository.path" TENANT_REPO_PATH = "tenant.repository.path" http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py index 815ec01..6b0e9e7 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -21,7 +21,7 @@ import multiprocessing import psutil from modules.databridge.agent import * -from config import CartridgeAgentConfiguration +from config import Config from modules.util import cartridgeagentutils from exception import ThriftReceiverOfflineException, CEPPublisherException import constants @@ -82,17 +82,15 @@ class HealthStatisticsPublisher: """ log = LogFactory().get_log(__name__) - def read_config(self, conf_key): + @staticmethod + def read_config(conf_key): """ Read a given key from the cartridge agent configuration :param conf_key: The key to look for in the CA config :return: The value for the key from the CA config :raise: RuntimeError if the given key is not found in the CA config """ - if self.cartridge_agent_config is None: - self.cartridge_agent_config = CartridgeAgentConfiguration() - - conf_value = self.cartridge_agent_config.read_property(conf_key, False) + conf_value = Config.read_property(conf_key, False) if conf_value is None or conf_value.strip() == "": raise RuntimeError("System property not found: " + conf_key) @@ -100,18 +98,16 @@ class HealthStatisticsPublisher: return conf_value def __init__(self): - self.cartridge_agent_config = CartridgeAgentConfiguration() - self.ports = [] - cep_port = self.read_config(constants.CEP_RECEIVER_PORT) + cep_port = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_PORT) self.ports.append(cep_port) - cep_ip = self.read_config(constants.CEP_RECEIVER_IP) + cep_ip = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_IP) cartridgeagentutils.wait_until_ports_active( cep_ip, self.ports, - int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) + int(Config.read_property("port.check.timeout", critical=False))) cep_active = cartridgeagentutils.check_ports_active( cep_ip, @@ -120,8 +116,8 @@ class HealthStatisticsPublisher: if not cep_active: raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") - cep_admin_username = self.read_config(constants.CEP_SERVER_ADMIN_USERNAME) - cep_admin_password = self.read_config(constants.CEP_SERVER_ADMIN_PASSWORD) + cep_admin_username = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME) + cep_admin_password = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD) self.stream_definition = HealthStatisticsPublisher.create_stream_definition() HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) @@ -164,11 +160,11 @@ class HealthStatisticsPublisher: """ event = ThriftEvent() - event.payloadData.append(self.cartridge_agent_config.cluster_id) - event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) - event.payloadData.append(self.cartridge_agent_config.network_partition_id) - event.payloadData.append(self.cartridge_agent_config.member_id) - event.payloadData.append(self.cartridge_agent_config.partition_id) + event.payloadData.append(Config.cluster_id) + event.payloadData.append(Config.cluster_instance_id) + event.payloadData.append(Config.network_partition_id) + event.payloadData.append(Config.member_id) + event.payloadData.append(Config.partition_id) event.payloadData.append(constants.MEMORY_CONSUMPTION) event.payloadData.append(float(memory_usage)) # event.payloadData.append(str(memory_usage)) @@ -188,11 +184,11 @@ class HealthStatisticsPublisher: """ event = ThriftEvent() - event.payloadData.append(self.cartridge_agent_config.cluster_id) - event.payloadData.append(self.cartridge_agent_config.cluster_instance_id) - event.payloadData.append(self.cartridge_agent_config.network_partition_id) - event.payloadData.append(self.cartridge_agent_config.member_id) - event.payloadData.append(self.cartridge_agent_config.partition_id) + event.payloadData.append(Config.cluster_id) + event.payloadData.append(Config.cluster_instance_id) + event.payloadData.append(Config.network_partition_id) + event.payloadData.append(Config.member_id) + event.payloadData.append(Config.partition_id) event.payloadData.append(constants.LOAD_AVERAGE) event.payloadData.append(float(load_avg)) # event.payloadData.append(str(load_avg)) http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/mdsclient.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/mdsclient.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/mdsclient.py index fb36e97..ef9e7d2 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/mdsclient.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/mdsclient.py @@ -19,16 +19,16 @@ import urllib2, urllib from urllib2 import URLError, HTTPError import json from modules.util.log import LogFactory -from config import CartridgeAgentConfiguration +from config import Config import constants log = LogFactory().get_log(__name__) -config = CartridgeAgentConfiguration() -mds_url = config.read_property(constants.METADATA_SERVICE_URL) -alias = config.read_property(constants.CARTRIDGE_ALIAS) -app_id = config.read_property(constants.APPLICATION_ID) -token = config.read_property(constants.TOKEN) + +mds_url = Config.read_property(constants.METADATA_SERVICE_URL) +alias = Config.read_property(constants.CARTRIDGE_ALIAS) +app_id = Config.read_property(constants.APPLICATION_ID) +token = Config.read_property(constants.TOKEN) alias_resource_url = mds_url + "/metadata/api/applications/" + app_id + "/clusters/" + alias + "/properties" app_resource_url = mds_url + "/metadata/api/applications/" + app_id + "/properties" @@ -51,7 +51,8 @@ def put(put_req, app=False): put_request.add_header('Content-Type', 'application/json') try: - log.debug("Publishing metadata to Metadata service. [URL] %s, [DATA] %s" % (put_request.get_full_url(), request_data)) + log.debug("Publishing metadata to Metadata service. [URL] %s, [DATA] %s" % + (put_request.get_full_url(), request_data)) handler = urllib2.urlopen(put_request, request_data) log.debug("Metadata service response: %s" % handler.getcode()) http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py index 10c398f..e38ae16 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/datapublisher/logpublisher.py @@ -19,7 +19,7 @@ import datetime from threading import Thread, current_thread from ..databridge.agent import * -from config import CartridgeAgentConfiguration +from config import Config from ..util import cartridgeagentutils from exception import DataPublisherException import constants @@ -29,9 +29,7 @@ class LogPublisher(Thread): def __init__(self, file_path, stream_definition, tenant_id, alias, date_time, member_id): Thread.__init__(self) - self.log = LogFactory().get_log(__name__) - self.file_path = file_path self.thrift_publisher = ThriftPublisher( DataPublisherConfiguration.get_instance().monitoring_server_ip, @@ -109,8 +107,7 @@ class LogPublisherManager(Thread): """ # stream definition stream_definition = StreamDefinition() - stream_name = "logs." + tenant_id + "." \ - + alias + "." + date_time + stream_name = "logs." + tenant_id + "." + alias + "." + date_time stream_version = "1.0.0" stream_nickname = "log entries from instance" stream_description = "Apache Stratos Instance Log Publisher" @@ -144,7 +141,7 @@ class LogPublisherManager(Thread): self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_port) self.ports.append(DataPublisherConfiguration.get_instance().monitoring_server_secure_port) - self.cartridge_agent_config = CartridgeAgentConfiguration() + self.cartridge_agent_config = Config self.log.debug("Checking if Monitoring server is active.") ports_active = cartridgeagentutils.wait_until_ports_active( @@ -158,8 +155,8 @@ class LogPublisherManager(Thread): self.log.debug("Monitoring server is up and running. Log Publisher Manager started.") - self.tenant_id = LogPublisherManager.get_valid_tenant_id(CartridgeAgentConfiguration().tenant_id) - self.alias = LogPublisherManager.get_alias(CartridgeAgentConfiguration().cluster_id) + self.tenant_id = LogPublisherManager.get_valid_tenant_id(Config.tenant_id) + self.alias = LogPublisherManager.get_alias(Config.cluster_id) self.date_time = LogPublisherManager.get_current_date() self.stream_definition = self.define_stream(self.tenant_id, self.alias, self.date_time) @@ -233,7 +230,7 @@ class LogPublisherManager(Thread): class DataPublisherConfiguration: """ A singleton implementation to access configuration information for data publishing to BAM/CEP - TODO: perfect singleton impl ex: Borg + TODO: get rid of this """ __instance = None @@ -258,52 +255,35 @@ class DataPublisherConfiguration: self.monitoring_server_secure_port = None self.admin_username = None self.admin_password = None - self.cartridge_agent_config = CartridgeAgentConfiguration() self.read_config() def read_config(self): - self.enabled = True if \ - self.cartridge_agent_config.read_property(constants.MONITORING_PUBLISHER_ENABLED, False).strip().lower() \ - == "true" \ - else False - + self.enabled = Config.read_property(constants.MONITORING_PUBLISHER_ENABLED, False) if not self.enabled: DataPublisherConfiguration.log.info("Data Publisher disabled") return DataPublisherConfiguration.log.info("Data Publisher enabled") - self.monitoring_server_ip = self.cartridge_agent_config.read_property(constants.MONITORING_RECEIVER_IP, False) - if self.monitoring_server_ip is None or self.monitoring_server_ip.strip() == "": + self.monitoring_server_ip = Config.read_property(constants.MONITORING_RECEIVER_IP, False) + if self.monitoring_server_ip is None: raise RuntimeError("System property not found: " + constants.MONITORING_RECEIVER_IP) - self.monitoring_server_port = self.cartridge_agent_config.read_property( - constants.MONITORING_RECEIVER_PORT, - False) - - if self.monitoring_server_port is None or self.monitoring_server_port.strip() == "": + self.monitoring_server_port = Config.read_property(constants.MONITORING_RECEIVER_PORT, False) + if self.monitoring_server_port is None: raise RuntimeError("System property not found: " + constants.MONITORING_RECEIVER_PORT) - self.monitoring_server_secure_port = self.cartridge_agent_config.read_property( - "monitoring.server.secure.port", - False) - - if self.monitoring_server_secure_port is None or self.monitoring_server_secure_port.strip() == "": + self.monitoring_server_secure_port = Config.read_property("monitoring.server.secure.port", False) + if self.monitoring_server_secure_port is None: raise RuntimeError("System property not found: monitoring.server.secure.port") - self.admin_username = self.cartridge_agent_config.read_property( - constants.MONITORING_SERVER_ADMIN_USERNAME, - False) - - if self.admin_username is None or self.admin_username.strip() == "": + self.admin_username = Config.read_property(constants.MONITORING_SERVER_ADMIN_USERNAME, False) + if self.admin_username is None: raise RuntimeError("System property not found: " + constants.MONITORING_SERVER_ADMIN_USERNAME) - self.admin_password = self.cartridge_agent_config.read_property( - constants.MONITORING_SERVER_ADMIN_PASSWORD, - False) - - if self.admin_password is None or self.admin_password.strip() == "": + self.admin_password = Config.read_property(constants.MONITORING_SERVER_ADMIN_PASSWORD, False) + if self.admin_password is None: raise RuntimeError("System property not found: " + constants.MONITORING_SERVER_ADMIN_PASSWORD) DataPublisherConfiguration.log.info("Data Publisher configuration initialized") http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index af66b81..cb1c245 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -23,7 +23,7 @@ from yapsy.PluginManager import PluginManager from plugins.contracts import ICartridgeAgentPlugin, IArtifactManagementPlugin, IHealthStatReaderPlugin from ..artifactmgt.git.agentgithandler import * from ..artifactmgt.repository import Repository -from config import CartridgeAgentConfiguration +from config import Config from ..publisher import cartridgeagentpublisher from ..topology.topologycontext import * from ..tenant.tenantcontext import * @@ -48,7 +48,6 @@ class EventHandler: def __init__(self): self.__log = LogFactory().get_log(__name__) - self.__config = CartridgeAgentConfiguration() self.__plugins = {} """ :type dict{str: [PluginInfo]} : """ self.__artifact_mgt_plugins = [] @@ -79,20 +78,20 @@ class EventHandler: artifacts_updated_event.status)) cluster_id_event = str(artifacts_updated_event.cluster_id).strip() - cluster_id_payload = self.__config.cluster_id + cluster_id_payload = Config.cluster_id repo_url = str(artifacts_updated_event.repo_url).strip() if (repo_url != "") and (cluster_id_payload is not None) and (cluster_id_payload == cluster_id_event): - local_repo_path = self.__config.app_path + local_repo_path = Config.app_path repo_password = None if artifacts_updated_event.repo_password is not None: - secret = self.__config.cartridge_key + secret = Config.cartridge_key repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret) repo_username = artifacts_updated_event.repo_username tenant_id = artifacts_updated_event.tenant_id - is_multitenant = self.__config.is_multitenant + is_multitenant = Config.is_multiTenant commit_enabled = artifacts_updated_event.commit_enabled self.__log.info("Executing git checkout") @@ -123,14 +122,13 @@ class EventHandler: # updated on pull self.on_artifact_update_scheduler_event(tenant_id) - update_artifacts = self.__config.read_property(constants.ENABLE_ARTIFACT_UPDATE, False) - update_artifacts = True if str(update_artifacts).strip().lower() == "true" else False + update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, False) if update_artifacts: - auto_commit = self.__config.is_commits_enabled - auto_checkout = self.__config.is_checkout_enabled + auto_commit = Config.is_commits_enabled + auto_checkout = Config.is_checkout_enabled try: - update_interval = int(self.__config.artifact_update_interval) + update_interval = int(Config.artifact_update_interval) except ValueError: self.__log.exception("Invalid artifact sync interval specified.") update_interval = 10 @@ -181,9 +179,9 @@ class EventHandler: def on_complete_topology_event(self, complete_topology_event): self.__log.debug("Processing Complete topology event...") - service_name_in_payload = self.__config.service_name - cluster_id_in_payload = self.__config.cluster_id - member_id_in_payload = self.__config.member_id + service_name_in_payload = Config.service_name + cluster_id_in_payload = Config.cluster_id + member_id_in_payload = Config.member_id member_initialized = self.check_member_state_in_topology( service_name_in_payload, @@ -193,7 +191,7 @@ class EventHandler: self.__log.debug("Member initialized %s", member_initialized) if member_initialized: # Set cartridge agent as initialized since member is available and it is in initialized state - self.__config.initialized = True + Config.initialized = True topology = complete_topology_event.get_topology() service = topology.get_service(service_name_in_payload) @@ -212,9 +210,9 @@ class EventHandler: """ self.__log.debug("Processing Member initialized event...") - service_name_in_payload = self.__config.service_name - cluster_id_in_payload = self.__config.cluster_id - member_id_in_payload = self.__config.member_id + service_name_in_payload = Config.service_name + cluster_id_in_payload = Config.cluster_id + member_id_in_payload = Config.member_id member_exists = self.member_exists_in_topology(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) @@ -222,7 +220,7 @@ class EventHandler: self.__log.debug("Member exists: %s" % member_exists) if member_exists: - self.__config.initialized = True + Config.initialized = True self.execute_event_extendables(constants.MEMBER_INITIALIZED_EVENT, {}) @@ -289,9 +287,9 @@ class EventHandler: def start_server_extension(self): self.__log.info("Processing start server extension...") - service_name_in_payload = self.__config.service_name - cluster_id_in_payload = self.__config.cluster_id - member_id_in_payload = self.__config.member_id + service_name_in_payload = Config.service_name + cluster_id_in_payload = Config.cluster_id + member_id_in_payload = Config.member_id member_initialized = self.check_member_state_in_topology(service_name_in_payload, cluster_id_in_payload, member_id_in_payload) @@ -384,7 +382,7 @@ class EventHandler: try: # TODO: change plugin descriptor ext, plugin_manager.setPluginInfoExtension(AGENT_PLUGIN_EXT) - plugins_dir = self.__config.read_property(constants.PLUGINS_DIR) + plugins_dir = Config.read_property(constants.PLUGINS_DIR) category_filter = {CARTRIDGE_AGENT_PLUGIN: ICartridgeAgentPlugin, ARTIFACT_MGT_PLUGIN: IArtifactManagementPlugin, HEALTH_STAT_PLUGIN: IHealthStatReaderPlugin} @@ -443,7 +441,7 @@ class EventHandler: self.__log.info("Collecting and loading extensions") try: - extensions_dir = self.__config.read_property(constants.EXTENSIONS_DIR) + extensions_dir = Config.read_property(constants.EXTENSIONS_DIR) category_filter = {CARTRIDGE_AGENT_PLUGIN: ICartridgeAgentPlugin} extension_manager = EventHandler.create_plugin_manager(category_filter, extensions_dir) @@ -492,7 +490,7 @@ class EventHandler: :return: """ try: - input_values = self.add_common_input_values(input_values) + input_values = EventHandler.add_common_input_values(input_values) input_values["EVENT"] = event except Exception as e: self.__log.error("Error while adding common input values for event extendables: %s" % e) @@ -556,7 +554,7 @@ class EventHandler: if is_multitenant: if tenant_id == SUPER_TENANT_ID: # super tenant, /repository/deploy/server/ - super_tenant_repo_path = self.__config.super_tenant_repository_path + super_tenant_repo_path = Config.super_tenant_repository_path # "app_path" repo_path += git_local_repo_path @@ -573,7 +571,7 @@ class EventHandler: else: # normal tenant, /repository/tenants/tenant_id - tenant_repo_path = self.__config.tenant_repository_path + tenant_repo_path = Config.tenant_repository_path # "app_path" repo_path += git_local_repo_path @@ -596,6 +594,7 @@ class EventHandler: return repo_path def check_member_state_in_topology(self, service_name, cluster_id, member_id): + # TODO: refactor topology = TopologyContext.get_topology() service = topology.get_service(service_name) if service is None: @@ -636,7 +635,8 @@ class EventHandler: return True - def add_common_input_values(self, plugin_values): + @staticmethod + def add_common_input_values(plugin_values): """ Adds the common parameters to be used by the extension scripts :param dict[str, str] plugin_values: Dictionary to be added @@ -648,28 +648,28 @@ class EventHandler: elif type(plugin_values) != dict: plugin_values = {"VALUE1": str(plugin_values)} - plugin_values["APPLICATION_PATH"] = self.__config.app_path - plugin_values["PARAM_FILE_PATH"] = self.__config.read_property(constants.PARAM_FILE_PATH, False) - plugin_values["PERSISTENCE_MAPPINGS"] = self.__config.persistence_mappings + plugin_values["APPLICATION_PATH"] = Config.app_path + plugin_values["PARAM_FILE_PATH"] = Config.read_property(constants.PARAM_FILE_PATH, False) + plugin_values["PERSISTENCE_MAPPINGS"] = Config.persistence_mappings - lb_cluster_id_in_payload = self.__config.lb_cluster_id + lb_cluster_id_in_payload = Config.lb_cluster_id lb_private_ip, lb_public_ip = EventHandler.get_lb_member_ip(lb_cluster_id_in_payload) - plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else self.__config.lb_private_ip - plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else self.__config.lb_public_ip + plugin_values["LB_IP"] = lb_private_ip if lb_private_ip is not None else Config.lb_private_ip + plugin_values["LB_PUBLIC_IP"] = lb_public_ip if lb_public_ip is not None else Config.lb_public_ip topology = TopologyContext.get_topology() if topology.initialized: - service = topology.get_service(self.__config.service_name) - cluster = service.get_cluster(self.__config.cluster_id) - member_id_in_payload = self.__config.member_id + service = topology.get_service(Config.service_name) + cluster = service.get_cluster(Config.cluster_id) + member_id_in_payload = Config.member_id member = cluster.get_member(member_id_in_payload) EventHandler.add_properties(service.properties, plugin_values, "SERVICE_PROPERTY") EventHandler.add_properties(cluster.properties, plugin_values, "CLUSTER_PROPERTY") EventHandler.add_properties(member.properties, plugin_values, "MEMBER_PROPERTY") - plugin_values.update(self.__config.get_payload_params()) + plugin_values.update(Config.get_payload_params()) return EventHandler.clean_process_parameters(plugin_values) http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py index f94ae1d..4891800 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/publisher/cartridgeagentpublisher.py @@ -22,7 +22,7 @@ from .. util.log import * from .. util import cartridgeagentutils import healthstats import constants -from config import CartridgeAgentConfiguration +from config import Config log = LogFactory().get_log(__name__) @@ -41,14 +41,14 @@ def publish_instance_started_event(): if not started: log.info("Publishing instance started event") - application_id = CartridgeAgentConfiguration().application_id - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - member_id = CartridgeAgentConfiguration().member_id - instance_id = CartridgeAgentConfiguration().instance_id - cluster_instance_id = CartridgeAgentConfiguration().cluster_instance_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - partition_id = CartridgeAgentConfiguration().partition_id + application_id = Config.application_id + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id instance_started_event = InstanceStartedEvent( application_id, @@ -72,27 +72,31 @@ def publish_instance_activated_event(health_stat_plugin): global activated, log if not activated: # Wait for all ports to be active - - listen_address = CartridgeAgentConfiguration().listen_address - configuration__ports = CartridgeAgentConfiguration().ports + listen_address = Config.listen_address + configuration_ports = Config.ports ports_active = cartridgeagentutils.wait_until_ports_active( listen_address, - configuration__ports, - int(CartridgeAgentConfiguration().read_property("port.check.timeout", critical=False)) - ) - log.info("Publishing instance activated event") + configuration_ports, + int(Config.read_property("port.check.timeout", critical=False))) if ports_active: - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - member_id = CartridgeAgentConfiguration().member_id - instance_id = CartridgeAgentConfiguration().instance_id - cluster_instance_id = CartridgeAgentConfiguration().cluster_instance_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - partition_id = CartridgeAgentConfiguration().partition_id - - instance_activated_event = InstanceActivatedEvent(service_name, cluster_id, cluster_instance_id, member_id, - instance_id, network_partition_id, partition_id) + log.info("Publishing instance activated event") + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id + + instance_activated_event = InstanceActivatedEvent( + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_ACTIVATED_EVENT) publisher.publish(instance_activated_event) @@ -100,12 +104,11 @@ def publish_instance_activated_event(health_stat_plugin): log.info("Instance activated event published") log.info("Starting health statistics notifier") - health_stat_publishing_enabled = True if CartridgeAgentConfiguration().read_property( - constants.CEP_PUBLISHER_ENABLED, False).strip().lower() == "true" else False + health_stat_publishing_enabled = Config.read_property(constants.CEP_PUBLISHER_ENABLED, False) if health_stat_publishing_enabled: interval_default = 15 # seconds - interval = CartridgeAgentConfiguration().read_property("stats.notifier.interval", False) + interval = Config.read_property("stats.notifier.interval", False) if interval is not None and len(interval) > 0: try: interval = int(interval) @@ -124,7 +127,7 @@ def publish_instance_activated_event(health_stat_plugin): log.info("Health statistics notifier started") else: log.error("Ports activation timed out. Aborting InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s" - % (listen_address, configuration__ports)) + % (listen_address, configuration_ports)) else: log.warn("Instance already activated") @@ -134,13 +137,13 @@ def publish_maintenance_mode_event(): if not maintenance: log.info("Publishing instance maintenance mode event") - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - member_id = CartridgeAgentConfiguration().member_id - instance_id = CartridgeAgentConfiguration().instance_id - cluster_instance_id = CartridgeAgentConfiguration().cluster_instance_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - partition_id = CartridgeAgentConfiguration().partition_id + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id instance_maintenance_mode_event = InstanceMaintenanceModeEvent( service_name, @@ -165,16 +168,22 @@ def publish_instance_ready_to_shutdown_event(): if not ready_to_shutdown: log.info("Publishing instance activated event") - service_name = CartridgeAgentConfiguration().service_name - cluster_id = CartridgeAgentConfiguration().cluster_id - member_id = CartridgeAgentConfiguration().member_id - instance_id = CartridgeAgentConfiguration().instance_id - cluster_instance_id = CartridgeAgentConfiguration().cluster_instance_id - network_partition_id = CartridgeAgentConfiguration().network_partition_id - partition_id = CartridgeAgentConfiguration().partition_id + service_name = Config.service_name + cluster_id = Config.cluster_id + member_id = Config.member_id + instance_id = Config.instance_id + cluster_instance_id = Config.cluster_instance_id + network_partition_id = Config.network_partition_id + partition_id = Config.partition_id - instance_shutdown_event = InstanceReadyToShutdownEvent(service_name, cluster_id, cluster_instance_id, member_id, - instance_id, network_partition_id, partition_id) + instance_shutdown_event = InstanceReadyToShutdownEvent( + service_name, + cluster_id, + cluster_instance_id, + member_id, + instance_id, + network_partition_id, + partition_id) publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_READY_TO_SHUTDOWN_EVENT) publisher.publish(instance_shutdown_event) @@ -200,7 +209,7 @@ class EventPublisher: self.__topic = topic def publish(self, event): - mb_ip = CartridgeAgentConfiguration().read_property(constants.MB_IP) - mb_port = CartridgeAgentConfiguration().read_property(constants.MB_PORT) + mb_ip = Config.read_property(constants.MB_IP) + mb_port = Config.read_property(constants.MB_PORT) payload = event.to_json() publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/bfec88d4/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py index 0f825bf..4103bbc 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py @@ -17,7 +17,6 @@ from Crypto.Cipher import AES import base64 -import os import time import socket import string @@ -114,14 +113,4 @@ def check_ports_active(ip_address, ports): log.debug("Port %r is not active" % port) return False - return True - - -def get_working_dir(): - """ - Returns the base directory of the cartridge agent. - :return: Base working dir path - :rtype : str - """ - #"/path/to/cartridgeagent/modules/util/".split("modules") returns ["/path/to/cartridgeagent/", "/util"] - return os.path.abspath(os.path.dirname(__file__)).split("modules")[0] \ No newline at end of file + return True \ No newline at end of file
