PCA - Fixing message broker HA callback and agent graceful termination, and adding extra debug logs
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f6336002 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f6336002 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f6336002 Branch: refs/heads/master Commit: f6336002f17039a00037bfcc28c47ccbd7327878 Parents: 2c82881 Author: Chamila de Alwis <[email protected]> Authored: Mon Dec 14 15:25:31 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Fri Dec 18 11:54:50 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.py | 23 ++- .../cartridge.agent/cartridge.agent/entity.py | 1 - .../cartridge.agent/healthstats.py | 4 + .../cartridge.agent/logpublisher.py | 14 +- .../modules/artifactmgt/git/agentgithandler.py | 1 + .../modules/event/eventhandler.py | 23 ++- .../modules/util/asyncscheduledtask.py | 10 + .../cartridge.agent/publisher.py | 5 +- .../cartridge.agent/subscriber.py | 66 ++---- .../cartridge.agent/terminator.txt | 0 .../tests/AgentTerminationTestCase.java | 204 +++++++++++++++++++ .../tests/MessageBrokerHATestCase.java | 1 + .../tests/PythonAgentIntegrationTest.java | 3 +- .../AgentTerminationTestCase/agent.conf | 46 +++++ .../AgentTerminationTestCase/logging.ini | 52 +++++ .../payload/launch-params | 1 + .../src/test/resources/test-suite-ha.xml | 38 ++++ 17 files changed, 434 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index cec80fc..aa9c035 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -44,7 +44,6 @@ class CartridgeAgent(object): def run_agent(self): self.__log.info("Starting Cartridge Agent...") - # Start topology event receiver thread self.register_topology_event_listeners() @@ -327,13 +326,31 @@ class Handlers(object): event_handler.on_application_signup_removed_event(event_obj) +def check_termination(agent_obj): + terminate = False + terminator_file_path = os.path.abspath(os.path.dirname(__file__)) + "/terminator.txt" + while not terminate: + time.sleep(60) + try: + with open(terminator_file_path, 'r') as f: + file_output = f.read() + terminate = True if "true" in file_output else False + except IOError: + pass + + log.info("Shutting down Stratos cartridge agent...") + agent_obj.terminate() + if __name__ == "__main__": log = LogFactory().get_log(__name__) + cartridge_agent = CartridgeAgent() try: log.info("Starting Stratos cartridge agent...") - cartridge_agent = CartridgeAgent() + task_thread = Thread(target=check_termination, args=(cartridge_agent,)) + task_thread.start() cartridge_agent.run_agent() except Exception as e: log.exception("Cartridge Agent Exception: %r" % e) - # cartridge_agent.terminate() + log.info("Terminating Stratos cartridge agent...") + cartridge_agent.terminate() http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py index d00afd0..5dde2ee 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/entity.py @@ -509,7 +509,6 @@ class TopologyContext: @staticmethod def update(topology): TopologyContext.topology = topology - TopologyContext.initialized = True class Tenant: http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py index 2005537..48d07c2 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -43,14 +43,18 @@ class HealthStatisticsPublisherManager(Thread): self.log = LogFactory().get_log(__name__) self.publish_interval = publish_interval """:type : int""" + self.setDaemon(True) self.terminated = False self.publisher = HealthStatisticsPublisher() """:type : HealthStatisticsPublisher""" """:type : IHealthStatReaderPlugin""" self.stats_reader = Config.health_stat_plugin + self.setName("HealthStatPublisherManagerThread") + self.log.debug("Created a HealthStatisticsPublisherManager thread") def run(self): + self.log.debug("Starting the HealthStatisticsPublisherManager thread") while not self.terminated: time.sleep(self.publish_interval) 
http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py index b9f5d63..dbb5046 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/logpublisher.py @@ -43,8 +43,12 @@ class LogPublisher(Thread): self.member_id = member_id self.terminated = False + self.setName("LogPublisherThread") + self.setDaemon(True) + self.log.debug("Created a LogPublisher thread") def run(self): + self.log.debug("Starting the LogPublisher threads") if os.path.isfile(self.file_path) and os.access(self.file_path, os.R_OK): self.log.info("Starting log publisher for file: " + self.file_path + ", thread: " + str(current_thread())) # open file and keep reading for new entries @@ -102,6 +106,9 @@ class LogPublisherManager(Thread): def define_stream(tenant_id, alias, date_time): """ Creates a stream definition for Log Publishing + :param date_time: + :param alias: + :param tenant_id: :return: A StreamDefinition object with the required attributes added :rtype : StreamDefinition """ @@ -134,7 +141,7 @@ class LogPublisherManager(Thread): Thread.__init__(self) self.log = LogFactory().get_log(__name__) - + self.setDaemon(True) self.logfile_paths = logfile_paths self.publishers = {} self.ports = [] @@ -158,8 +165,11 @@ class LogPublisherManager(Thread): self.date_time = LogPublisherManager.get_current_date() self.stream_definition = self.define_stream(self.tenant_id, self.alias, 
self.date_time) + self.setName("LogPublisherManagerThread") + self.log.debug("Created a LogPublisherManager thread") def run(self): + self.log.debug("Starting the LogPublisherManager thread") if self.logfile_paths is not None and len(self.logfile_paths): for log_path in self.logfile_paths: # thread for each log file @@ -170,6 +180,7 @@ class LogPublisherManager(Thread): def get_publisher(self, log_path): """ Retrieve the publisher for the specified log file path. Creates a new LogPublisher if one is not available + :param log_path: :return: The LogPublisher object :rtype : LogPublisher """ @@ -188,6 +199,7 @@ class LogPublisherManager(Thread): def terminate_publisher(self, log_path): """ Terminates the LogPublisher thread associated with the specified log file + :param log_path: """ if log_path in self.publishers: self.publishers[log_path].terminate() http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py index fb2c534..309814d 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/artifactmgt/git/agentgithandler.py @@ -296,6 +296,7 @@ class AgentGitHandler: artifact_update_task = ArtifactUpdateTask(repo_info, auto_checkout, auto_commit) async_task = ScheduledExecutor(update_interval, artifact_update_task) + 
AgentGitHandler.log.info("Starting a Scheduled Executor thread for Git polling task") git_repo.scheduled_update_task = async_task async_task.start() http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py index a4ff074..c8ccf68 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/event/eventhandler.py @@ -429,6 +429,8 @@ def execute_plugins_for_event(event, input_values): for plugin_info in plugins_for_event: log.debug("Executing plugin %s for event %s" % (plugin_info.name, event)) plugin_thread = PluginExecutor(plugin_info, input_values) + plugin_thread.setName("PluginExecutorThreadForPlugin%s" % plugin_info.name) + log.debug("Starting a PluginExecutor Thread for event %s" % event.__class__.__name__) plugin_thread.start() # block till plugin run completes. @@ -449,6 +451,8 @@ def execute_extension_for_event(event, extension_values): if Config.extension_executor is not None: log.debug("Executing extension for event [%s]" % event) extension_thread = PluginExecutor(Config.extension_executor, extension_values) + extension_thread.setName("ExtensionExecutorThreadForExtension%s" % event.__class__.__name__) + log.debug("Starting a PluginExecutor Thread for event extension %s" % event.__class__.__name__) extension_thread.start() # block till plugin run completes. 
@@ -528,13 +532,16 @@ def is_member_initialized_in_topology(service_name, cluster_id, member_id): if member is None: raise Exception("Member id not found in topology [member] %s" % member_id) - log.info("Found member: " + member.to_json()) + log.debug("Found member: " + member.to_json()) if member.status == MemberStatus.Initialized: return True + + log.debug("Member doesn't exist in topology") return False def member_exists_in_topology(service_name, cluster_id, member_id): + log.debug("Checking if member exists in topology : %s, %s, %s, " % (service_name, cluster_id, member_id)) topology = TopologyContext.get_topology() service = topology.get_service(service_name) if service is None: @@ -544,12 +551,14 @@ def member_exists_in_topology(service_name, cluster_id, member_id): if cluster is None: raise Exception("Cluster id not found in topology [cluster] %s" % cluster_id) - activated_member = cluster.get_member(member_id) - if activated_member is None: - log.error("Member id not found in topology [member] %s" % member_id) - return False + member = cluster.get_member(member_id) + if member is None: + raise Exception("Member id not found in topology [member] %s" % member_id) - return True + log.info("Found member: " + member.to_json()) + if member.status == MemberStatus.Initialized: + return True + return False def mark_member_as_initialized(service_name, cluster_id, member_id): @@ -678,8 +687,10 @@ class PluginExecutor(Thread): self.__plugin_info = plugin_info self.__values = values self.__log = LogFactory().get_log(__name__) + self.setDaemon(True) def run(self): + self.__log.debug("Starting the PluginExecutor thread") try: self.__plugin_info.plugin_object.run_plugin(self.__values) except Exception as e: http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py ---------------------------------------------------------------------- 
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py index f727414..9e3106b 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py @@ -17,6 +17,10 @@ import time from threading import Thread +from log import LogFactory + + +log = LogFactory().get_log(__name__) class AbstractAsyncScheduledTask: @@ -53,6 +57,9 @@ class ScheduledExecutor(Thread): """ :type : AbstractAsyncScheduledTask """ self.terminated = False """ :type : bool """ + self.setName("ScheduledExecutorForTask%s" % self.task.__class__.__name__) + self.setDaemon(True) + log.debug("Created a ScheduledExecutor thread for task %s" % self.task.__class__.__name__) def run(self): """ @@ -62,6 +69,9 @@ class ScheduledExecutor(Thread): while not self.terminated: time.sleep(self.delay) task_thread = Thread(target=self.task.execute_task) + task_thread.setName("WorkerThreadForTask%s" % self.task.__class__.__name__) + task_thread.setDaemon(True) + log.debug("Starting a worker thread for the Scheduled Executor for task %s" % self.task.__class__.__name__) task_thread.start() def terminate(self): http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py index 229c354..ec49294 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -94,8 +94,8 @@ def publish_instance_activated_event(): publisher = get_publisher(constants.INSTANCE_STATUS_TOPIC + constants.INSTANCE_ACTIVATED_EVENT) publisher.publish(instance_activated_event) + log.info("Instance activated event published") - log.info("Starting health statistics notifier") health_stat_publishing_enabled = Config.read_property(constants.CEP_PUBLISHER_ENABLED, True) if health_stat_publishing_enabled: @@ -231,6 +231,9 @@ class EventPublisher(object): # start a thread to execute publish event publisher_thread = Thread(target=self.__publish_event, args=(event, mb_ip, mb_port, auth, payload)) + publisher_thread.setDaemon(True) + publisher_thread.setName("MBEventPublisherThreadForEvent%s" % event.__class__.__name__) + self.__log.debug("Starting a publisher thread for event %s " % event.__class__.__name__) publisher_thread.start() # give sometime for the thread to complete http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py index c5a6d2d..bdee412 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py +++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py @@ -14,6 +14,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + from Queue import Queue import threading @@ -34,6 +35,7 @@ class EventSubscriber(threading.Thread): def __init__(self, topic, urls, username, password): threading.Thread.__init__(self) + self.setDaemon(True) self.__event_queue = Queue(maxsize=0) self.__event_executor = EventExecutor(self.__event_queue) @@ -44,19 +46,21 @@ class EventSubscriber(threading.Thread): self.__urls = urls self.__username = username self.__password = password + self.setName("MBSubscriberThreadForTopic%s" % topic) + EventSubscriber.log.debug("Created a subscriber thread for %s" % topic) def run(self): + EventSubscriber.log.debug("Starting the subscriber thread for %s" % self.__topic) # Start the event executor thread self.__event_executor.start() """ The following loop will iterate forever. - When a successful connection is made and the failover() method returns, a job will start - which will periodically check the availability of the connected message broker. Then the + When a successful connection is made, the failover() method returns. Then the blocking method loop_forever() will be called on the connected mqtt client. This will only return if disconnect() is called on the same client. If the connected message broker goes - down, the periodical job will call disconnect() on the connected client and the + down, the callback method on_disconnect() will call disconnect() on the connected client and the loop_forever() method will return. The parent loop will be called again and this repeats every time the message brokers are disconnected. 
@@ -68,6 +72,7 @@ class EventSubscriber(threading.Thread): self.__mb_client = mqtt.Client() self.__mb_client.on_connect = self.on_connect self.__mb_client.on_message = self.on_message + self.__mb_client.on_disconnect = self.on_disconnect if self.__username is not None: EventSubscriber.log.info("Message broker credentials are provided.") self.__mb_client.username_pw_set(self.__username, self.__password) @@ -77,28 +82,17 @@ class EventSubscriber(threading.Thread): EventSubscriber.failover(self.__urls, self.__mb_client) EventSubscriber.log.info( - "Connected to the message broker with address %r:%r" % (connected_mb_ip, connected_mb_port)) + "Connected to the message broker with address %s:%s" % (connected_mb_ip, connected_mb_port)) self.__subscribed = True - # Start a job to periodically check the online status of the connected message broker - heartbeat_task = MessageBrokerHeartBeatChecker( - self.__mb_client, - connected_mb_ip, - connected_mb_port, - self.__username, - self.__password) - - heartbeat_job = ScheduledExecutor(5, heartbeat_task) - heartbeat_job.start() - # Start blocking loop method self.__mb_client.loop_forever() - # Disconnected when the heart beat checker detected an offline message broker + # Disconnected when the on_disconnect calls disconnect() on the client self.__subscribed = False - heartbeat_job.terminate() - EventSubscriber.log.debug("Disconnected from the message broker %s:%s. Reconnecting..." % (connected_mb_ip, connected_mb_port)) + EventSubscriber.log.debug("Disconnected from the message broker %s:%s. Reconnecting..." + % (connected_mb_ip, connected_mb_port)) def register_handler(self, event, handler): """ @@ -125,6 +119,11 @@ class EventSubscriber(threading.Thread): EventSubscriber.log.debug("Message received: %s:\n%s" % (msg.topic, msg.payload)) self.__event_queue.put(msg) + def on_disconnect(self, client, userdata, rc): + EventSubscriber.log.debug("Message broker client disconnected. 
%s:%s" % (client._host, client._port)) + if rc != 0: + client.disconnect() + def is_subscribed(self): """ Checks if this event subscriber is successfully subscribed to the provided topic @@ -177,11 +176,15 @@ class EventExecutor(threading.Thread): """ def __init__(self, event_queue): threading.Thread.__init__(self) + self.setDaemon(True) self.__event_queue = event_queue self.__event_handlers = {} EventSubscriber.log = LogFactory().get_log(__name__) + self.setName("MBEventExecutorThread") + EventSubscriber.log.debug("Created an EventExecutor") def run(self): + EventSubscriber.log.debug("Starting an EventExecutor") while True: event_msg = self.__event_queue.get() event = event_msg.topic.rpartition('/')[2] @@ -201,30 +204,3 @@ class EventExecutor(threading.Thread): def terminate(self): self.terminate() - -class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask): - """ - A scheduled task to periodically check if the connected message broker is online. - If the message broker goes offline, it will disconnect the currently connected - client object and it will return from the loop_forever() method. - """ - - def __init__(self, connected_client, mb_ip, mb_port, username=None, password=None): - self.__mb_client = mqtt.Client() - - if username is not None: - self.__mb_client.username_pw_set(username, password) - - self.__mb_ip = mb_ip - self.__mb_port = mb_port - self.__connected_client = connected_client - self.__log = LogFactory().get_log(__name__) - - def execute_task(self): - try: - self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60) - self.__mb_client.disconnect() - except Exception: - self.__log.info( - "Message broker %s:%s cannot be reached. Disconnecting client..." 
% (self.__mb_ip, self.__mb_port)) - self.__connected_client.disconnect() http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/terminator.txt ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/terminator.txt b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/terminator.txt new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java new file mode 100644 index 0000000..46684fa --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/AgentTerminationTestCase.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.stratos.python.cartridge.agent.integration.tests;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.domain.LoadBalancingIPType;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Tests the agent termination flow driven by the terminator.txt file.
 * <p>
 * Once the agent is activated, the test writes "true" into terminator.txt inside the agent
 * directory and then watches the agent's output for its shutdown log line.
 */
public class AgentTerminationTestCase extends PythonAgentIntegrationTest {

    private static final Log log = LogFactory.getLog(AgentTerminationTestCase.class);
    private static final int TIMEOUT = 300000;
    private static final String CLUSTER_ID = "tomcat.domain";
    private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-6";
    private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-6";
    private static final String APP_ID = "application-6";
    private static final String MEMBER_ID = "tomcat.member-1";
    private static final String INSTANCE_ID = "instance-1";
    private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1";
    private static final String NETWORK_PARTITION_ID = "network-partition-1";
    private static final String PARTITION_ID = "partition-1";
    private static final String TENANT_ID = "6";
    private static final String SERVICE_NAME = "tomcat";

    // Name of the flag file the agent polls, and the content that triggers termination.
    private static final String TERMINATOR_FILE_NAME = "terminator.txt";
    private static final String TERMINATION_FLAG = "true";
    // Line logged by the agent right before it terminates itself.
    private static final String SHUTDOWN_LOG_LINE = "Shutting down Stratos cartridge agent...";

    public AgentTerminationTestCase() throws IOException {
    }

    /**
     * Points JNDI at the common test resources, starts the agent with the test timeout and
     * opens a server socket on port 8080.
     *
     * @throws Exception if the base test setup fails
     */
    @BeforeMethod(alwaysRun = true)
    public void setup() throws Exception {
        System.setProperty("jndi.properties.dir", getCommonResourcesPath());
        super.setup(TIMEOUT);
        startServerSocket(8080);
    }

    /**
     * Delegates to the base class tear down after every test method.
     */
    @AfterMethod(alwaysRun = true)
    public void tearDownAgentTerminationTest() {
        tearDown();
    }

    /**
     * Activates the agent, writes the termination flag to terminator.txt and waits until the
     * agent logs its shutdown message.
     * <p>
     * The waits in this test are polling loops without their own deadline, so the test is
     * bounded with {@code timeOut}; without it a missing activation or a missing shutdown
     * line would hang the suite instead of failing the test.
     *
     * @throws IOException if the terminator file cannot be written
     */
    @Test(timeOut = TIMEOUT, groups = {"smoke"})
    public void testAgentTerminationByFile() throws IOException {
        startCommunicatorThread();
        assertAgentActivation();
        sleep(5000);

        String terminatorFilePath = agentPath + PATH_SEP + TERMINATOR_FILE_NAME;
        log.info("Writing termination flag to " + terminatorFilePath);
        File terminatorFile = new File(terminatorFilePath);
        // Explicit charset: the flag must not depend on the platform default encoding.
        Files.write(terminatorFile.toPath(), TERMINATION_FLAG.getBytes(StandardCharsets.UTF_8));

        // The agent only re-reads the flag file periodically, so give it a head start
        // before polling its output.
        log.info("Waiting until agent reads termination flag");
        sleep(50000);

        List<String> outputLines = new ArrayList<>();
        boolean shutdownLogged = false;
        while (!shutdownLogged) {
            for (String line : getNewLines(outputLines, outputStream.toString())) {
                if (line.contains(SHUTDOWN_LOG_LINE)) {
                    log.info("Cartridge agent shutdown successfully");
                    shutdownLogged = true;
                    break;
                }
            }
            if (!shutdownLogged) {
                sleep(1000);
            }
        }
    }

    /**
     * Drives the agent through activation and blocks until it is started and activated.
     * <p>
     * A background thread watches the agent output and publishes the complete topology event,
     * the member initialized event and the artifact updated event when the agent logs the
     * corresponding messages. The calling thread polls the instance started/activated flags;
     * there is no deadline here, the enclosing test's {@code timeOut} acts as the assertion.
     */
    private void assertAgentActivation() {
        Thread startupTestThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!eventReceiverInitiated) {
                    sleep(1000);
                }
                List<String> outputLines = new ArrayList<>();
                boolean completeTopologyPublished = false;
                boolean memberInitPublished = false;
                while (!outputStream.isClosed()) {
                    List<String> newLines = getNewLines(outputLines, outputStream.toString());
                    for (String line : newLines) {
                        if (line.contains("Waiting for complete topology event") && !completeTopologyPublished) {
                            sleep(2000);
                            // Send complete topology event
                            log.info("Publishing complete topology event...");
                            Topology topology = createTestTopology();
                            CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology);
                            publishEvent(completeTopologyEvent);
                            log.info("Complete topology event published");
                            completeTopologyPublished = true;
                        }

                        if (line.contains("Waiting for cartridge agent to be initialized") && !memberInitPublished) {
                            // Publish member initialized event
                            log.info("Publishing member initialized event...");
                            MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent(
                                    SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
                                    PARTITION_ID, INSTANCE_ID
                            );
                            publishEvent(memberInitializedEvent);
                            log.info("Member initialized event published");
                            memberInitPublished = true;
                        }

                        // Send artifact updated event to activate the instance first
                        if (line.contains("Artifact repository found")) {
                            publishEvent(getArtifactUpdatedEventForPublicRepo());
                            log.info("Artifact updated event published");
                        }
                    }
                    sleep(1000);
                }
            }
        });
        startupTestThread.start();

        while (!instanceStarted || !instanceActivated) {
            // Wait until the instance activated event is received. The test-level timeOut
            // turns a missing activation into a failure; no explicit assertion is needed.
            sleep(2000);
        }
    }

    /**
     * Builds the artifact updated event pointing at the public test repository.
     *
     * @return artifact updated event for the public repository
     */
    private ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() {
        ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent();
        publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git");
        return publicRepoEvent;
    }

    /**
     * Builds an artifact updated event carrying the test cluster id and tenant id.
     *
     * @return artifact updated event without a repository URL
     */
    private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() {
        ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent();
        artifactUpdatedEvent.setClusterId(CLUSTER_ID);
        artifactUpdatedEvent.setTenantId(TENANT_ID);
        return artifactUpdatedEvent;
    }

    /**
     * Create test topology
     *
     * @return topology with one single-tenant service, one cluster and one member in
     * {@code Created} status
     */
    private Topology createTestTopology() {
        Topology topology = new Topology();
        Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
        topology.addService(service);

        Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME,
                AUTOSCALING_POLICY_NAME, APP_ID);
        service.addCluster(cluster);

        Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID,
                CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private,
                System.currentTimeMillis());

        member.setDefaultPrivateIP("10.0.0.1");
        member.setDefaultPublicIP("20.0.0.1");
        Properties properties = new Properties();
        properties.setProperty("prop1", "value1");
        member.setProperties(properties);
        member.setStatus(MemberStatus.Created);
        cluster.addMember(member);

        return topology;
    }
}

http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java index 1fdd8cd..07b4cfa 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java @@ -152,6 +152,7 @@ public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { log.info("MessageBrokerHATestCase subscriber test completed successfully."); } + @Test(timeOut = HA_TEST_TIMEOUT, groups = { "ha" }, priority = 2) http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java index 649430f..d8cbc9f 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java +++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java @@ -86,6 +86,7 @@ public abstract class PythonAgentIntegrationTest { protected boolean instanceActivated; protected ByteArrayOutputStreamLocal outputStream; protected ThriftTestServer thriftTestServer; + protected String agentPath; private Map<String, BrokerService> messageBrokers; @@ -169,7 +170,7 @@ public abstract class PythonAgentIntegrationTest { thriftTestServer.start(cepPort); log.info("Started Thrift server with stream definition: " + str); - String agentPath = setupPythonAgent(); + agentPath = setupPythonAgent(); log.info("Python agent working directory name: " + PYTHON_AGENT_DIR_NAME); log.info("Starting python cartridge agent..."); this.outputStream = executeCommand("python " + agentPath + PATH_SEP + "agent.py", timeout); http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf new file mode 100755 index 0000000..8395bfd --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/agent.conf @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[agent] +mb.urls =localhost:1885,localhost:1886,localhost:1887 +mb.username =system +mb.password =manager +mb.publisher.timeout =20 +listen.address =localhost +thrift.receiver.urls =localhost:7712 +thrift.server.admin.username =admin +thrift.server.admin.password =admin +cep.stats.publisher.enabled =true +lb.private.ip = +lb.public.ip = +enable.artifact.update =true +auto.commit =true +auto.checkout =true +artifact.update.interval =15 +artifact.clone.retries =5 +artifact.clone.interval =10 +port.check.timeout =600000 +enable.data.publisher =false +monitoring.server.ip =localhost +monitoring.server.port =7612 +monitoring.server.secure.port =7712 +monitoring.server.admin.username =admin +monitoring.server.admin.password =admin +log.file.paths =/tmp/agent.screen-adc-test.log +metadata.service.url =https://localhost:9443 +super.tenant.repository.path =/repository/deployment/server/ +tenant.repository.path =/repository/tenants/ http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini new file mode 100755 index 0000000..15cad9b --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/logging.ini @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ + +[formatters] +keys=default + +[formatter_default] +format=[%(asctime)s] %(levelname)s {%(filename)s:%(funcName)s} - %(message)s +class=logging.Formatter + +[handlers] +keys=console, error_file, log_file + +[handler_console] +class=logging.StreamHandler +formatter=default +args=tuple() + +[handler_log_file] +class=logging.FileHandler +level=DEBUG +formatter=default +args=("agent.log", "w") + +[handler_error_file] +class=logging.FileHandler +level=ERROR +formatter=default +args=("error.log", "w") + +[loggers] +keys=root + +[logger_root] +level=DEBUG +formatter=default +handlers=console,error_file,log_file \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params new file mode 100755 index 0000000..3498976 --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentTerminationTestCase/payload/launch-params @@ -0,0 +1 @@ +APPLICATION_ID=application1,SERVICE_NAME=tomcat,HOST_NAME=tomcat.stratos.org,MULTITENANT=false,TENANT_ID=-1234,TENANT_RANGE=*,CARTRIDGE_ALIAS=tomcat,CLUSTER_ID=tomcat.domain,CLUSTER_INSTANCE_ID=cluster-1-instance-1,CARTRIDGE_KEY=PUjpXCLujDhYr5A6,DEPLOYMENT=default,PORTS=8080,PUPPET_IP=127.0.0.1,PUPPET_HOSTNAME=puppet.apache.stratos.org,PUPPET_ENV=false,MEMBER_ID=tomcat.member-1,LB_CLUSTER_ID=null,NETWORK_PARTITION_ID=network-p1,PARTITION_ID=p1,APPLICATION_PATH=/tmp/ADCTestCase,MIN_COUNT=1,INTERNAL=false,CLUSTERING_PRIMARY_KEY=A,LOG_FILE_PATHS=/tmp/temp.log,PERSISTENCE_MAPPING=null \ 
No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/f6336002/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml new file mode 100755 index 0000000..7a7dabb --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-suite-ha.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +<!DOCTYPE suite SYSTEM "http://testng.org/testng-1.0.dtd" > + +<suite name="PythonCartridgeAgentSmokeTestSuite"> + <test name="smoke" preserve-order="true" parallel="false"> + <groups> + <run> + <include name="ha"/> + <exclude name="smoke"/> + <exclude name="failed"/> + <exclude name="disabled"/> + </run> + </groups> + + <packages> + <package name="org.apache.stratos.python.cartridge.agent.integration.tests.*"/> + </packages> + </test> +</suite> \ No newline at end of file
