Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 77a413fbc -> 0517d8ea9


Improvements to Python agent ADC logic and event handling


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0517d8ea
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0517d8ea
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0517d8ea

Branch: refs/heads/stratos-4.1.x
Commit: 0517d8ea9e13d2146d9018a79e2ef705ae8f3032
Parents: 77a413f
Author: Akila Perera <[email protected]>
Authored: Wed Oct 14 22:42:46 2015 +0530
Committer: Akila Perera <[email protected]>
Committed: Wed Oct 14 22:42:46 2015 +0530

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.py    |  8 +--
 .../cartridge.agent/cartridge.agent/config.py   |  9 +++-
 .../modules/event/eventhandler.py               | 52 ++++++++-----------
 .../plugins/DefaultArtifactCheckout.py          | 54 +++++++++++++-------
 .../cartridge.agent/publisher.py                | 43 ++++++----------
 5 files changed, 87 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index d37c555..54a7421 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -33,7 +33,6 @@ import constants
 
 
 class CartridgeAgent(threading.Thread):
-
     def __init__(self):
         threading.Thread.__init__(self)
         Config.initialize_config()
@@ -105,9 +104,10 @@ class CartridgeAgent(threading.Thread):
         repo_url = Config.repo_url
         if repo_url is None or str(repo_url).strip() == "":
             self.__log.info("No artifact repository found")
-            self.__event_handler.on_instance_activated_event()
             publisher.publish_instance_activated_event()
+            self.__event_handler.on_instance_activated_event()
         else:
+            # instance activated event will be published in artifact updated 
event handler
             self.__log.info(
                 "Artifact repository found, waiting for artifact updated event 
to checkout artifacts: [repo_url] %s",
                 repo_url)
@@ -128,7 +128,7 @@ class CartridgeAgent(threading.Thread):
 
         # run until terminated
         while not self.__terminated:
-            time.sleep(1)
+            time.sleep(5)
 
         if DataPublisherConfiguration.get_instance().enabled:
             self.__log_publish_manager.terminate_all_publishers()
@@ -318,7 +318,7 @@ def main():
     log = LogFactory().get_log(__name__)
 
     try:
-        log.debug("Starting cartridge agent")
+        log.info("Starting Stratos cartridge agent...")
         cartridge_agent.start()
     except Exception as e:
         log.exception("Cartridge Agent Exception: %r" % e)

http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
index 1f0680f..e50c47d 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
@@ -130,9 +130,16 @@ class Config:
     """ :type : str """
     lvs_virtual_ip = None
     """ :type : str """
-
     initialized = False
     """ :type : bool """
+    activated = False
+    """ :type : bool """
+    started = False
+    """ :type : bool """
+    ready_to_shutdown = False
+    """ :type : bool """
+    maintenance = False
+    """ :type : bool """
 
     @staticmethod
     def read_conf_file():

http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index 7fbdf5f..1dfe834 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -59,7 +59,7 @@ class EventHandler:
         self.execute_event_extendables(constants.INSTANCE_ACTIVATED_EVENT, {})
 
     def on_artifact_updated_event(self, artifacts_updated_event):
-        self.__log.debug("Processing Artifact update event: [tenant] %s 
[cluster] %s [status] %s" %
+        self.__log.debug("Processing artifact updated event for [tenant] %s 
[cluster] %s [status] %s" %
                          (str(artifacts_updated_event.tenant_id),
                           artifacts_updated_event.cluster_id,
                           artifacts_updated_event.status))
@@ -68,7 +68,16 @@ class EventHandler:
         cluster_id_payload = Config.cluster_id
         repo_url = str(artifacts_updated_event.repo_url).strip()
 
-        if (repo_url == "") or (cluster_id_payload is None) or 
(cluster_id_payload != cluster_id_event):
+        if repo_url == "":
+            self.__log.error("Repository URL is empty. Failed to process 
artifact updated event.")
+            return
+
+        if cluster_id_payload is None or cluster_id_payload == "":
+            self.__log.error("Cluster ID in payload is empty. Failed to 
process artifact updated event.")
+            return
+
+        if cluster_id_payload != cluster_id_event:
+            self.__log.debug("Cluster ID in artifact updated event does not 
match. Skipping event handler.")
             return
 
         repo_password = None
@@ -76,41 +85,23 @@ class EventHandler:
             secret = Config.cartridge_key
             repo_password = 
cartridgeagentutils.decrypt_password(artifacts_updated_event.repo_password, 
secret)
 
+        if Config.app_path is None:
+            self.__log.error("Repository path is empty. Failed to process 
artifact updated event.")
+            return
+
         repo_username = artifacts_updated_event.repo_username
         tenant_id = artifacts_updated_event.tenant_id
         is_multitenant = Config.is_multiTenant
         commit_enabled = artifacts_updated_event.commit_enabled
 
-        if Config.app_path is None:
-            raise GitRepositorySynchronizationException("Repository path is 
empty. Cannot perform Git operations.")
-
         # create repo object
         local_repo_path = self.get_repo_path_for_tenant(str(tenant_id), 
Config.app_path, is_multitenant)
         repo_info = Repository(repo_url, repo_username, repo_password, 
local_repo_path, tenant_id, commit_enabled)
-        new_git_repo = AgentGitHandler.create_git_repo(repo_info)
-
-        # check whether this is the first artifact updated event for this 
tenant
-        existing_git_repo = AgentGitHandler.get_repo(repo_info.tenant_id)
-        if existing_git_repo is not None:
-            # check whether this event has updated credentials for git repo
-            if AgentGitHandler.is_valid_git_repository(
-                    new_git_repo) and new_git_repo.repo_url != 
existing_git_repo.repo_url:
-                # add the new git_repo object with updated credentials to repo 
list
-                AgentGitHandler.add_repo(new_git_repo)
-
-                # update the origin remote URL with new credentials
-                self.__log.info("Changes detected in git credentials for 
tenant: %s" % new_git_repo.tenant_id)
-                self.__log.debug("Updating git repo remote URL for tenant: %s 
with new remote URL: %s" % (
-                    new_git_repo.tenant_id, new_git_repo.repo_url))
-                (output, errors) = AgentGitHandler.execute_git_command(
-                    ["remote", "set-url", "origin", new_git_repo.repo_url], 
new_git_repo.local_repo_path)
-                if errors.strip() != "":
-                    self.__log.error("Failed to update git repo remote URL for 
tenant: %s" % new_git_repo.tenant_id)
-
         self.__log.info("Executing checkout job on artifact updated event...")
+
         try:
             Config.artifact_checkout_plugin.plugin_object.checkout(repo_info)
-        except GitRepositorySynchronizationException as e:
+        except Exception as e:
             self.__log.exception(
                 "Checkout job on artifact updated event failed for tenant: %s 
%s" % (repo_info.tenant_id, e))
 
@@ -124,12 +115,13 @@ class EventHandler:
 
         try:
             self.execute_event_extendables(constants.ARTIFACT_UPDATED_EVENT, 
plugin_values)
-        except ValueError:
-            self.__log.exception("Could not execute plugins for artifact 
updated event: %s" % ValueError)
+        except Exception as e:
+            self.__log.exception("Could not execute plugins for artifact 
updated event: %s" % e)
 
-        if existing_git_repo is None:
-            # publish instance activated event for single tenant subscription
+        if not Config.activated:
+            # publish instance activated event if not yet activated
             publisher.publish_instance_activated_event()
+            self.on_instance_activated_event()
 
         update_artifacts = 
Config.read_property(constants.ENABLE_ARTIFACT_UPDATE, True)
         auto_commit = Config.is_commits_enabled

http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py
index 53cadea..c25d0e8 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/plugins/DefaultArtifactCheckout.py
@@ -47,39 +47,57 @@ class DefaultArtifactCheckout(IArtifactCheckoutPlugin):
         subsequent calls or not
         :rtype: tuple(bool, bool)
         """
+        new_git_repo = AgentGitHandler.create_git_repo(repo_info)
+
+        # check whether this is the first artifact updated event for this 
tenant
+        existing_git_repo = AgentGitHandler.get_repo(repo_info.tenant_id)
+        if existing_git_repo is not None:
+            # check whether this event has updated credentials for git repo
+            if AgentGitHandler.is_valid_git_repository(
+                    new_git_repo) and new_git_repo.repo_url != 
existing_git_repo.repo_url:
+                # add the new git_repo object with updated credentials to repo 
list
+                AgentGitHandler.add_repo(new_git_repo)
+
+                # update the origin remote URL with new credentials
+                self.log.info("Changes detected in git credentials for tenant: 
%s" % new_git_repo.tenant_id)
+                (output, errors) = AgentGitHandler.execute_git_command(
+                    ["remote", "set-url", "origin", new_git_repo.repo_url], 
new_git_repo.local_repo_path)
+                if errors.strip() != "":
+                    self.log.error("Failed to update git repo remote URL for 
tenant: %s" % new_git_repo.tenant_id)
+
         git_repo = AgentGitHandler.create_git_repo(repo_info)
         if AgentGitHandler.get_repo(repo_info.tenant_id) is not None:
             # has been previously cloned, this is not the subscription run
             if AgentGitHandler.is_valid_git_repository(git_repo):
-                AgentGitHandler.log.debug("Executing git pull: [tenant-id] %s 
[repo-url] %s",
-                                          git_repo.tenant_id, 
git_repo.repo_url)
+                self.log.debug("Executing git pull: [tenant-id] %s [repo-url] 
%s",
+                               git_repo.tenant_id, git_repo.repo_url)
                 updated = AgentGitHandler.pull(git_repo)
-                AgentGitHandler.log.debug("Git pull executed: [tenant-id] %s 
[repo-url] %s",
-                                          git_repo.tenant_id, 
git_repo.repo_url)
+                self.log.debug("Git pull executed: [tenant-id] %s [repo-url] 
%s",
+                               git_repo.tenant_id, git_repo.repo_url)
             else:
                 # not a valid repository, might've been corrupted. do a 
re-clone
-                AgentGitHandler.log.debug("Local repository is not valid. 
Doing a re-clone to purify.")
+                self.log.debug("Local repository is not valid. Doing a 
re-clone to purify.")
                 git_repo.cloned = False
-                AgentGitHandler.log.debug("Executing git clone: [tenant-id] %s 
[repo-url] %s",
-                                          git_repo.tenant_id, 
git_repo.repo_url)
+                self.log.debug("Executing git clone: [tenant-id] %s [repo-url] 
%s",
+                               git_repo.tenant_id, git_repo.repo_url)
                 git_repo = AgentGitHandler.clone(git_repo)
                 AgentGitHandler.add_repo(git_repo)
-                AgentGitHandler.log.debug("Git clone executed: [tenant-id] %s 
[repo-url] %s",
-                                          git_repo.tenant_id, 
git_repo.repo_url)
+                self.log.debug("Git clone executed: [tenant-id] %s [repo-url] 
%s",
+                               git_repo.tenant_id, git_repo.repo_url)
         else:
             # subscribing run.. need to clone
-            AgentGitHandler.log.info("Cloning artifacts from %s for the first 
time to %s",
-                                     git_repo.repo_url, 
git_repo.local_repo_path)
-            AgentGitHandler.log.info("Executing git clone: [tenant-id] %s 
[repo-url] %s, [repo path] %s",
-                                     git_repo.tenant_id, git_repo.repo_url, 
git_repo.local_repo_path)
+            self.log.info("Cloning artifacts from %s for the first time to %s",
+                          git_repo.repo_url, git_repo.local_repo_path)
+            self.log.info("Executing git clone: [tenant-id] %s [repo-url] %s, 
[repo path] %s",
+                          git_repo.tenant_id, git_repo.repo_url, 
git_repo.local_repo_path)
             try:
                 git_repo = AgentGitHandler.clone(git_repo)
                 AgentGitHandler.add_repo(git_repo)
-                AgentGitHandler.log.debug("Git clone executed: [tenant-id] %s 
[repo-url] %s",
-                                          git_repo.tenant_id, 
git_repo.repo_url)
-            except GitRepositorySynchronizationException as e:
-                AgentGitHandler.log.exception("Git clone operation failed: %s" 
% e)
+                self.log.debug("Git clone executed: [tenant-id] %s [repo-url] 
%s",
+                               git_repo.tenant_id, git_repo.repo_url)
+            except Exception as e:
+                self.log.exception("Git clone operation failed: %s" % e)
                 # If first git clone is failed, execute retry_clone operation
-                AgentGitHandler.log.info("Retrying git clone operation...")
+                self.log.info("Retrying git clone operation...")
                 AgentGitHandler.retry_clone(git_repo)
                 AgentGitHandler.add_repo(git_repo)

http://git-wip-us.apache.org/repos/asf/stratos/blob/0517d8ea/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index f0ca275..f78f460 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -25,20 +25,13 @@ import constants
 from config import Config
 
 log = LogFactory().get_log(__name__)
-
-started = False
-activated = False
-ready_to_shutdown = False
-maintenance = False
-
 publishers = {}
 """ :type : dict[str, EventPublisher] """
 
 
 def publish_instance_started_event():
-    global started, log
-    if not started:
-        log.info("Publishing instance started event")
+    if not Config.started:
+        log.info("Publishing instance started event...")
 
         application_id = Config.application_id
         service_name = Config.service_name
@@ -61,15 +54,14 @@ def publish_instance_started_event():
 
         publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + 
constants.INSTANCE_STARTED_EVENT)
         publisher.publish(instance_started_event)
-        started = True
+        Config.started = True
         log.info("Instance started event published")
     else:
         log.warn("Instance already started")
 
 
 def publish_instance_activated_event():
-    global activated, log
-    if not activated:
+    if not Config.activated:
         # Wait for all ports to be active
         listen_address = Config.listen_address
         configuration_ports = Config.ports
@@ -79,7 +71,7 @@ def publish_instance_activated_event():
             int(Config.read_property("port.check.timeout", critical=False)))
 
         if ports_active:
-            log.info("Publishing instance activated event")
+            log.info("Publishing instance activated event...")
             service_name = Config.service_name
             cluster_id = Config.cluster_id
             member_id = Config.member_id
@@ -121,19 +113,19 @@ def publish_instance_activated_event():
             else:
                 log.warn("Statistics publisher is disabled")
 
-            activated = True
+            Config.activated = True
             log.info("Health statistics notifier started")
         else:
-            log.error("Ports activation timed out. Aborting 
InstanceActivatedEvent publishing [IPAddress] %s [Ports] %s"
-                      % (listen_address, configuration_ports))
+            log.error(
+                "Ports activation timed out. Aborting publishing instance 
activated event [IPAddress] %s [Ports] %s"
+                % (listen_address, configuration_ports))
     else:
         log.warn("Instance already activated")
 
 
 def publish_maintenance_mode_event():
-    global maintenance, log
-    if not maintenance:
-        log.info("Publishing instance maintenance mode event")
+    if not Config.maintenance:
+        log.info("Publishing instance maintenance mode event...")
 
         service_name = Config.service_name
         cluster_id = Config.cluster_id
@@ -155,16 +147,15 @@ def publish_maintenance_mode_event():
         publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + 
constants.INSTANCE_MAINTENANCE_MODE_EVENT)
         publisher.publish(instance_maintenance_mode_event)
 
-        maintenance = True
-        log.info("Instance Maintenance mode event published")
+        Config.maintenance = True
+        log.info("Instance maintenance mode event published")
     else:
-        log.warn("Instance already in a Maintenance mode...")
+        log.warn("Instance already in a maintenance mode")
 
 
 def publish_instance_ready_to_shutdown_event():
-    global ready_to_shutdown, log
-    if not ready_to_shutdown:
-        log.info("Publishing instance activated event")
+    if not Config.ready_to_shutdown:
+        log.info("Publishing instance activated event...")
 
         service_name = Config.service_name
         cluster_id = Config.cluster_id
@@ -186,7 +177,7 @@ def publish_instance_ready_to_shutdown_event():
         publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + 
constants.INSTANCE_READY_TO_SHUTDOWN_EVENT)
         publisher.publish(instance_shutdown_event)
 
-        ready_to_shutdown = True
+        Config.ready_to_shutdown = True
         log.info("Instance ReadyToShutDown event published")
     else:
         log.warn("Instance already in a ReadyToShutDown event...")

Reply via email to