PCA - Fixing message broker HA callback and agent graceful termination, and 
adding extra debug logs


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f6336002
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f6336002
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f6336002

Branch: refs/heads/master
Commit: f6336002f17039a00037bfcc28c47ccbd7327878
Parents: 2c82881
Author: Chamila de Alwis <[email protected]>
Authored: Mon Dec 14 15:25:31 2015 +0530
Committer: Chamila de Alwis <[email protected]>
Committed: Fri Dec 18 11:54:50 2015 +0530

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.py    |  23 ++-
 .../cartridge.agent/cartridge.agent/entity.py   |   1 -
 .../cartridge.agent/healthstats.py              |   4 +
 .../cartridge.agent/logpublisher.py             |  14 +-
 .../modules/artifactmgt/git/agentgithandler.py  |   1 +
 .../modules/event/eventhandler.py               |  23 ++-
 .../modules/util/asyncscheduledtask.py          |  10 +
 .../cartridge.agent/publisher.py                |   5 +-
 .../cartridge.agent/subscriber.py               |  66 ++----
 .../cartridge.agent/terminator.txt              |   0
 .../tests/AgentTerminationTestCase.java         | 204 +++++++++++++++++++
 .../tests/MessageBrokerHATestCase.java          |   1 +
 .../tests/PythonAgentIntegrationTest.java       |   3 +-
 .../AgentTerminationTestCase/agent.conf         |  46 +++++
 .../AgentTerminationTestCase/logging.ini        |  52 +++++
 .../payload/launch-params                       |   1 +
 .../src/test/resources/test-suite-ha.xml        |  38 ++++
 17 files changed, 434 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index cec80fc..aa9c035 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -44,7 +44,6 @@ class CartridgeAgent(object):
 
     def run_agent(self):
         self.__log.info("Starting Cartridge Agent...")
-
         # Start topology event receiver thread
         self.register_topology_event_listeners()
 
@@ -327,13 +326,31 @@ class Handlers(object):
         event_handler.on_application_signup_removed_event(event_obj)
 
 
+def check_termination(agent_obj):
+    terminate = False
+    terminator_file_path = os.path.abspath(os.path.dirname(__file__)) + 
"/terminator.txt"
+    while not terminate:
+        time.sleep(60)
+        try:
+            with open(terminator_file_path, 'r') as f:
+                file_output = f.read()
+                terminate = True if "true" in file_output else False
+        except IOError:
+            pass
+
+    log.info("Shutting down Stratos cartridge agent...")
+    agent_obj.terminate()
+
 
 if __name__ == "__main__":
     log = LogFactory().get_log(__name__)
+    cartridge_agent = CartridgeAgent()
     try:
         log.info("Starting Stratos cartridge agent...")
-        cartridge_agent = CartridgeAgent()
+        task_thread = Thread(target=check_termination, args=(cartridge_agent,))
+        task_thread.start()
         cartridge_agent.run_agent()
     except Exception as e:
         log.exception("Cartridge Agent Exception: %r" % e)
-        # cartridge_agent.terminate()
+        log.info("Terminating Stratos cartridge agent...")
+        cartridge_agent.terminate()

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py
index d00afd0..5dde2ee 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py
@@ -509,7 +509,6 @@ class TopologyContext:
     @staticmethod
     def update(topology):
         TopologyContext.topology = topology
-        TopologyContext.initialized = True
 
 
 class Tenant:

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
index 2005537..48d07c2 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -43,14 +43,18 @@ class HealthStatisticsPublisherManager(Thread):
         self.log = LogFactory().get_log(__name__)
         self.publish_interval = publish_interval
         """:type : int"""
+        self.setDaemon(True)
         self.terminated = False
         self.publisher = HealthStatisticsPublisher()
         """:type : HealthStatisticsPublisher"""
 
         """:type : IHealthStatReaderPlugin"""
         self.stats_reader = Config.health_stat_plugin
+        self.setName("HealthStatPublisherManagerThread")
+        self.log.debug("Created a HealthStatisticsPublisherManager thread")
 
     def run(self):
+        self.log.debug("Starting the HealthStatisticsPublisherManager thread")
         while not self.terminated:
             time.sleep(self.publish_interval)
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
index b9f5d63..dbb5046 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py
@@ -43,8 +43,12 @@ class LogPublisher(Thread):
         self.member_id = member_id
 
         self.terminated = False
+        self.setName("LogPublisherThread")
+        self.setDaemon(True)
+        self.log.debug("Created a LogPublisher thread")
 
     def run(self):
+        self.log.debug("Starting the LogPublisher threads")
         if os.path.isfile(self.file_path) and os.access(self.file_path, 
os.R_OK):
             self.log.info("Starting log publisher for file: " + self.file_path 
+ ", thread: " + str(current_thread()))
             # open file and keep reading for new entries
@@ -102,6 +106,9 @@ class LogPublisherManager(Thread):
     def define_stream(tenant_id, alias, date_time):
         """
         Creates a stream definition for Log Publishing
+        :param date_time:
+        :param alias:
+        :param tenant_id:
         :return: A StreamDefinition object with the required attributes added
         :rtype : StreamDefinition
         """
@@ -134,7 +141,7 @@ class LogPublisherManager(Thread):
         Thread.__init__(self)
 
         self.log = LogFactory().get_log(__name__)
-
+        self.setDaemon(True)
         self.logfile_paths = logfile_paths
         self.publishers = {}
         self.ports = []
@@ -158,8 +165,11 @@ class LogPublisherManager(Thread):
         self.date_time = LogPublisherManager.get_current_date()
 
         self.stream_definition = self.define_stream(self.tenant_id, 
self.alias, self.date_time)
+        self.setName("LogPublisherManagerThread")
+        self.log.debug("Created a LogPublisherManager thread")
 
     def run(self):
+        self.log.debug("Starting the LogPublisherManager thread")
         if self.logfile_paths is not None and len(self.logfile_paths):
             for log_path in self.logfile_paths:
                 # thread for each log file
@@ -170,6 +180,7 @@ class LogPublisherManager(Thread):
     def get_publisher(self, log_path):
         """
         Retrieve the publisher for the specified log file path. Creates a new 
LogPublisher if one is not available
+        :param log_path:
         :return: The LogPublisher object
         :rtype : LogPublisher
         """
@@ -188,6 +199,7 @@ class LogPublisherManager(Thread):
     def terminate_publisher(self, log_path):
         """
         Terminates the LogPublisher thread associated with the specified log 
file
+        :param log_path:
         """
         if log_path in self.publishers:
             self.publishers[log_path].terminate()

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
index fb2c534..309814d 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py
@@ -296,6 +296,7 @@ class AgentGitHandler:
 
             artifact_update_task = ArtifactUpdateTask(repo_info, 
auto_checkout, auto_commit)
             async_task = ScheduledExecutor(update_interval, 
artifact_update_task)
+            AgentGitHandler.log.info("Starting a Scheduled Executor thread for 
Git polling task")
 
             git_repo.scheduled_update_task = async_task
             async_task.start()

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
index a4ff074..c8ccf68 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py
@@ -429,6 +429,8 @@ def execute_plugins_for_event(event, input_values):
             for plugin_info in plugins_for_event:
                 log.debug("Executing plugin %s for event %s" % 
(plugin_info.name, event))
                 plugin_thread = PluginExecutor(plugin_info, input_values)
+                plugin_thread.setName("PluginExecutorThreadForPlugin%s" % 
plugin_info.name)
+                log.debug("Starting a PluginExecutor Thread for event %s" % 
event.__class__.__name__)
                 plugin_thread.start()
 
                 # block till plugin run completes.
@@ -449,6 +451,8 @@ def execute_extension_for_event(event, extension_values):
         if Config.extension_executor is not None:
             log.debug("Executing extension for event [%s]" % event)
             extension_thread = PluginExecutor(Config.extension_executor, 
extension_values)
+            extension_thread.setName("ExtensionExecutorThreadForExtension%s" % 
event.__class__.__name__)
+            log.debug("Starting a PluginExecutor Thread for event extension 
%s" % event.__class__.__name__)
             extension_thread.start()
 
             # block till plugin run completes.
@@ -528,13 +532,16 @@ def is_member_initialized_in_topology(service_name, 
cluster_id, member_id):
         if member is None:
             raise Exception("Member id not found in topology [member] %s" % 
member_id)
 
-        log.info("Found member: " + member.to_json())
+        log.debug("Found member: " + member.to_json())
         if member.status == MemberStatus.Initialized:
             return True
+
+    log.debug("Member doesn't exist in topology")
     return False
 
 
 def member_exists_in_topology(service_name, cluster_id, member_id):
+    log.debug("Checking if member exists in topology : %s, %s, %s, " % 
(service_name, cluster_id, member_id))
     topology = TopologyContext.get_topology()
     service = topology.get_service(service_name)
     if service is None:
@@ -544,12 +551,14 @@ def member_exists_in_topology(service_name, cluster_id, 
member_id):
     if cluster is None:
         raise Exception("Cluster id not found in topology [cluster] %s" % 
cluster_id)
 
-    activated_member = cluster.get_member(member_id)
-    if activated_member is None:
-        log.error("Member id not found in topology [member] %s" % member_id)
-        return False
+    member = cluster.get_member(member_id)
+    if member is None:
+        raise Exception("Member id not found in topology [member] %s" % 
member_id)
 
-    return True
+    log.info("Found member: " + member.to_json())
+    if member.status == MemberStatus.Initialized:
+        return True
+    return False
 
 
 def mark_member_as_initialized(service_name, cluster_id, member_id):
@@ -678,8 +687,10 @@ class PluginExecutor(Thread):
         self.__plugin_info = plugin_info
         self.__values = values
         self.__log = LogFactory().get_log(__name__)
+        self.setDaemon(True)
 
     def run(self):
+        self.__log.debug("Starting the PluginExecutor thread")
         try:
             self.__plugin_info.plugin_object.run_plugin(self.__values)
         except Exception as e:

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
index f727414..9e3106b 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
@@ -17,6 +17,10 @@
 
 import time
 from threading import Thread
+from log import LogFactory
+
+
+log = LogFactory().get_log(__name__)
 
 
 class AbstractAsyncScheduledTask:
@@ -53,6 +57,9 @@ class ScheduledExecutor(Thread):
         """ :type : AbstractAsyncScheduledTask  """
         self.terminated = False
         """ :type : bool  """
+        self.setName("ScheduledExecutorForTask%s" % 
self.task.__class__.__name__)
+        self.setDaemon(True)
+        log.debug("Created a ScheduledExecutor thread for task %s" % 
self.task.__class__.__name__)
 
     def run(self):
         """
@@ -62,6 +69,9 @@ class ScheduledExecutor(Thread):
         while not self.terminated:
             time.sleep(self.delay)
             task_thread = Thread(target=self.task.execute_task)
+            task_thread.setName("WorkerThreadForTask%s" % 
self.task.__class__.__name__)
+            task_thread.setDaemon(True)
+            log.debug("Starting a worker thread for the Scheduled Executor for 
task %s" % self.task.__class__.__name__)
             task_thread.start()
 
     def terminate(self):

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index 229c354..ec49294 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -94,8 +94,8 @@ def publish_instance_activated_event():
 
             publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + 
constants.INSTANCE_ACTIVATED_EVENT)
             publisher.publish(instance_activated_event)
+            log.info("Instance activated event published")
 
-            log.info("Starting health statistics notifier")
             health_stat_publishing_enabled = 
Config.read_property(constants.CEP_PUBLISHER_ENABLED, True)
 
             if health_stat_publishing_enabled:
@@ -231,6 +231,9 @@ class EventPublisher(object):
 
                 # start a thread to execute publish event
                 publisher_thread = Thread(target=self.__publish_event, 
args=(event, mb_ip, mb_port, auth, payload))
+                publisher_thread.setDaemon(True)
+                publisher_thread.setName("MBEventPublisherThreadForEvent%s" % 
event.__class__.__name__)
+                self.__log.debug("Starting a publisher thread for event %s " % 
event.__class__.__name__)
                 publisher_thread.start()
 
                 # give sometime for the thread to complete

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
index c5a6d2d..bdee412 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
@@ -14,6 +14,7 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 from Queue import Queue
 
 import threading
@@ -34,6 +35,7 @@ class EventSubscriber(threading.Thread):
 
     def __init__(self, topic, urls, username, password):
         threading.Thread.__init__(self)
+        self.setDaemon(True)
 
         self.__event_queue = Queue(maxsize=0)
         self.__event_executor = EventExecutor(self.__event_queue)
@@ -44,19 +46,21 @@ class EventSubscriber(threading.Thread):
         self.__urls = urls
         self.__username = username
         self.__password = password
+        self.setName("MBSubscriberThreadForTopic%s" % topic)
+        EventSubscriber.log.debug("Created a subscriber thread for %s" % topic)
 
     def run(self):
+        EventSubscriber.log.debug("Starting the subscriber thread for %s" % 
self.__topic)
         #  Start the event executor thread
         self.__event_executor.start()
 
         """
         The following loop will iterate forever.
 
-        When a successful connection is made and the failover() method 
returns, a job will start
-        which will periodically check the availability of the connected 
message broker. Then the
+        When a successful connection is made, the failover() method returns. 
Then the
         blocking method loop_forever() will be called on the connected mqtt 
client. This will only
         return if disconnect() is called on the same client. If the connected 
message broker goes
-        down, the periodical job will call disconnect() on the connected 
client and the
+        down, the callback method on_disconnect() will call disconnect() on 
the connected client and the
         loop_forever() method will return. The parent loop will be called 
again and this repeats
         every time the message brokers are disconnected.
 
@@ -68,6 +72,7 @@ class EventSubscriber(threading.Thread):
             self.__mb_client = mqtt.Client()
             self.__mb_client.on_connect = self.on_connect
             self.__mb_client.on_message = self.on_message
+            self.__mb_client.on_disconnect = self.on_disconnect
             if self.__username is not None:
                 EventSubscriber.log.info("Message broker credentials are 
provided.")
                 self.__mb_client.username_pw_set(self.__username, 
self.__password)
@@ -77,28 +82,17 @@ class EventSubscriber(threading.Thread):
                 EventSubscriber.failover(self.__urls, self.__mb_client)
 
             EventSubscriber.log.info(
-                "Connected to the message broker with address %r:%r" % 
(connected_mb_ip, connected_mb_port))
+                "Connected to the message broker with address %s:%s" % 
(connected_mb_ip, connected_mb_port))
 
             self.__subscribed = True
 
-            # Start a job to periodically check the online status of the 
connected message broker
-            heartbeat_task = MessageBrokerHeartBeatChecker(
-                                                            self.__mb_client,
-                                                            connected_mb_ip,
-                                                            connected_mb_port,
-                                                            self.__username,
-                                                            self.__password)
-
-            heartbeat_job = ScheduledExecutor(5, heartbeat_task)
-            heartbeat_job.start()
-
             # Start blocking loop method
             self.__mb_client.loop_forever()
 
-            # Disconnected when the heart beat checker detected an offline 
message broker
+            # Disconnected when the on_disconnect calls disconnect() on the 
client
             self.__subscribed = False
-            heartbeat_job.terminate()
-            EventSubscriber.log.debug("Disconnected from the message broker 
%s:%s. Reconnecting..." % (connected_mb_ip, connected_mb_port))
+            EventSubscriber.log.debug("Disconnected from the message broker 
%s:%s. Reconnecting..."
+                                      % (connected_mb_ip, connected_mb_port))
 
     def register_handler(self, event, handler):
         """
@@ -125,6 +119,11 @@ class EventSubscriber(threading.Thread):
         EventSubscriber.log.debug("Message received: %s:\n%s" % (msg.topic, 
msg.payload))
         self.__event_queue.put(msg)
 
+    def on_disconnect(self, client, userdata, rc):
+        EventSubscriber.log.debug("Message broker client disconnected. %s:%s" 
% (client._host, client._port))
+        if rc != 0:
+            client.disconnect()
+
     def is_subscribed(self):
         """
         Checks if this event subscriber is successfully subscribed to the 
provided topic
@@ -177,11 +176,15 @@ class EventExecutor(threading.Thread):
     """
     def __init__(self, event_queue):
         threading.Thread.__init__(self)
+        self.setDaemon(True)
         self.__event_queue = event_queue
         self.__event_handlers = {}
         EventSubscriber.log = LogFactory().get_log(__name__)
+        self.setName("MBEventExecutorThread")
+        EventSubscriber.log.debug("Created an EventExecutor")
 
     def run(self):
+        EventSubscriber.log.debug("Starting an EventExecutor")
         while True:
             event_msg = self.__event_queue.get()
             event = event_msg.topic.rpartition('/')[2]
@@ -201,30 +204,3 @@ class EventExecutor(threading.Thread):
     def terminate(self):
         self.terminate()
 
-
-class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask):
-    """
-    A scheduled task to periodically check if the connected message broker is 
online.
-    If the message broker goes offline, it will disconnect the currently 
connected
-    client object and it will return from the loop_forever() method.
-    """
-
-    def __init__(self, connected_client, mb_ip, mb_port, username=None, 
password=None):
-        self.__mb_client = mqtt.Client()
-
-        if username is not None:
-            self.__mb_client.username_pw_set(username, password)
-
-        self.__mb_ip = mb_ip
-        self.__mb_port = mb_port
-        self.__connected_client = connected_client
-        self.__log = LogFactory().get_log(__name__)
-
-    def execute_task(self):
-        try:
-            self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60)
-            self.__mb_client.disconnect()
-        except Exception:
-            self.__log.info(
-                "Message broker %s:%s cannot be reached. Disconnecting 
client..." % (self.__mb_ip, self.__mb_port))
-            self.__connected_client.disconnect()

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/terminator.txt
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/terminator.txt
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/terminator.txt
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java
new file mode 100644
index 0000000..46684fa
--- /dev/null
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.python.cartridge.agent.integration.tests;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.domain.LoadBalancingIPType;
+import org.apache.stratos.messaging.domain.topology.*;
+import 
org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * To test the agent termination flow by terminator.txt file
+ */
+public class AgentTerminationTestCase extends PythonAgentIntegrationTest {
+    public AgentTerminationTestCase() throws IOException {
+    }
+
+    private static final Log log = 
LogFactory.getLog(AgentTerminationTestCase.class);
+    private static final int TIMEOUT = 300000;
+    private static final String CLUSTER_ID = "tomcat.domain";
+    private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-6";
+    private static final String AUTOSCALING_POLICY_NAME = 
"autoscaling-policy-6";
+    private static final String APP_ID = "application-6";
+    private static final String MEMBER_ID = "tomcat.member-1";
+    private static final String INSTANCE_ID = "instance-1";
+    private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1";
+    private static final String NETWORK_PARTITION_ID = "network-partition-1";
+    private static final String PARTITION_ID = "partition-1";
+    private static final String TENANT_ID = "6";
+    private static final String SERVICE_NAME = "tomcat";
+
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        System.setProperty("jndi.properties.dir", getCommonResourcesPath());
+        super.setup(TIMEOUT);
+        startServerSocket(8080);
+    }
+    
+    @AfterMethod(alwaysRun = true)
+    public void tearDownAgentTerminationTest(){
+        tearDown();
+    }
+
+    @Test(groups = {"smoke"})
+    public void testAgentTerminationByFile() throws IOException {
+        startCommunicatorThread();
+        assertAgentActivation();
+        sleep(5000);
+
+        String terminatorFilePath = agentPath + PATH_SEP + "terminator.txt";
+        log.info("Writing termination flag to " + terminatorFilePath);
+        File terminatorFile = new File(terminatorFilePath);
+        String msg = "true";
+        Files.write(Paths.get(terminatorFile.getAbsolutePath()), 
msg.getBytes());
+
+        log.info("Waiting until agent reads termination flag");
+        sleep(50000);
+
+        List<String> outputLines = new ArrayList<>();
+        boolean exit = false;
+        while (!exit) {
+            List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+            if (newLines.size() > 0) {
+                for (String line : newLines) {
+                    if (line.contains("Shutting down Stratos cartridge agent...")) {
+                        log.info("Cartridge agent shutdown successfully");
+                        exit = true;
+                    }
+                }
+            }
+            sleep(1000);
+        }
+    }
+
+    private void assertAgentActivation() {
+        Thread startupTestThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventReceiverInitiated) {
+                    sleep(1000);
+                }
+                List<String> outputLines = new ArrayList<>();
+                boolean completeTopologyPublished = false;
+                boolean memberInitPublished = false;
+                while (!outputStream.isClosed()) {
+                    List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+                    if (newLines.size() > 0) {
+                        for (String line : newLines) {
+                            if (line.contains("Waiting for complete topology event") && !completeTopologyPublished) {
+                                sleep(2000);
+                                // Send complete topology event
+                                log.info("Publishing complete topology event...");
+                                Topology topology = createTestTopology();
+                                CompleteTopologyEvent completeTopologyEvent = 
new CompleteTopologyEvent(topology);
+                                publishEvent(completeTopologyEvent);
+                                log.info("Complete topology event published");
+                                completeTopologyPublished = true;
+                            }
+
+                            if (line.contains("Waiting for cartridge agent to be initialized") && !memberInitPublished) {
+                                // Publish member initialized event
+                                log.info("Publishing member initialized event...");
+                                MemberInitializedEvent memberInitializedEvent 
= new MemberInitializedEvent(
+                                        SERVICE_NAME, CLUSTER_ID, 
CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
+                                        PARTITION_ID, INSTANCE_ID
+                                );
+                                publishEvent(memberInitializedEvent);
+                                log.info("Member initialized event published");
+                                memberInitPublished = true;
+                            }
+
+                            // Send artifact updated event to activate the 
instance first
+                            if (line.contains("Artifact repository found")) {
+                                
publishEvent(getArtifactUpdatedEventForPublicRepo());
+                                log.info("Artifact updated event published");
+                            }
+                        }
+                    }
+                    sleep(1000);
+                }
+            }
+        });
+        startupTestThread.start();
+
+        while (!instanceStarted || !instanceActivated) {
+            // wait until the instance activated event is received.
+            // this will assert whether instance got activated within timeout 
period; no need for explicit assertions
+            sleep(2000);
+        }
+    }
+
+    private ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() {
+        ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent();
+        publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git");
+        return publicRepoEvent;
+    }
+
+    private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() {
+        ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent();
+        artifactUpdatedEvent.setClusterId(CLUSTER_ID);
+        artifactUpdatedEvent.setTenantId(TENANT_ID);
+        return artifactUpdatedEvent;
+    }
+
+    /**
+     * Create test topology
+     *
+     * @return
+     */
+    private Topology createTestTopology() {
+        Topology topology = new Topology();
+        Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
+        topology.addService(service);
+
+        Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, 
DEPLOYMENT_POLICY_NAME,
+                AUTOSCALING_POLICY_NAME, APP_ID);
+        service.addCluster(cluster);
+
+        Member member = new Member(service.getServiceName(), 
cluster.getClusterId(), MEMBER_ID,
+                CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, 
LoadBalancingIPType.Private,
+                System.currentTimeMillis());
+
+        member.setDefaultPrivateIP("10.0.0.1");
+        member.setDefaultPublicIP("20.0.0.1");
+        Properties properties = new Properties();
+        properties.setProperty("prop1", "value1");
+        member.setProperties(properties);
+        member.setStatus(MemberStatus.Created);
+        cluster.addMember(member);
+
+        return topology;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
index 1fdd8cd..07b4cfa 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -152,6 +152,7 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
         log.info("MessageBrokerHATestCase subscriber test completed successfully.");
     }
 
+
     @Test(timeOut = HA_TEST_TIMEOUT,
           groups = { "ha" },
           priority = 2)

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 649430f..d8cbc9f 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -86,6 +86,7 @@ public abstract class PythonAgentIntegrationTest {
     protected boolean instanceActivated;
     protected ByteArrayOutputStreamLocal outputStream;
     protected ThriftTestServer thriftTestServer;
+    protected String agentPath;
 
     private Map<String, BrokerService> messageBrokers;
 
@@ -169,7 +170,7 @@ public abstract class PythonAgentIntegrationTest {
         thriftTestServer.start(cepPort);
         log.info("Started Thrift server with stream definition: " + str);
 
-        String agentPath = setupPythonAgent();
+        agentPath = setupPythonAgent();
         log.info("Python agent working directory name: " + 
PYTHON_AGENT_DIR_NAME);
         log.info("Starting python cartridge agent...");
         this.outputStream = executeCommand("python " + agentPath + PATH_SEP + 
"agent.py", timeout);

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf
new file mode 100755
index 0000000..8395bfd
--- /dev/null
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[agent]
+mb.urls                               =localhost:1885,localhost:1886,localhost:1887
+mb.username                           =system
+mb.password                           =manager
+mb.publisher.timeout                  =20
+listen.address                        =localhost
+thrift.receiver.urls                  =localhost:7712
+thrift.server.admin.username          =admin
+thrift.server.admin.password          =admin
+cep.stats.publisher.enabled           =true
+lb.private.ip                         =
+lb.public.ip                          =
+enable.artifact.update                =true
+auto.commit                           =true
+auto.checkout                         =true
+artifact.update.interval              =15
+artifact.clone.retries                =5
+artifact.clone.interval               =10
+port.check.timeout                    =600000
+enable.data.publisher                 =false
+monitoring.server.ip                  =localhost
+monitoring.server.port                =7612
+monitoring.server.secure.port         =7712
+monitoring.server.admin.username      =admin
+monitoring.server.admin.password      =admin
+log.file.paths                        =/tmp/agent.screen-adc-test.log
+metadata.service.url                  =https://localhost:9443
+super.tenant.repository.path          =/repository/deployment/server/
+tenant.repository.path                =/repository/tenants/

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini
new file mode 100755
index 0000000..15cad9b
--- /dev/null
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+[formatters]
+keys=default
+
+[formatter_default]
+format=[%(asctime)s] %(levelname)s {%(filename)s:%(funcName)s} - %(message)s
+class=logging.Formatter
+
+[handlers]
+keys=console, error_file, log_file
+
+[handler_console]
+class=logging.StreamHandler
+formatter=default
+args=tuple()
+
+[handler_log_file]
+class=logging.FileHandler
+level=DEBUG
+formatter=default
+args=("agent.log", "w")
+
+[handler_error_file]
+class=logging.FileHandler
+level=ERROR
+formatter=default
+args=("error.log", "w")
+
+[loggers]
+keys=root
+
+[logger_root]
+level=DEBUG
+formatter=default
+handlers=console,error_file,log_file
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params
new file mode 100755
index 0000000..3498976
--- /dev/null
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params
@@ -0,0 +1 @@
+APPLICATION_ID=application1,SERVICE_NAME=tomcat,HOST_NAME=tomcat.stratos.org,MULTITENANT=false,TENANT_ID=-1234,TENANT_RANGE=*,CARTRIDGE_ALIAS=tomcat,CLUSTER_ID=tomcat.domain,CLUSTER_INSTANCE_ID=cluster-1-instance-1,CARTRIDGE_KEY=PUjpXCLujDhYr5A6,DEPLOYMENT=default,PORTS=8080,PUPPET_IP=127.0.0.1,PUPPET_HOSTNAME=puppet.apache.stratos.org,PUPPET_ENV=false,MEMBER_ID=tomcat.member-1,LB_CLUSTER_ID=null,NETWORK_PARTITION_ID=network-p1,PARTITION_ID=p1,APPLICATION_PATH=/tmp/ADCTestCase,MIN_COUNT=1,INTERNAL=false,CLUSTERING_PRIMARY_KEY=A,LOG_FILE_PATHS=/tmp/temp.log,PERSISTENCE_MAPPING=null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml
new file mode 100755
index 0000000..7a7dabb
--- /dev/null
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" >
+
+<suite name="PythonCartridgeAgentSmokeTestSuite">
+    <test name="smoke" preserve-order="true" parallel="false">
+        <groups>
+            <run>
+                <include name="ha"/>
+                <exclude name="smoke"/>
+                <exclude name="failed"/>
+                <exclude name="disabled"/>
+            </run>
+        </groups>
+
+        <packages>
+            <package name="org.apache.stratos.python.cartridge.agent.integration.tests.*"/>
+        </packages>
+    </test>
+</suite>
\ No newline at end of file

Reply via email to