Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 64f05a519 -> a242b52be
PCA - Validate agent.conf entries when loading Config, refactor handler methods for mb events, fix util methods, fix exception initialization Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/a242b52b Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/a242b52b Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/a242b52b Branch: refs/heads/stratos-4.1.x Commit: a242b52be5452ba03a2234333c62810d3ea67cfb Parents: 64f05a5 Author: Chamila de Alwis <[email protected]> Authored: Tue Dec 1 17:35:25 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Tue Dec 1 17:35:34 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 144 +++++++++++-------- .../cartridge.agent/cartridge.agent/config.py | 83 ++++++++++- .../cartridge.agent/exception.py | 46 +++--- .../cartridge.agent/healthstats.py | 34 +---- .../cartridge.agent/logpublisher.py | 2 +- .../modules/artifactmgt/git/agentgithandler.py | 4 +- .../modules/event/application/signup/events.py | 3 - .../modules/event/instance/notifier/events.py | 2 +- .../modules/event/tenant/events.py | 2 +- .../modules/util/cartridgeagentutils.py | 75 ++++++---- .../cartridge.agent/publisher.py | 4 +- 11 files changed, 250 insertions(+), 149 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index 6b81dff..7f95f26 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -30,12 +30,10 @@ from subscriber import EventSubscriber class CartridgeAgent(object): def __init__(self): Config.initialize_config() - self.__tenant_context_initialized = False - self.__log_publish_manager = None self.__terminated = False self.__log = LogFactory().get_log(__name__) - mb_urls = Config.mb_urls + mb_urls = Config.mb_urls.split(",") mb_uname = Config.mb_username mb_pwd = Config.mb_password @@ -87,8 +85,8 @@ class CartridgeAgent(object): # Execute start servers extension try: event_handler.start_server_extension() - except Exception as e: - self.__log.exception("Error processing start servers event: %s" % e) + except Exception as ex: + self.__log.exception("Error processing start servers event: %s" % ex) # check if artifact management is required before publishing instance activated event repo_url = Config.repo_url @@ -107,21 +105,22 @@ class CartridgeAgent(object): event_handler.volume_mount_extension(persistence_mapping_payload) # start log publishing thread + log_publish_manager = None if DataPublisherConfiguration.get_instance().enabled: log_file_paths = Config.log_file_paths if log_file_paths is None: self.__log.exception("No valid log file paths found, no logs will be published") else: self.__log.debug("Starting Log Publisher Manager: [Log file paths] %s" % ", ".join(log_file_paths)) - self.__log_publish_manager = LogPublisherManager(log_file_paths) - self.__log_publish_manager.start() + log_publish_manager = LogPublisherManager(log_file_paths) + log_publish_manager.start() # run until terminated while not self.__terminated: time.sleep(5) if DataPublisherConfiguration.get_instance().enabled: - self.__log_publish_manager.terminate_all_publishers() + log_publish_manager.terminate_all_publishers() def terminate(self): """ @@ -134,9 +133,10 @@ class CartridgeAgent(object): def register_instance_topic_listeners(self): self.__log.debug("Starting instance notifier event message receiver thread") - self.__inst_topic_subscriber.register_handler("ArtifactUpdatedEvent", self.on_artifact_updated) - self.__inst_topic_subscriber.register_handler("InstanceCleanupMemberEvent", self.on_instance_cleanup_member) - self.__inst_topic_subscriber.register_handler("InstanceCleanupClusterEvent", self.on_instance_cleanup_cluster) + self.__inst_topic_subscriber.register_handler("ArtifactUpdatedEvent", Handlers.on_artifact_updated) + self.__inst_topic_subscriber.register_handler("InstanceCleanupMemberEvent", Handlers.on_instance_cleanup_member) + self.__inst_topic_subscriber.register_handler( + "InstanceCleanupClusterEvent", Handlers.on_instance_cleanup_cluster) self.__inst_topic_subscriber.start() self.__log.info("Instance notifier event message receiver thread started") @@ -148,13 +148,13 @@ class CartridgeAgent(object): def register_topology_event_listeners(self): self.__log.debug("Starting topology event message receiver thread") - self.__topology_event_subscriber.register_handler("MemberActivatedEvent", self.on_member_activated) - self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", self.on_member_terminated) - self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", self.on_member_suspended) - self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", self.on_complete_topology) - self.__topology_event_subscriber.register_handler("MemberStartedEvent", self.on_member_started) - self.__topology_event_subscriber.register_handler("MemberCreatedEvent", self.on_member_created) - self.__topology_event_subscriber.register_handler("MemberInitializedEvent", self.on_member_initialized) + self.__topology_event_subscriber.register_handler("MemberActivatedEvent", Handlers.on_member_activated) + self.__topology_event_subscriber.register_handler("MemberTerminatedEvent", Handlers.on_member_terminated) + self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", Handlers.on_member_suspended) + self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", Handlers.on_complete_topology) + self.__topology_event_subscriber.register_handler("MemberStartedEvent", Handlers.on_member_started) + self.__topology_event_subscriber.register_handler("MemberCreatedEvent", Handlers.on_member_created) + self.__topology_event_subscriber.register_handler("MemberInitializedEvent", Handlers.on_member_initialized) self.__topology_event_subscriber.start() self.__log.info("Cartridge agent topology receiver thread started") @@ -166,11 +166,11 @@ class CartridgeAgent(object): def register_tenant_event_listeners(self): self.__log.debug("Starting tenant event message receiver thread") self.__tenant_topic_subscriber.register_handler("DomainMappingAddedEvent", - self.on_domain_mapping_added) + Handlers.on_domain_mapping_added) self.__tenant_topic_subscriber.register_handler("DomainsMappingRemovedEvent", - self.on_domain_mapping_removed) - self.__tenant_topic_subscriber.register_handler("CompleteTenantEvent", self.on_complete_tenant) - self.__tenant_topic_subscriber.register_handler("TenantSubscribedEvent", self.on_tenant_subscribed) + Handlers.on_domain_mapping_removed) + self.__tenant_topic_subscriber.register_handler("CompleteTenantEvent", Handlers.on_complete_tenant) + self.__tenant_topic_subscriber.register_handler("TenantSubscribedEvent", Handlers.on_tenant_subscribed) self.__tenant_topic_subscriber.start() self.__log.info("Tenant event message receiver thread started") @@ -182,7 +182,7 @@ class CartridgeAgent(object): def register_application_signup_event_listeners(self): self.__log.debug("Starting application signup event message receiver thread") self.__app_topic_subscriber.register_handler("ApplicationSignUpRemovedEvent", - self.on_application_signup_removed) + Handlers.on_application_signup_removed) self.__app_topic_subscriber.start() self.__log.info("Application signup event message receiver thread started") @@ -191,18 +191,36 @@ class CartridgeAgent(object): while not self.__app_topic_subscriber.is_subscribed(): time.sleep(1) - def on_artifact_updated(self, msg): + def wait_for_complete_topology(self): + while not TopologyContext.topology.initialized: + self.__log.info("Waiting for complete topology event...") + time.sleep(5) + self.__log.info("Complete topology event received") + + +class Handlers(object): + """ + Handler methods for message broker events + """ + + __log = LogFactory().get_log(__name__) + __tenant_context_initialized = False + + @staticmethod + def on_artifact_updated(msg): event_obj = ArtifactUpdatedEvent.create_from_json(msg.payload) event_handler.on_artifact_updated_event(event_obj) - def on_instance_cleanup_member(self, msg): + @staticmethod + def on_instance_cleanup_member(msg): member_in_payload = Config.member_id event_obj = InstanceCleanupMemberEvent.create_from_json(msg.payload) member_in_event = event_obj.member_id if member_in_payload == member_in_event: event_handler.on_instance_cleanup_member_event() - def on_instance_cleanup_cluster(self, msg): + @staticmethod + def on_instance_cleanup_cluster(msg): event_obj = InstanceCleanupClusterEvent.create_from_json(msg.payload) cluster_in_payload = Config.cluster_id cluster_in_event = event_obj.cluster_id @@ -212,11 +230,13 @@ class CartridgeAgent(object): if cluster_in_event == cluster_in_payload and instance_in_payload == instance_in_event: event_handler.on_instance_cleanup_cluster_event() - def on_member_created(self, msg): - self.__log.debug("Member created event received: %r" % msg.payload) + @staticmethod + def on_member_created(msg): + Handlers.__log.debug("Member created event received: %r" % msg.payload) - def on_member_initialized(self, msg): - self.__log.debug("Member initialized event received: %r" % msg.payload) + @staticmethod + def on_member_initialized(msg): + Handlers.__log.debug("Member initialized event received: %r" % msg.payload) event_obj = MemberInitializedEvent.create_from_json(msg.payload) if not TopologyContext.topology.initialized: @@ -224,84 +244,88 @@ class CartridgeAgent(object): event_handler.on_member_initialized_event(event_obj) - def on_member_activated(self, msg): - self.__log.debug("Member activated event received: %r" % msg.payload) + @staticmethod + def on_member_activated(msg): + Handlers.__log.debug("Member activated event received: %r" % msg.payload) if not TopologyContext.topology.initialized: return event_obj = MemberActivatedEvent.create_from_json(msg.payload) event_handler.on_member_activated_event(event_obj) - def on_member_terminated(self, msg): - self.__log.debug("Member terminated event received: %r" % msg.payload) + @staticmethod + def on_member_terminated(msg): + Handlers.__log.debug("Member terminated event received: %r" % msg.payload) if not TopologyContext.topology.initialized: return event_obj = MemberTerminatedEvent.create_from_json(msg.payload) event_handler.on_member_terminated_event(event_obj) - def on_member_suspended(self, msg): - self.__log.debug("Member suspended event received: %r" % msg.payload) + @staticmethod + def on_member_suspended(msg): + Handlers.__log.debug("Member suspended event received: %r" % msg.payload) if not TopologyContext.topology.initialized: return event_obj = MemberSuspendedEvent.create_from_json(msg.payload) event_handler.on_member_suspended_event(event_obj) - def on_complete_topology(self, msg): + @staticmethod + def on_complete_topology(msg): event_obj = CompleteTopologyEvent.create_from_json(msg.payload) TopologyContext.update(event_obj.topology) if not TopologyContext.topology.initialized: - self.__log.info("Topology initialized from complete topology event") + Handlers.__log.info("Topology initialized from complete topology event") TopologyContext.topology.initialized = True event_handler.on_complete_topology_event(event_obj) - self.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str) + Handlers.__log.debug("Topology context updated with [topology] %r" % event_obj.topology.json_str) - def on_member_started(self, msg): - self.__log.debug("Member started event received: %r" % msg.payload) + @staticmethod + def on_member_started(msg): + Handlers.__log.debug("Member started event received: %r" % msg.payload) if not TopologyContext.topology.initialized: return event_obj = MemberStartedEvent.create_from_json(msg.payload) event_handler.on_member_started_event(event_obj) - def on_domain_mapping_added(self, msg): - self.__log.debug("Subscription domain added event received : %r" % msg.payload) + @staticmethod + def on_domain_mapping_added(msg): + Handlers.__log.debug("Subscription domain added event received : %r" % msg.payload) event_obj = DomainMappingAddedEvent.create_from_json(msg.payload) event_handler.on_domain_mapping_added_event(event_obj) - def on_domain_mapping_removed(self, msg): - self.__log.debug("Subscription domain removed event received : %r" % msg.payload) + @staticmethod + def on_domain_mapping_removed(msg): + Handlers.__log.debug("Subscription domain removed event received : %r" % msg.payload) event_obj = DomainMappingRemovedEvent.create_from_json(msg.payload) event_handler.on_domain_mapping_removed_event(event_obj) - def on_complete_tenant(self, msg): + @staticmethod + def on_complete_tenant(msg): event_obj = CompleteTenantEvent.create_from_json(msg.payload) TenantContext.update(event_obj.tenants) - if not self.__tenant_context_initialized: - self.__log.info("Tenant context initialized from complete tenant event") - self.__tenant_context_initialized = True + if not Handlers.__tenant_context_initialized: + Handlers.__log.info("Tenant context initialized from complete tenant event") + Handlers.__tenant_context_initialized = True event_handler.on_complete_tenant_event(event_obj) - self.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json) + Handlers.__log.debug("Tenant context updated with [tenant list] %r" % event_obj.tenant_list_json) - def on_tenant_subscribed(self, msg): - self.__log.debug("Tenant subscribed event received: %r" % msg.payload) + @staticmethod + def on_tenant_subscribed(msg): + Handlers.__log.debug("Tenant subscribed event received: %r" % msg.payload) event_obj = TenantSubscribedEvent.create_from_json(msg.payload) event_handler.on_tenant_subscribed_event(event_obj) - def on_application_signup_removed(self, msg): - self.__log.debug("Application signup removed event received: %r" % msg.payload) + @staticmethod + def on_application_signup_removed(msg): + Handlers.__log.debug("Application signup removed event received: %r" % msg.payload) event_obj = ApplicationSignUpRemovedEvent.create_from_json(msg.payload) event_handler.on_application_signup_removed_event(event_obj) - def wait_for_complete_topology(self): - while not TopologyContext.topology.initialized: - self.__log.info("Waiting for complete topology event...") - time.sleep(5) - self.__log.info("Complete topology event received") - if __name__ == "__main__": log = LogFactory().get_log(__name__) http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py index f1a70ec..72fc5e2 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py @@ -22,7 +22,7 @@ import sys from yapsy.PluginManager import PluginManager from modules.util.log import LogFactory -from exception import ParameterNotFoundException +from exception import ParameterNotFoundException, InvalidConfigValueException import constants from plugins.contracts import ICartridgeAgentPlugin, IArtifactCommitPlugin, IArtifactCheckoutPlugin, \ IHealthStatReaderPlugin @@ -148,6 +148,18 @@ class Config: """ :type : str """ mb_publisher_timeout = None """ :type : int """ + cep_username = None + """ :type : str """ + cep_password = None + """ :type : str """ + cep_urls = [] + """ :type : list """ + artifact_clone_retry_count = None + """ :type : str """ + artifact_clone_retry_interval = None + """ :type : str """ + port_check_timeout = None + """ :type : str """ @staticmethod def read_conf_file(): @@ -351,8 +363,29 @@ class Config: Config.mb_username = Config.read_property(constants.MB_USERNAME, False) Config.mb_password = Config.read_property(constants.MB_PASSWORD, False) - Config.mb_urls = Config.read_property(constants.MB_URLS).split(",") + Config.mb_urls = Config.read_property(constants.MB_URLS) Config.mb_publisher_timeout = int(Config.read_property(constants.MB_PUBLISHER_TIMEOUT)) + + Config.cep_username = Config.read_property(constants.CEP_SERVER_ADMIN_USERNAME) + Config.cep_password = Config.read_property(constants.CEP_SERVER_ADMIN_PASSWORD) + Config.cep_urls = Config.read_property(constants.CEP_RECEIVER_URLS) + + try: + Config.artifact_clone_retry_count = Config.read_property(constants.ARTIFACT_CLONE_RETRIES) + except ParameterNotFoundException: + Config.artifact_clone_retry_count = "5" + + try: + Config.artifact_clone_retry_interval = Config.read_property(constants.ARTIFACT_CLONE_INTERVAL) + except ParameterNotFoundException: + Config.artifact_clone_retry_interval = "10" + + try: + Config.port_check_timeout = Config.read_property(constants.PORT_CHECK_TIMEOUT) + except ParameterNotFoundException: + Config.port_check_timeout = "600000" + + Config.validate_config() except ParameterNotFoundException as ex: raise RuntimeError(ex) @@ -381,6 +414,52 @@ class Config: Config.extension_executor = Config.initialize_extensions() @staticmethod + def validate_config(): + try: + Config.validate_url_list(Config.mb_urls, constants.MB_URLS) + Config.validate_int(Config.mb_publisher_timeout, constants.MB_PUBLISHER_TIMEOUT) + Config.validate_url_list(Config.cep_urls, constants.CEP_RECEIVER_URLS) + Config.validate_int(Config.artifact_update_interval, constants.ARTIFACT_UPDATE_INTERVAL) + Config.validate_int(Config.artifact_clone_retry_count, constants.ARTIFACT_CLONE_RETRIES) + Config.validate_int(Config.artifact_clone_retry_interval, constants.ARTIFACT_CLONE_INTERVAL) + Config.validate_int(Config.port_check_timeout, constants.PORT_CHECK_TIMEOUT) + except ValueError as err: + raise InvalidConfigValueException("Invalid configuration for Cartridge Agent", err) + + @staticmethod + def validate_url_list(urls, field_name): + """ + host1:port1,host2:port2 + + :param urls: + :param field_name: + :return: + """ + url_list = str(urls).split(",") + if len(url_list) < 1: + raise ValueError("Invalid value [field] \"%s\"" % field_name) + + for single_url in url_list: + try: + url_ip, url_port = single_url.split(":") + except ValueError: + raise ValueError("Invalid host or port number value for [field] %s", field_name) + + @staticmethod + def validate_int(int_value, field_name): + """ + valid integer value + + :param int_value: + :param field_name: + :return: + """ + try: + int(int_value) + except ValueError: + raise ValueError("Invalid int value for [field] %s " % field_name) + + @staticmethod def initialize_plugins(): """ Find, load, activate and group plugins for Python CA :return: a tuple of (PluginManager, plugins, artifact management plugins) http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py index 1586c4d..345efee 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/exception.py @@ -21,17 +21,8 @@ class CartridgeAgentException(Exception): Exception super class to be used by specific exceptions thrown in the cartridge agent """ - def __init__(self, message): - super(CartridgeAgentException, self).__init__(message) - self.__message = message - - def get_message(self): - """ - The message provided when the exception is raised - :return: message - :rtype: str - """ - return self.__message + def __init__(self, *args, **kwargs): + Exception.__init__(self, *args, **kwargs) class DataPublisherException(CartridgeAgentException): @@ -39,8 +30,8 @@ class DataPublisherException(CartridgeAgentException): Exception to be used during log publishing operations """ - def __init__(self, message): - super(DataPublisherException, self).__init__(message) + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) class PluginExecutionException(CartridgeAgentException): @@ -48,8 +39,8 @@ class PluginExecutionException(CartridgeAgentException): Exception raised when a runtime error is met while executing an plugin script """ - def __init__(self, message): - super(PluginExecutionException, self).__init__(message) + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) class GitRepositorySynchronizationException(CartridgeAgentException): @@ -57,8 +48,8 @@ class GitRepositorySynchronizationException(CartridgeAgentException): Exception raised during a git repository related task """ - def __init__(self, message): - super(GitRepositorySynchronizationException, self).__init__(message) + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) class ParameterNotFoundException(CartridgeAgentException): @@ -67,8 +58,8 @@ class ParameterNotFoundException(CartridgeAgentException): of the cartridge agent """ - def __init__(self, message): - super(ParameterNotFoundException, self).__init__(message) + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) class ThriftReceiverOfflineException(CartridgeAgentException): @@ -76,8 +67,8 @@ class ThriftReceiverOfflineException(CartridgeAgentException): Exception raised when the connection to the Thrift receiver is dropped when publishing events """ - def __init__(self, message): - super(ThriftReceiverOfflineException, self).__init__(message) + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) class CEPPublisherException(CartridgeAgentException): @@ -85,5 +76,14 @@ class CEPPublisherException(CartridgeAgentException): Exception to be used during CEP publishing operations """ - def __init__(self, message): - super(CEPPublisherException, self).__init__(message) + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) + + +class InvalidConfigValueException(CartridgeAgentException): + """ + Exception to be used when validating agent configuration + """ + + def __init__(self, *args, **kwargs): + CartridgeAgentException.__init__(self, *args, **kwargs) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py index 71f2894..2005537 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -62,7 +62,7 @@ class HealthStatisticsPublisherManager(Thread): self.log.debug("Publishing load average: %r" % cartridge_stats.load_avg) self.publisher.publish_load_average(cartridge_stats.load_avg) - except Exception as e: + except Exception: self.log.exception( "Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...") self.publisher = HealthStatisticsPublisher() @@ -76,33 +76,17 @@ class HealthStatisticsPublisher: """ log = LogFactory().get_log(__name__) - @staticmethod - def read_config(conf_key): - """ - Read a given key from the cartridge agent configuration - :param conf_key: The key to look for in the CA config - :return: The value for the key from the CA config - :raise: RuntimeError if the given key is not found in the CA config - """ - conf_value = Config.read_property(conf_key, False) - - if conf_value is None or conf_value.strip() == "": - raise RuntimeError("System property not found: " + conf_key) - - return conf_value - def __init__(self): self.publishers = [] self.deactive_publishers = [] - self.cep_admin_username = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME) - self.cep_admin_password = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD) + self.cep_admin_username = Config.cep_username + self.cep_admin_password = Config.cep_password self.stream_definition = HealthStatisticsPublisher.create_stream_definition() HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) # 1.1.1.1:1883,2.2.2.2:1883 - cep_urls = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_URLS) - cep_urls = cep_urls.split(',') + cep_urls = Config.cep_urls.split(',') for cep_url in cep_urls: cep_active = self.is_cep_active(cep_url) @@ -207,14 +191,10 @@ class HealthStatisticsPublisher: return true if active :param cep_url: """ - ports = [] cep_ip = cep_url.split(':')[0] cep_port = cep_url.split(':')[1] - ports.append(cep_port) - cep_active = cartridgeagentutils.check_ports_active( - cep_ip, - ports) + cep_active = cartridgeagentutils.check_port_active(cep_ip, cep_port) return cep_active @@ -229,8 +209,8 @@ class HealthStatisticsPublisher: except Exception as ex: raise ThriftReceiverOfflineException(ex) - deactive_ceps = self.deactive_publishers - for cep_url in deactive_ceps: + inactive_ceps = self.deactive_publishers + for cep_url in inactive_ceps: cep_active = self.is_cep_active(cep_url) if cep_active: self.add_publishers(cep_url) http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py index e7ab1c7..b9f5d63 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py @@ -145,7 +145,7 @@ class LogPublisherManager(Thread): ports_active = cartridgeagentutils.wait_until_ports_active( DataPublisherConfiguration.get_instance().monitoring_server_ip, self.ports, - int(Config.read_property("port.check.timeout", critical=False))) + int(Config.port_check_timeout)) if not ports_active: self.log.debug("Monitoring server is not active") http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py index 2170a33..15d3733 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py @@ -168,8 +168,8 @@ class AgentGitHandler: """ git_clone_successful = False # Read properties from agent.conf - max_retry_attempts = int(Config.read_property(constants.ARTIFACT_CLONE_RETRIES, 5)) - retry_interval = int(Config.read_property(constants.ARTIFACT_CLONE_INTERVAL, 10)) + max_retry_attempts = int(Config.artifact_clone_retry_count) + retry_interval = int(Config.artifact_clone_retry_interval) retry_attempts = 0 # Iterate until git clone is successful or reaches max retry attempts http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py index f44dd04..13bb680 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/application/signup/events.py @@ -27,7 +27,6 @@ class ApplicationSignUpAddedEvent: self.clusterIds = None """ :type : list[str] """ - @staticmethod def create_from_json(json_str): json_obj = json.loads(json_str) @@ -49,7 +48,6 @@ class ApplicationSignUpRemovedEvent: self.clusterIds = None """ :type : list[str] """ - @staticmethod def create_from_json(json_str): json_obj = json.loads(json_str) @@ -59,4 +57,3 @@ class ApplicationSignUpRemovedEvent: app_signup_removed.tenantId = json_obj["tenantId"] if "tenantId" in json_obj else None return app_signup_removed - http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py index 41dc133..d19cd0c 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/instance/notifier/events.py @@ -79,4 +79,4 @@ class InstanceCleanupMemberEvent: json_obj = json.loads(json_str) m_id = json_obj["memberId"] if "memberId" in json_obj else None - return InstanceCleanupMemberEvent(m_id) \ No newline at end of file + return InstanceCleanupMemberEvent(m_id) http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py index 2e287bd..8a49bc2 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/tenant/events.py @@ -84,4 +84,4 @@ class TenantUnsubscribedEvent: instance.service_name = json_obj["serviceName"] if "serviceName" in json_obj else None instance.cluster_ids = json_obj["clusterIds"] if "clusterIds" in json_obj else None - return instance \ No newline at end of file + return instance http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py index 79bc6c5..e5ad877 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py @@ -24,12 +24,18 @@ import hashlib from log import LogFactory +BS = 16 + log = LogFactory().get_log(__name__) -unpad = lambda s: s[0:-ord(s[-1])] -current_milli_time = lambda: int(round(time.time() * 1000)) -BS = 16 -pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS) + +def unpad(s): return s[0:-ord(s[-1])] + + +def current_milli_time(): return int(round(time.time() * 1000)) + + +def pad(s): return s + (BS - len(s) % BS) * chr(BS - len(s) % BS) def decrypt_password(pass_str, secret): @@ -77,11 +83,26 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000): log.debug("Port check timeout: %s" % ports_check_timeout) - active = False + ports_left = ports start_time = current_milli_time() - while not active: + + # check ports until all are active or timeout exceeds + while True: log.info("Waiting for ports to be active: [ip] %s [ports] %s" % (ip_address, ports)) - active = check_ports_active(ip_address, ports) + + # check each port for activity + for checking_port in list(ports_left): + port_active = check_port_active(ip_address, checking_port) + if port_active: + log.debug("Port %s on host %s active" % (checking_port, ip_address)) + ports_left.remove(checking_port) + + # if no ports are left to check for activity, return + if len(ports_left) == 0: + log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports)) + return True + + # active = check_ports_active(ip_address, ports) end_time = current_milli_time() duration = end_time - start_time @@ -92,33 +113,33 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000): time.sleep(5) - log.info("Ports activated: [ip] %r [ports] %r" % (ip_address, ports)) - return True - -def check_ports_active(ip_address, ports): +def check_port_active(ip_address, port): """ - Checks the given list of port addresses for active state + Checks the given port on the given host for activity :param str ip_address: Ip address of the member to be checked - :param list[str] ports: The list of ports to be checked + :param str port: The port to be checked :return: True if the ports are active, False if at least one is not active :rtype: bool """ - if len(ports) < 1: - raise RuntimeError("No ports found") - - for port in ports: - s = socket.socket() - s.settimeout(5) - try: - s.connect((ip_address, int(port))) - log.debug("Port %s is active" % port) - s.close() - except socket.error: - log.debug("Port %s is not active" % port) - return False + if port is None: + raise RuntimeError("Cannot check invalid port for activity") + + try: + port_int = int(port) + except ValueError: + raise RuntimeError("Cannot check invalid port for activity %s" % port) - return True + s = socket.socket() + s.settimeout(5) + try: + s.connect((ip_address, port_int)) + log.debug("Port %s is active" % port) + s.close() + return True + except socket.error: + log.debug("Port %s is not active" % port) + return False class IncrementalCeilingListIterator(object): http://git-wip-us.apache.org/repos/asf/stratos/blob/a242b52b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py index 5b6190e..229c354 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -71,7 +71,7 @@ def publish_instance_activated_event(): ports_active = cartridgeagentutils.wait_until_ports_active( listen_address, configuration_ports, - int(Config.read_property("port.check.timeout", critical=False))) + int(Config.port_check_timeout)) if ports_active: log.info("Publishing instance activated event...") @@ -226,7 +226,7 @@ class EventPublisher(object): while int(time.time()) - self.__start_time < (Config.mb_publisher_timeout * 1000): retry_interval = retry_iterator.get_next_retry_interval() - for mb_url in Config.mb_urls: + for mb_url in Config.mb_urls.split(","): mb_ip, mb_port = mb_url.split(":") # start a thread to execute publish event
