Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 77a413fbc -> 0517d8ea9
Improvements to Python agent ADC logic and event handling Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0517d8ea Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0517d8ea Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0517d8ea Branch: refs/heads/stratos-4.1.x Commit: 0517d8ea9e13d2146d9018a79e2ef705ae8f3032 Parents: 77a413f Author: Akila Perera <[email protected]> Authored: Wed Oct 14 22:42:46 2015 +0530 Committer: Akila Perera <[email protected]> Committed: Wed Oct 14 22:42:46 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 8 +-- .../cartridge.agent/cartridge.agent/config.py | 9 +++- .../modules/event/eventhandler.py | 52 ++++++++----------- .../plugins/DefaultArtifactCheckout.py | 54 +++++++++++++------- .../cartridge.agent/publisher.py | 43 ++++++---------- 5 files changed, 87 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index d37c555..54a7421 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -33,7 +33,6 @@ import constants class CartridgeAgent(threading.Thread): - def __init__(self): threading.Thread.__init__(self) Config.initialize_config() @@ -105,9 +104,10 @@ class CartridgeAgent(threading.Thread): repo_url = Config.repo_url if repo_url is None or str(repo_url).strip() == "": self.__log.info("No artifact repository found") - self.__event_handler.on_instance_activated_event() publisher.publish_instance_activated_event() + self.__event_handler.on_instance_activated_event() else: + # instance activated event will be published in artifact updated event handler self.__log.info( "Artifact repository found, waiting for artifact updated event to checkout artifacts: [repo_url] %s", repo_url) @@ -128,7 +128,7 @@ class CartridgeAgent(threading.Thread): # run until terminated while not self.__terminated: - time.sleep(1) + time.sleep(5) if DataPublisherConfiguration.get_instance().enabled: self.__log_publish_manager.terminate_all_publishers() @@ -318,7 +318,7 @@ def main(): log = LogFactory().get_log(__name__) try: - log.debug("Starting cartridge agent") + log.info("Starting Stratos cartridge agent...") cartridge_agent.start() except Exception as e: log.exception("Cartridge Agent Exception: %r" % e) http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py index 1f0680f..e50c47d 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py @@ -130,9 +130,16 @@ class Config: """ :type : str """ lvs_virtual_ip = None """ :type : str """ - initialized = False """ :type : bool """ + activated = False + """ :type : bool """ + started = False + """ :type : bool """ + ready_to_shutdown = False + """ :type : bool """ + maintenance = False + """ :type : bool """ @staticmethod def read_conf_file(): http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index 7fbdf5f..1dfe834 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -59,7 +59,7 @@ class EventHandler: self.execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {}) def on_artifact_updated_event(self, artifacts_updated_event): - self.__log.debug("Processing Artifact update event: [tenant] %s [cluster] %s [status] %s" % + self.__log.debug("Processing artifact updated event for [tenant] %s [cluster] %s [status] %s" % (str(artifacts_updated_event.tenant_id), artifacts_updated_event.cluster_id, artifacts_updated_event.status)) @@ -68,7 +68,16 @@ class EventHandler: cluster_id_payload = Config.cluster_id repo_url = str(artifacts_updated_event.repo_url).strip() - if (repo_url == "") or (cluster_id_payload is None) or (cluster_id_payload != cluster_id_event): + if repo_url == "": + self.__log.error("Repository URL is empty. Failed to process artifact updated event.") + return + + if cluster_id_payload is None or cluster_id_payload == "": + self.__log.error("Cluster ID in payload is empty. Failed to process artifact updated event.") + return + + if cluster_id_payload != cluster_id_event: + self.__log.debug("Cluster ID in artifact updated event does not match. Skipping event handler.") return repo_password = None @@ -76,41 +85,23 @@ class EventHandler: secret = Config.cartridge_key repo_password = cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, secret) + if Config.app_path is None: + self.__log.error("Repository path is empty. Failed to process artifact updated event.") + return + repo_username = artifacts_updated_event.repo_username tenant_id = artifacts_updated_event.tenant_id is_multitenant = Config.is_multiTenant commit_enabled = artifacts_updated_event.commit_enabled - if Config.app_path is None: - raise GitRepositorySynchronizationException("Repository path is empty. Cannot perform Git operations.") - # create repo object local_repo_path = self.get_repo_path_for_tenant(str(tenant_id), Config.app_path, is_multitenant) repo_info = Repository(repo_url, repo_username, repo_password, local_repo_path, tenant_id, commit_enabled) - new_git_repo = AgentGitHandler.create_git_repo(repo_info) - - # check whether this is the first artifact updated event for this tenant - existing_git_repo = AgentGitHandler.get_repo(repo_info.tenant_id) - if existing_git_repo is not None: - # check whether this event has updated credentials for git repo - if AgentGitHandler.is_valid_git_repository( - new_git_repo) and new_git_repo.repo_url != existing_git_repo.repo_url: - # add the new git_repo object with updated credentials to repo list - AgentGitHandler.add_repo(new_git_repo) - - # update the origin remote URL with new credentials - self.__log.info("Changes detected in git credentials for tenant: %s" % new_git_repo.tenant_id) - self.__log.debug("Updating git repo remote URL for tenant: %s with new remote URL: %s" % ( - new_git_repo.tenant_id, new_git_repo.repo_url)) - (output, errors) = AgentGitHandler.execute_git_command( - ["remote", "set-url", "origin", new_git_repo.repo_url], new_git_repo.local_repo_path) - if errors.strip() != "": - self.__log.error("Failed to update git repo remote URL for tenant: %s" % new_git_repo.tenant_id) - self.__log.info("Executing checkout job on artifact updated event...") + try: Config.artifact_checkout_plugin.plugin_object.checkout(repo_info) - except GitRepositorySynchronizationException as e: + except Exception as e: self.__log.exception( "Checkout job on artifact updated event failed for tenant: %s %s" % (repo_info.tenant_id, e)) @@ -124,12 +115,13 @@ class EventHandler: try: self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, plugin_values) - except ValueError: - self.__log.exception("Could not execute plugins for artifact updated event: %s" % ValueError) + except Exception as e: + self.__log.exception("Could not execute plugins for artifact updated event: %s" % e) - if existing_git_repo is None: - # publish instance activated event for single tenant subscription + if not Config.activated: + # publish instance activated event if not yet activated publisher.publish_instance_activated_event() + self.on_instance_activated_event() update_artifacts = Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True) auto_commit = Config.is_commits_enabled http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py index 53cadea..c25d0e8 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py @@ -47,39 +47,57 @@ class DefaultArtifactCheckout(IArtifactCheckoutPlugin): subsequent calls or not :rtype: tuple(bool, bool) """ + new_git_repo = AgentGitHandler.create_git_repo(repo_info) + + # check whether this is the first artifact updated event for this tenant + existing_git_repo = AgentGitHandler.get_repo(repo_info.tenant_id) + if existing_git_repo is not None: + # check whether this event has updated credentials for git repo + if AgentGitHandler.is_valid_git_repository( + new_git_repo) and new_git_repo.repo_url != existing_git_repo.repo_url: + # add the new git_repo object with updated credentials to repo list + AgentGitHandler.add_repo(new_git_repo) + + # update the origin remote URL with new credentials + self.log.info("Changes detected in git credentials for tenant: %s" % new_git_repo.tenant_id) + (output, errors) = AgentGitHandler.execute_git_command( + ["remote", "set-url", "origin", new_git_repo.repo_url], new_git_repo.local_repo_path) + if errors.strip() != "": + self.log.error("Failed to update git repo remote URL for tenant: %s" % new_git_repo.tenant_id) + git_repo = AgentGitHandler.create_git_repo(repo_info) if AgentGitHandler.get_repo(repo_info.tenant_id) is not None: # has been previously cloned, this is not the subscription run if AgentGitHandler.is_valid_git_repository(git_repo): - AgentGitHandler.log.debug("Executing git pull: [tenant-id] %s [repo-url] %s", - git_repo.tenant_id, git_repo.repo_url) + self.log.debug("Executing git pull: [tenant-id] %s [repo-url] %s", + git_repo.tenant_id, git_repo.repo_url) updated = AgentGitHandler.pull(git_repo) - AgentGitHandler.log.debug("Git pull executed: [tenant-id] %s [repo-url] %s", - git_repo.tenant_id, git_repo.repo_url) + self.log.debug("Git pull executed: [tenant-id] %s [repo-url] %s", + git_repo.tenant_id, git_repo.repo_url) else: # not a valid repository, might've been corrupted. do a re-clone - AgentGitHandler.log.debug("Local repository is not valid. Doing a re-clone to purify.") + self.log.debug("Local repository is not valid. Doing a re-clone to purify.") git_repo.cloned = False - AgentGitHandler.log.debug("Executing git clone: [tenant-id] %s [repo-url] %s", - git_repo.tenant_id, git_repo.repo_url) + self.log.debug("Executing git clone: [tenant-id] %s [repo-url] %s", + git_repo.tenant_id, git_repo.repo_url) git_repo = AgentGitHandler.clone(git_repo) AgentGitHandler.add_repo(git_repo) - AgentGitHandler.log.debug("Git clone executed: [tenant-id] %s [repo-url] %s", - git_repo.tenant_id, git_repo.repo_url) + self.log.debug("Git clone executed: [tenant-id] %s [repo-url] %s", + git_repo.tenant_id, git_repo.repo_url) else: # subscribing run.. need to clone - AgentGitHandler.log.info("Cloning artifacts from %s for the first time to %s", - git_repo.repo_url, git_repo.local_repo_path) - AgentGitHandler.log.info("Executing git clone: [tenant-id] %s [repo-url] %s, [repo path] %s", - git_repo.tenant_id, git_repo.repo_url, git_repo.local_repo_path) + self.log.info("Cloning artifacts from %s for the first time to %s", + git_repo.repo_url, git_repo.local_repo_path) + self.log.info("Executing git clone: [tenant-id] %s [repo-url] %s, [repo path] %s", + git_repo.tenant_id, git_repo.repo_url, git_repo.local_repo_path) try: git_repo = AgentGitHandler.clone(git_repo) AgentGitHandler.add_repo(git_repo) - AgentGitHandler.log.debug("Git clone executed: [tenant-id] %s [repo-url] %s", - git_repo.tenant_id, git_repo.repo_url) - except GitRepositorySynchronizationException as e: - AgentGitHandler.log.exception("Git clone operation failed: %s" % e) + self.log.debug("Git clone executed: [tenant-id] %s [repo-url] %s", + git_repo.tenant_id, git_repo.repo_url) + except Exception as e: + self.log.exception("Git clone operation failed: %s" % e) # If first git clone is failed, execute retry_clone operation - AgentGitHandler.log.info("Retrying git clone operation...") + self.log.info("Retrying git clone operation...") AgentGitHandler.retry_clone(git_repo) AgentGitHandler.add_repo(git_repo) http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py index f0ca275..f78f460 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -25,20 +25,13 @@ import constants from config import Config log = LogFactory().get_log(__name__) - -started = False -activated = False -ready_to_shutdown = False -maintenance = False - publishers = {} """ :type : dict[str, EventPublisher] """ def publish_instance_started_event(): - global started, log - if not started: - log.info("Publishing instance started event") + if not Config.started: + log.info("Publishing instance started event...") application_id = Config.application_id service_name = Config.service_name @@ -61,15 +54,14 @@ def publish_instance_started_event(): publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_STARTED_EVENT) publisher.publish(instance_started_event) - started = True + Config.started = True log.info("Instance started event published") else: log.warn("Instance already started") def publish_instance_activated_event(): - global activated, log - if not activated: + if not Config.activated: # Wait for all ports to be active listen_address = Config.listen_address configuration_ports = Config.ports @@ -79,7 +71,7 @@ def publish_instance_activated_event(): int(Config.read_property("port.check.timeout", critical=False))) if ports_active: - log.info("Publishing instance activated event") + log.info("Publishing instance activated event...") service_name = Config.service_name cluster_id = Config.cluster_id member_id = Config.member_id @@ -121,19 +113,19 @@ def publish_instance_activated_event(): else: log.warn("Statistics publisher is disabled") - activated = True + Config.activated = True log.info("Health statistics notifier started") else: - log.error("Ports activation timed out. Aborting InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s" - % (listen_address, configuration_ports)) + log.error( + "Ports activation timed out. Aborting publishing instance activated event [IPAddress] %s [Ports] %s" + % (listen_address, configuration_ports)) else: log.warn("Instance already activated") def publish_maintenance_mode_event(): - global maintenance, log - if not maintenance: - log.info("Publishing instance maintenance mode event") + if not Config.maintenance: + log.info("Publishing instance maintenance mode event...") service_name = Config.service_name cluster_id = Config.cluster_id @@ -155,16 +147,15 @@ def publish_maintenance_mode_event(): publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_MAINTENANCE_MODE_EVENT) publisher.publish(instance_maintenance_mode_event) - maintenance = True - log.info("Instance Maintenance mode event published") + Config.maintenance = True + log.info("Instance maintenance mode event published") else: - log.warn("Instance already in a Maintenance mode...") + log.warn("Instance already in a maintenance mode") def publish_instance_ready_to_shutdown_event(): - global ready_to_shutdown, log - if not ready_to_shutdown: - log.info("Publishing instance activated event") + if not Config.ready_to_shutdown: + log.info("Publishing instance activated event...") service_name = Config.service_name cluster_id = Config.cluster_id @@ -186,7 +177,7 @@ def publish_instance_ready_to_shutdown_event(): publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_READY_TO_SHUTDOWN_EVENT) publisher.publish(instance_shutdown_event) - ready_to_shutdown = True + Config.ready_to_shutdown = True log.info("Instance ReadyToShutDown event published") else: log.warn("Instance already in a ReadyToShutDown event...")
