Repository: stratos Updated Branches: refs/heads/stratos-4.1.x 44f527018 -> 9c57c7d80
PCA - Message broker HA support. The list of message brokers can be provided via the agent.conf file Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/9c57c7d8 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/9c57c7d8 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/9c57c7d8 Branch: refs/heads/stratos-4.1.x Commit: 9c57c7d803f790078963861e553e5b79290d8f2d Parents: 44f5270 Author: Chamila de Alwis <[email protected]> Authored: Mon Nov 16 20:40:23 2015 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Mon Nov 16 20:41:01 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.conf | 4 +- .../cartridge.agent/cartridge.agent/agent.py | 24 +- .../cartridge.agent/cartridge.agent/config.py | 14 + .../cartridge.agent/constants.py | 4 +- .../modules/util/asyncscheduledtask.py | 2 +- .../modules/util/cartridgeagentutils.py | 43 ++- .../cartridge.agent/modules/util/log.py | 2 +- .../cartridge.agent/publisher.py | 60 +++- .../cartridge.agent/subscriber.py | 152 ++++++++-- .../tests/MessageBrokerHATestCase.java | 304 +++++++++++++++++++ .../tests/PythonAgentIntegrationTest.java | 126 ++++++-- .../resources/ADCExtensionTestCase/agent.conf | 3 +- .../ADCMTAppTenantUserTestCase/agent.conf | 3 +- .../test/resources/ADCMTAppTestCase/agent.conf | 3 +- .../src/test/resources/ADCTestCase/agent.conf | 3 +- .../resources/AgentStartupTestCase/agent.conf | 4 +- .../test/resources/CEPHAModeTestCase/agent.conf | 4 +- .../MessageBrokerHATestCase/agent.conf | 46 +++ .../MessageBrokerHATestCase/jndi.properties | 26 ++ .../MessageBrokerHATestCase/logging.ini | 52 ++++ .../payload/launch-params | 1 + .../test-conf/integration-test.properties | 4 +- 22 files changed, 780 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf index f16aa43..a8c8a19 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf @@ -16,10 +16,10 @@ # under the License. [agent] -mb.ip =MB-IP -mb.port =MB-PORT +mb.urls =MB-URLS mb.username =MB-USERNAME mb.password =MB-PASSWORD +mb.publisher.timeout =MB-PUBLISHER-TIMEOUT listen.address =LISTEN_ADDR thrift.receiver.urls =CEP-URLS thrift.server.admin.username =CEP-ADMIN-USERNAME http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py index 9c1159c..959568b 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py @@ -38,22 +38,14 @@ class CartridgeAgent(threading.Thread): self.__terminated = False self.__log = LogFactory().get_log(__name__) - mb_ip = Config.read_property(constants.MB_IP) - mb_port = Config.read_property(constants.MB_PORT) - mb_username = Config.read_property(constants.MB_USERNAME, False) - mb_password = Config.read_property(constants.MB_PASSWORD, False) - - self.__inst_topic_subscriber = \ - EventSubscriber(constants.INSTANCE_NOTIFIER_TOPIC, mb_ip, mb_port, mb_username, mb_password) - - self.__tenant_topic_subscriber = \ - EventSubscriber(constants.TENANT_TOPIC, mb_ip, mb_port, mb_username, mb_password) - - self.__app_topic_subscriber = \ - EventSubscriber(constants.APPLICATION_SIGNUP, mb_ip, mb_port, mb_username, mb_password) - - self.__topology_event_subscriber = \ - EventSubscriber(constants.TOPOLOGY_TOPIC, mb_ip, mb_port, mb_username, mb_password) + mb_urls = Config.mb_urls + mb_uname = Config.mb_username + mb_pwd = Config.mb_password + + self.__inst_topic_subscriber = EventSubscriber(constants.INSTANCE_NOTIFIER_TOPIC, mb_urls, mb_uname, mb_pwd) + self.__tenant_topic_subscriber = EventSubscriber(constants.TENANT_TOPIC, mb_urls, mb_uname, mb_pwd) + self.__app_topic_subscriber = EventSubscriber(constants.APPLICATION_SIGNUP, mb_urls, mb_uname, mb_pwd) + self.__topology_event_subscriber = EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd) self.__event_handler = EventHandler() http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py index e50c47d..f1a70ec 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py @@ -140,6 +140,14 @@ class Config: """ :type : bool """ maintenance = False """ :type : bool """ + mb_urls = [] + """ :type : list """ + mb_username = None + """ :type : str """ + mb_password = None + """ :type : str """ + mb_publisher_timeout = None + """ :type : int """ @staticmethod def read_conf_file(): @@ -171,6 +179,7 @@ class Config: def read_payload_file(param_file_path): """ Reads the payload file of the cartridge and stores the values in a dictionary + :param param_file_path: payload parameter file path :return: Payload parameter dictionary of values :rtype: dict """ @@ -235,6 +244,7 @@ class Config: def read_property(property_key, critical=True): """ Returns the value of the provided property + :param critical: If absence of this value should throw an error :param str property_key: the name of the property to be read :return: Value of the property :exception: ParameterNotFoundException if the provided property cannot be found @@ -339,6 +349,10 @@ class Config: Config.is_primary = Config.read_property(constants.CLUSTERING_PRIMARY_KEY, False) + Config.mb_username = Config.read_property(constants.MB_USERNAME, False) + Config.mb_password = Config.read_property(constants.MB_PASSWORD, False) + Config.mb_urls = Config.read_property(constants.MB_URLS).split(",") + Config.mb_publisher_timeout = int(Config.read_property(constants.MB_PUBLISHER_TIMEOUT)) except ParameterNotFoundException as ex: raise RuntimeError(ex) http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py index 8c7a7b0..cd2ce36 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py @@ -19,10 +19,10 @@ PARAM_FILE_PATH = "param.file.path" PLUGINS_DIR = "plugins.dir" EXTENSIONS_DIR = "extensions.dir" -MB_IP = "mb.ip" -MB_PORT = "mb.port" +MB_URLS = "mb.urls" MB_USERNAME = "mb.username" MB_PASSWORD = "mb.password" +MB_PUBLISHER_TIMEOUT = "mb.publisher.timeout" CARTRIDGE_KEY = "CARTRIDGE_KEY" APPLICATION_ID = "APPLICATION_ID" http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py index d470d6e..f727414 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py @@ -69,4 +69,4 @@ class ScheduledExecutor(Thread): Terminate the scheduled task. Allow a maximum of 'delay' seconds to be terminated. :return: void """ - self.terminated = True \ No newline at end of file + self.terminated = True http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py index f9a9bbb..ebd6889 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py @@ -31,6 +31,7 @@ current_milli_time = lambda: int(round(time.time() * 1000)) BS = 16 pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS) + def decrypt_password(pass_str, secret): """ Decrypts the given password using the given secret. The encryption is assumed to be done @@ -74,18 +75,19 @@ def wait_until_ports_active(ip_address, ports, ports_check_timeout=600000): if ports_check_timeout is None: ports_check_timeout = 1000 * 60 * 10 - log.debug("Port check timeout: %r" % ports_check_timeout) + log.debug("Port check timeout: %s" % ports_check_timeout) active = False start_time = current_milli_time() while not active: - log.info("Waiting for ports to be active: [ip] %r [ports] %r" % (ip_address, ports)) + log.info("Waiting for ports to be active: [ip] %s [ports] %s" % (ip_address, ports)) active = check_ports_active(ip_address, ports) end_time = current_milli_time() duration = end_time - start_time if duration > ports_check_timeout: - log.info("Port check timeout reached: [ip] %r [ports] %r [timeout] %r" % (ip_address, ports, ports_check_timeout)) + log.info("Port check timeout reached: [ip] %s [ports] %s [timeout] %s" + % (ip_address, ports, ports_check_timeout)) return False time.sleep(5) @@ -110,10 +112,39 @@ def check_ports_active(ip_address, ports): s.settimeout(5) try: s.connect((ip_address, int(port))) - log.debug("Port %r is active" % port) + log.debug("Port %s is active" % port) s.close() except socket.error: - log.debug("Port %r is not active" % port) + log.debug("Port %s is not active" % port) return False - return True \ No newline at end of file + return True + + +class IncrementalCeilingListIterator(object): + """ + Iterates through a given list and returns elements. At the end of the list if terminate_at_end is set to false, + the last element will be returned repeatedly. If terminate_at_end is set to true, an IndexError will be thrown. + """ + + def __init__(self, intervals, terminate_at_end): + self.__intervals = intervals + self.__index = 0 + self.__terminate_at_end = terminate_at_end + + def get_next_retry_interval(self): + """ + Retrieves the next element in the list. + :return: + :rtype: int + """ + if self.__index < len(self.__intervals): + next_interval = self.__intervals[self.__index] + self.__index += 1 + else: + if self.__terminate_at_end: + raise IndexError("Reached the end of the list") + else: + next_interval = self.__intervals[-1] + + return next_interval http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py index 6a0804e..44c8655 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py @@ -52,4 +52,4 @@ class LogFactory(object): :return: The logger class :rtype: RootLogger """ - return self.instance.get_log(name) \ No newline at end of file + return self.instance.get_log(name) http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py index 0994048..dcafe47 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py @@ -15,13 +15,16 @@ # specific language governing permissions and limitations # under the License. +import threading import paho.mqtt.publish as publish +import time import constants import healthstats from config import Config from modules.event.instance.status.events import * from modules.util import cartridgeagentutils +from modules.util.cartridgeagentutils import IncrementalCeilingListIterator from modules.util.log import * log = LogFactory().get_log(__name__) @@ -211,20 +214,55 @@ class EventPublisher: def __init__(self, topic): self.__topic = topic + self.__log = LogFactory().get_log(__name__) + self.__start_time = int(time.time()) def publish(self, event): - mb_ip = Config.read_property(constants.MB_IP) - mb_port = Config.read_property(constants.MB_PORT) - mb_username = Config.read_property(constants.MB_USERNAME, False) - mb_password = Config.read_property(constants.MB_PASSWORD, False) - if mb_username is None: + publisher_thread = threading.Thread(target=self.__publish_event, args=(event,)) + publisher_thread.start() + + def __publish_event(self, event): + """ + Publishes the given event to the message broker. + + When a list of message brokers are given the event is published to the first message broker + available. Therefore the message brokers should share the data (ex: Sharing the KahaDB in ActiveMQ). + + When the event cannot be published, it will be retried until the mb_publisher_timeout is exceeded. + This value is set in the agent.conf. + + :param event: + :return: True if the event was published. + """ + if Config.mb_username is None: auth = None else: - auth = {"username": mb_username, "password": mb_password} + auth = {"username": Config.mb_username, "password": Config.mb_password} payload = event.to_json() - publish.single(self.__topic, - payload, - hostname=mb_ip, - port=mb_port, - auth=auth) + + retry_iterator = IncrementalCeilingListIterator([2, 2, 5, 5, 10, 10, 20, 20, 30, 30, 40, 40, 50, 50, 60], False) + + # Retry to publish the event until the timeout exceeds + while int(time.time()) - self.__start_time < (Config.mb_publisher_timeout * 1000): + retry_interval = retry_iterator.get_next_retry_interval() + + for mb_url in Config.mb_urls: + mb_ip, mb_port = mb_url.split(":") + + try: + publish.single(self.__topic, payload, hostname=mb_ip, port=mb_port, auth=auth) + self.__log.debug("Event published to %s:%s" % (mb_ip, mb_port)) + return True + except: + self.__log.debug("Could not publish event to message broker %s:%s." % (mb_ip, mb_port)) + + self.__log.debug( + "Could not publish event to any of the provided message brokers. Retrying in %s seconds." + % retry_interval) + + time.sleep(retry_interval) + + self.__log.warn("Could not publish even to any of the provided message brokers before " + "the timeout [%s] exceeded. The event will be dropped." % Config.mb_publisher_timeout) + return False http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py index 908a44c..90a9771 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py @@ -20,6 +20,8 @@ import threading import paho.mqtt.client as mqtt from modules.util.log import LogFactory +from modules.util.asyncscheduledtask import * +from modules.util.cartridgeagentutils import IncrementalCeilingListIterator class EventSubscriber(threading.Thread): @@ -28,36 +30,75 @@ class EventSubscriber(threading.Thread): register event handlers for various events. """ - def __init__(self, topic, ip, port, username, password): + log = LogFactory().get_log(__name__) + + def __init__(self, topic, urls, username, password): threading.Thread.__init__(self) self.__event_queue = Queue(maxsize=0) self.__event_executor = EventExecutor(self.__event_queue) - self.log = LogFactory().get_log(__name__) - self.__mb_client = None self.__topic = topic self.__subscribed = False - self.__ip = ip - self.__port = port + self.__urls = urls self.__username = username self.__password = password def run(self): # Start the event executor thread self.__event_executor.start() - self.__mb_client = mqtt.Client() - self.__mb_client.on_connect = self.on_connect - self.__mb_client.on_message = self.on_message - if self.__username is not None: - self.log.debug("Message broker credentials are... %s:%s" % (self.__username, self.__password)) - self.__mb_client.username_pw_set(self.__username, self.__password) - self.log.debug("Connecting to the message broker with address %r:%r" % (self.__ip, self.__port)) - self.__mb_client.connect(self.__ip, self.__port, 60) - self.__subscribed = True - self.__mb_client.loop_forever() + """ + The following loop will iterate forever. + + When a successful connection is made and the failover() method returns, a job will start + which will periodically check the availability of the connected message broker. Then the + blocking method loop_forever() will be called on the connected mqtt client. This will only + return if disconnect() is called on the same client. If the connected message broker goes + down, the periodical job will call disconnect() on the connected client and the + loop_forever() method will return. The parent loop will be called again and this repeats + every time the message brokers are disconnected. + + This behavior guarantees that the subscriber is always subscribed to an available message + broker. + + """ + while True: + self.__mb_client = mqtt.Client() + self.__mb_client.on_connect = self.on_connect + self.__mb_client.on_message = self.on_message + if self.__username is not None: + EventSubscriber.log.info("Message broker credentials are provided.") + self.__mb_client.username_pw_set(self.__username, self.__password) + + # Select an online message broker and connect + self.__mb_client, connected_mb_ip, connected_mb_port = \ + EventSubscriber.failover(self.__urls, self.__mb_client) + + EventSubscriber.log.info( + "Connected to the message broker with address %r:%r" % (connected_mb_ip, connected_mb_port)) + + self.__subscribed = True + + # Start a job to periodically check the online status of the connected message broker + heartbeat_task = MessageBrokerHeartBeatChecker( + self.__mb_client, + connected_mb_ip, + connected_mb_port, + self.__username, + self.__password) + + heartbeat_job = ScheduledExecutor(5, heartbeat_task) + heartbeat_job.start() + + # Start blocking loop method + self.__mb_client.loop_forever() + + # Disconnected when the heart beat checker detected an offline message broker + self.__subscribed = False + heartbeat_job.terminate() + EventSubscriber.log.debug("Disconnected from the message broker %s:%s. Reconnecting..." % (connected_mb_ip, connected_mb_port)) def register_handler(self, event, handler): """ @@ -68,15 +109,15 @@ class EventSubscriber(threading.Thread): :rtype: void """ self.__event_executor.register_event_handler(event, handler) - self.log.debug("Registered handler for event %r" % event) + EventSubscriber.log.debug("Registered handler for event %r" % event) def on_connect(self, client, userdata, flags, rc): - self.log.debug("Connected to message broker.") + EventSubscriber.log.debug("Connected to message broker.") self.__mb_client.subscribe(self.__topic) - self.log.debug("Subscribed to %r" % self.__topic) + EventSubscriber.log.debug("Subscribed to %r" % self.__topic) def on_message(self, client, userdata, msg): - self.log.debug("Message received: %s:\n%s" % (msg.topic, msg.payload)) + EventSubscriber.log.debug("Message received: %s:\n%s" % (msg.topic, msg.payload)) self.__event_queue.put(msg) def is_subscribed(self): @@ -87,6 +128,43 @@ class EventSubscriber(threading.Thread): """ return self.__subscribed + @staticmethod + def failover(mb_urls, mb_client): + """ + Iterate through the list of message brokers provided and connect to the first available server. This will not + return until a message broker connection is established. + + :param mb_urls: the list of message broker URLS of format [host:port, host:port] + :param mb_client: the initialized message broker client object + :return: a tuple of the connected message broker client, connected message broker IP address and connected + message broker port + + """ + # Connection retry interval incrementer + message_broker_retry_timer = IncrementalCeilingListIterator( + [2, 2, 5, 5, 10, 10, 20, 20, 30, 30, 40, 40, 50, 50, 60], + False) + + # Cycling through the provided mb urls until forever + while True: + retry_interval = message_broker_retry_timer.get_next_retry_interval() + + for mb_url in mb_urls: + mb_ip, mb_port = mb_url.split(":") + EventSubscriber.log.debug( + "Trying to connect to the message broker with address %r:%r" % (mb_ip, mb_port)) + try: + mb_client.connect(mb_ip, mb_port, 60) + return mb_client, mb_ip, mb_port + except: + # The message broker didn't respond well + EventSubscriber.log.debug("Could not connect to the message broker at %s:%s." % (mb_ip, mb_port)) + + EventSubscriber.log.debug( + "Could not connect to any of the message brokers provided. Retrying in %s seconds." % retry_interval) + + time.sleep(retry_interval) + class EventExecutor(threading.Thread): """ @@ -97,7 +175,7 @@ class EventExecutor(threading.Thread): self.__event_queue = event_queue # TODO: several handlers for one event self.__event_handlers = {} - self.log = LogFactory().get_log(__name__) + EventSubscriber.log = LogFactory().get_log(__name__) def run(self): while True: @@ -106,16 +184,44 @@ class EventExecutor(threading.Thread): if event in self.__event_handlers: handler = self.__event_handlers[event] try: - self.log.debug("Executing handler for event %r" % event) + EventSubscriber.log.debug("Executing handler for event %r" % event) handler(event_msg) except: - self.log.exception("Error processing %r event" % event) + EventSubscriber.log.exception("Error processing %r event" % event) else: - self.log.debug("Event handler not found for event : %r" % event) + EventSubscriber.log.debug("Event handler not found for event : %r" % event) def register_event_handler(self, event, handler): self.__event_handlers[event] = handler def terminate(self): self.terminate() + + +class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask): + """ + A scheduled task to periodically check if the connected message broker is online. + If the message broker goes offline, it will disconnect the currently connected + client object and it will return from the loop_forever() method. + """ + + def __init__(self, connected_client, mb_ip, mb_port, username=None, password=None): + self.__mb_client = mqtt.Client() + + if username is not None: + self.__mb_client.username_pw_set(username, password) + + self.__mb_ip = mb_ip + self.__mb_port = mb_port + self.__connected_client = connected_client + self.__log = LogFactory().get_log(__name__) + + def execute_task(self): + try: + self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60) + self.__mb_client.disconnect() + except: + self.__log.info( + "Message broker %s:%s cannot be reached. Disconnecting client..." % (self.__mb_ip, self.__mb_port)) + self.__connected_client.disconnect() http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java new file mode 100644 index 0000000..30ff703 --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.stratos.python.cartridge.agent.integration.tests; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.stratos.common.domain.LoadBalancingIPType; +import org.apache.stratos.messaging.domain.topology.*; +import org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent; +import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent; +import org.apache.stratos.messaging.event.topology.MemberInitializedEvent; +import org.testng.annotations.Test; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Created by chamilad on 11/11/15. + */ +public class MessageBrokerHATestCase extends PythonAgentIntegrationTest { + public MessageBrokerHATestCase() throws IOException { + } + + private static final Log log = LogFactory.getLog(MessageBrokerHATestCase.class); + private static final int ADC_TIMEOUT = 300000; + private static final String CLUSTER_ID = "tomcat.domain"; + private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-6"; + private static final String AUTOSCALING_POLICY_NAME = "autoscaling-policy-6"; + private static final String APP_ID = "application-6"; + private static final String MEMBER_ID = "tomcat.member-1"; + private static final String INSTANCE_ID = "instance-1"; + private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1"; + private static final String NETWORK_PARTITION_ID = "network-partition-1"; + private static final String PARTITION_ID = "partition-1"; + private static final String TENANT_ID = "6"; + private static final String SERVICE_NAME = "tomcat"; + + + @BeforeMethod(alwaysRun = true) + public void setup() throws Exception { + System.setProperty("jndi.properties.dir", getTestCaseResourcesPath()); +// integrationTestPropertiesPath = new FileInputStream(new File(getTestCaseResourcesPath() + PATH_SEP + "integration-test.properties")); + + super.setup(ADC_TIMEOUT); + startServerSocket(8080); + } + + @AfterMethod(alwaysRun = true) + public void tearDownBrokerHATest(){ + tearDown(); + } + + @Test(groups = {"test"}) + public void testBrokerFailoverHeartbeat(){ + startCommunicatorThread(); + sleep(10000); +// assertAgentActivation(); + + // take down the default broker + log.info("Stopping subscribed message broker: DEFAULT"); + stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]); + + List<String> outputLines = new ArrayList<>(); + boolean exit = false; + while (!exit) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Message broker localhost:" + mqttBindPorts[0] + " cannot be reached. Disconnecting client...")) { + log.info("Message Broker Heartbeat checker has detected message broker node termination and is trying the next option."); + exit = true; + } + } + } + sleep(1000); + } + + sleep(10000); + log.info("Stopping subscribed message broker"); + stopActiveMQInstance("testBroker-" + amqpBindPorts[1] + "-" + mqttBindPorts[1]); + + exit = false; + while (!exit) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Message broker localhost:" + mqttBindPorts[1] + " cannot be reached. Disconnecting client...")) { + log.info("Message Broker Heartbeat checker has detected message broker node termination and is trying the next option."); + exit = true; + } + } + } + sleep(1000); + } + + sleep(20000); + log.info("Stopping subscribed message broker"); + stopActiveMQInstance("testBroker-" + amqpBindPorts[2] + "-" + mqttBindPorts[2]); + + exit = false; + while (!exit) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Message broker localhost:" + mqttBindPorts[2] + " cannot be reached. Disconnecting client...")) { + log.info("Message Broker Heartbeat checker has detected message broker node termination and is trying the next option."); + } + if (line.contains("Could not connect to any of the message brokers provided. Retrying in 2 seconds")) { + log.info("Failover went through all the options and will be retrying."); + exit = true; + } + } + } + sleep(1000); + } + } + + @Test(groups = {"smoke"}) + public void testBrokerFailoverForPublisher(){ + startCommunicatorThread(); + + + List<String> outputLines = new ArrayList<>(); + boolean exit = false; + while (!exit) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Subscribed to 'topology/#'")) { + // take down the default broker + stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + mqttBindPorts[0]); + } + + if (line.contains("Waiting for complete topology event")) { + + sleep(4000); + +// stopActiveMQInstance("testBroker2"); +// stopActiveMQInstance("testBroker3"); + // Send complete topology event + log.info("Publishing complete topology event..."); + Topology topology = createTestTopology(); + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + publishEvent(completeTopologyEvent); + log.info("Complete topology event published"); + } + + if (line.contains("Waiting for cartridge agent to be initialized")) { + // Publish member initialized event + log.info("Publishing member initialized event..."); + MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( + SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, + PARTITION_ID, INSTANCE_ID + ); + publishEvent(memberInitializedEvent); + log.info("Member initialized event published"); + } + + + // Send artifact updated event to activate the instance first + if (line.contains("Artifact repository found")) { + publishEvent(getArtifactUpdatedEventForPublicRepo()); + log.info("Artifact updated event published"); + } + + if (line.contains("Could not publish event to message broker localhost:1885.")) { + log.info("Event publishing to default message broker failed and the next option is tried."); + exit = true; + } + +// if (line.contains("The event will be dropped.")) { +// log.info("Event publishing failed after timeout exceeded and the event was dropped."); +// exit = true; +// } + } + } + sleep(1000); + } + +// assertAgentActivation(); + } + + private void assertAgentActivation() { + Thread startupTestThread = new Thread(new Runnable() { + @Override + public void run() { + while (!eventReceiverInitiated) { + sleep(1000); + } + List<String> outputLines = new ArrayList<>(); + boolean completeTopologyPublished = false; + boolean memberInitPublished = false; + while (!outputStream.isClosed()) { + List<String> newLines = getNewLines(outputLines, outputStream.toString()); + if (newLines.size() > 0) { + for (String line : newLines) { + if (line.contains("Waiting for complete topology event") && !completeTopologyPublished) { + sleep(2000); + // Send complete topology event + log.info("Publishing complete topology event..."); + Topology topology = createTestTopology(); + CompleteTopologyEvent completeTopologyEvent = new CompleteTopologyEvent(topology); + publishEvent(completeTopologyEvent); + log.info("Complete topology event published"); + completeTopologyPublished = true; + } + + if (line.contains("Waiting for cartridge agent to be initialized") && !memberInitPublished) { + // Publish member initialized event + log.info("Publishing member initialized event..."); + MemberInitializedEvent memberInitializedEvent = new MemberInitializedEvent( + SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID, + PARTITION_ID, INSTANCE_ID + ); + publishEvent(memberInitializedEvent); + log.info("Member initialized event published"); + memberInitPublished = true; + } + + // Send artifact updated event to activate the instance first + if (line.contains("Artifact repository found")) { + publishEvent(getArtifactUpdatedEventForPublicRepo()); + log.info("Artifact updated event published"); + } + } + } + sleep(1000); + } + } + }); + startupTestThread.start(); + + while (!instanceStarted || !instanceActivated) { + // wait until the instance activated event is received. + // this will assert whether instance got activated within timeout period; no need for explicit assertions + sleep(2000); + } + } + + private ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() { + ArtifactUpdatedEvent publicRepoEvent = createTestArtifactUpdatedEvent(); + publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git"); + return publicRepoEvent; + } + + private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() { + ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent(); + artifactUpdatedEvent.setClusterId(CLUSTER_ID); + artifactUpdatedEvent.setTenantId(TENANT_ID); + return artifactUpdatedEvent; + } + + /** + * Create test topology + * + * @return + */ + private Topology createTestTopology() { + Topology topology = new Topology(); + Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant); + topology.addService(service); + + Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, DEPLOYMENT_POLICY_NAME, + AUTOSCALING_POLICY_NAME, APP_ID); + service.addCluster(cluster); + + Member member = new Member(service.getServiceName(), cluster.getClusterId(), MEMBER_ID, + CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, LoadBalancingIPType.Private, + System.currentTimeMillis()); + + member.setDefaultPrivateIP("10.0.0.1"); + member.setDefaultPublicIP("20.0.0.1"); + Properties properties = new Properties(); + properties.setProperty("prop1", "value1"); + member.setProperties(properties); + member.setStatus(MemberStatus.Created); + cluster.addMember(member); + + return topology; + } +} http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java index 3c7661b..fb12db6 100644 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java @@ -49,27 +49,30 @@ import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; public class PythonAgentIntegrationTest { - protected final Properties integrationProperties = new Properties(); - public static final String PATH_SEP = File.separator; - private static final Log log = LogFactory.getLog(PythonAgentIntegrationTest.class); - protected BrokerService broker; + public static final String PATH_SEP = File.separator; public static final String NEW_LINE = System.getProperty("line.separator"); - public static final String ACTIVEMQ_AMQP_BIND_ADDRESS = "activemq.amqp.bind.address"; - public static final String ACTIVEMQ_MQTT_BIND_ADDRESS = "activemq.mqtt.bind.address"; + + public static final String ACTIVEMQ_AMQP_BIND_PORTS = "activemq.amqp.bind.ports"; + public static final String ACTIVEMQ_MQTT_BIND_PORTS = "activemq.mqtt.bind.ports"; public static final String CEP_PORT = "cep.server.one.port"; public static final String CEP_SSL_PORT = "cep.server.one.ssl.port"; public static final String DISTRIBUTION_NAME = "distribution.name"; + + private static final Log log = LogFactory.getLog(PythonAgentIntegrationTest.class); + public static final String TEST_THREAD_POOL_SIZE = "test.thread.pool.size"; protected final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID(); +// protected final String defaultBrokerName = "testBrokerDefault"; + protected final Properties integrationProperties = new Properties(); protected Map<Integer, ServerSocket> serverSocketMap = new HashMap<>(); protected Map<String, Executor> executorList = new HashMap<>(); protected int cepPort; protected int cepSSLPort; - protected String amqpBindAddress; - protected String mqttBindAddress; + protected String[] amqpBindPorts; + protected String[] mqttBindPorts; protected String distributionName; protected int testThreadPoolSize; @@ -82,12 +85,38 @@ public class PythonAgentIntegrationTest { protected ByteArrayOutputStreamLocal outputStream; protected ThriftTestServer thriftTestServer; + private Map<String, BrokerService> messageBrokers; + + /** * Setup method for test method testPythonCartridgeAgent */ protected void setup(int timeout) throws Exception { + messageBrokers = new HashMap<>(); + + distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME); + + cepPort = Integer.parseInt(integrationProperties.getProperty(CEP_PORT)); + cepSSLPort = Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT)); + + Properties jndiProperties = new Properties(); + jndiProperties.load(new FileInputStream(new File(System.getProperty("jndi.properties.dir") + PATH_SEP + "jndi.properties"))); + if (!jndiProperties.containsKey(ACTIVEMQ_AMQP_BIND_PORTS) || !jndiProperties.containsKey(ACTIVEMQ_MQTT_BIND_PORTS)) { + amqpBindPorts = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(","); + mqttBindPorts = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(","); + }else{ + amqpBindPorts = jndiProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(","); + mqttBindPorts = jndiProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(","); + } + + if (amqpBindPorts.length != mqttBindPorts.length) { + throw new RuntimeException("The number of AMQP ports and MQTT ports should be equal in integration-test.properties."); + } + // start ActiveMQ test server - startBroker(); + for (int i = 0; i < amqpBindPorts.length; i++){ + startActiveMQInstance(Integer.parseInt(amqpBindPorts[i]), Integer.parseInt(mqttBindPorts[i]), true); + } if (!this.eventReceiverInitiated) { ExecutorService executorService = StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize); @@ -193,45 +222,86 @@ public class PythonAgentIntegrationTest { this.instanceActivated = false; this.instanceStarted = false; - try { - broker.stop(); - broker = null; - } catch (Exception ignore) { + + // stop the broker services + for (Map.Entry<String, BrokerService> entry : this.messageBrokers.entrySet()) { + try { + log.debug("Stopping broker service [" + entry.getKey() + "]"); + entry.getValue().stop(); + } catch (Exception ignore) { + } } + + this.messageBrokers = null; + // TODO: use thread synchronization and assert all connections are properly closed // leave some room to clear up active connections sleep(1000); } public PythonAgentIntegrationTest() throws IOException { - integrationProperties - .load(PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + "integration-test.properties")); + integrationProperties.load( + PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + "integration-test.properties")); distributionName = integrationProperties.getProperty(DISTRIBUTION_NAME); - amqpBindAddress = integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS); - mqttBindAddress = integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS); cepPort = Integer.parseInt(integrationProperties.getProperty(CEP_PORT)); cepSSLPort = Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT)); testThreadPoolSize = Integer.parseInt(integrationProperties.getProperty(TEST_THREAD_POOL_SIZE)); log.info("PCA integration properties: " + integrationProperties.toString()); } - protected void startBroker() throws Exception { + protected String startActiveMQInstance(int amqpPort, int mqttPort, boolean secured) throws Exception { + + try { + ServerSocket serverSocket = new ServerSocket(amqpPort); + serverSocket.close(); + } catch (IOException e) { + throw new RuntimeException("AMQP port " + amqpPort + " is already in use.", e); + } + + try { + ServerSocket serverSocket = new ServerSocket(mqttPort); + serverSocket.close(); + } catch (IOException e) { + throw new RuntimeException("MQTT port " + mqttPort + " is already in use.", e); + } + System.setProperty("mb.username", "system"); System.setProperty("mb.password", "manager"); - broker = new BrokerService(); - broker.addConnector(amqpBindAddress); - broker.addConnector(mqttBindAddress); - AuthenticationUser authenticationUser = new AuthenticationUser("system", "manager", "users,admins"); - List<AuthenticationUser> authUserList = new ArrayList<>(); - authUserList.add(authenticationUser); - broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(authUserList)}); - broker.setBrokerName("testBroker"); + String brokerName = "testBroker-" + amqpPort + "-" + mqttPort; + + log.info("Starting an ActiveMQ instance"); + BrokerService broker = new BrokerService(); + broker.addConnector("tcp://localhost:" + (amqpPort)); + broker.addConnector("mqtt://localhost:" + (mqttPort)); + + if (secured) { + AuthenticationUser authenticationUser = new AuthenticationUser("system", "manager", "users,admins"); + List<AuthenticationUser> authUserList = new ArrayList<>(); + authUserList.add(authenticationUser); + broker.setPlugins(new BrokerPlugin[]{new SimpleAuthenticationPlugin(authUserList)}); + } + + broker.setBrokerName(brokerName); broker.setDataDirectory( PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + ".." + PATH_SEP + - PYTHON_AGENT_DIR_NAME + PATH_SEP + "activemq-data"); + PYTHON_AGENT_DIR_NAME + PATH_SEP + "activemq-data-" + brokerName); broker.start(); - log.info("Broker service started!"); + this.messageBrokers.put(brokerName, broker); + log.info("ActiveMQ Broker service [" + brokerName + "] started! [AMQP] " + amqpPort + " [MQTT] " + mqttPort); + + return brokerName; + } + + protected void stopActiveMQInstance(String brokerName){ + if (this.messageBrokers.containsKey(brokerName)){ + log.debug("Stopping broker service [" + brokerName + "]"); + BrokerService broker = this.messageBrokers.get(brokerName); + try { + broker.stop(); + } catch (Exception ignore) { + } + } } protected void startCommunicatorThread() { http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf index d01a246..894a254 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf @@ -16,8 +16,7 @@ # under the License. [agent] -mb.ip =localhost -mb.port =1885 +mb.urls =localhost:1885 mb.username =system mb.password =manager listen.address =localhost http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf index b5efb1c..00eb956 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf @@ -16,8 +16,7 @@ # under the License. [agent] -mb.ip =localhost -mb.port =1885 +mb.urls =localhost:1885 mb.username =system mb.password =manager listen.address =localhost http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf index 7362697..644cd13 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf @@ -16,8 +16,7 @@ # under the License. [agent] -mb.ip =localhost -mb.port =1885 +mb.urls =localhost:1885 mb.username =system mb.password =manager listen.address =localhost http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf index d01a246..894a254 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf @@ -16,8 +16,7 @@ # under the License. [agent] -mb.ip =localhost -mb.port =1885 +mb.urls =localhost:1885 mb.username =system mb.password =manager listen.address =localhost http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf index 35d462b..afcd2f2 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf @@ -16,10 +16,10 @@ # under the License. [agent] -mb.ip =localhost -mb.port =1885 +mb.urls =localhost:1885 mb.username =system mb.password =manager +mb.publisher.timeout =900 listen.address =localhost thrift.receiver.urls =localhost:7712 thrift.server.admin.username =admin http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf index e982bc3..e0a8c23 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf @@ -16,10 +16,10 @@ # under the License. [agent] -mb.ip =localhost -mb.port =1885 +mb.urls =localhost:1885 mb.username =system mb.password =manager +mb.publisher.timeout =900 listen.address =localhost thrift.receiver.urls =localhost:7712,localhost:7713 thrift.server.admin.username =admin http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf new file mode 100644 index 0000000..8395bfd --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf @@ -0,0 +1,46 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[agent] +mb.urls =localhost:1885,localhost:1886,localhost:1887 +mb.username =system +mb.password =manager +mb.publisher.timeout =20 +listen.address =localhost +thrift.receiver.urls =localhost:7712 +thrift.server.admin.username =admin +thrift.server.admin.password =admin +cep.stats.publisher.enabled =true +lb.private.ip = +lb.public.ip = +enable.artifact.update =true +auto.commit =true +auto.checkout =true +artifact.update.interval =15 +artifact.clone.retries =5 +artifact.clone.interval =10 +port.check.timeout =600000 +enable.data.publisher =false +monitoring.server.ip =localhost +monitoring.server.port =7612 +monitoring.server.secure.port =7712 +monitoring.server.admin.username =admin +monitoring.server.admin.password =admin +log.file.paths =/tmp/agent.screen-adc-test.log +metadata.service.url =https://localhost:9443 +super.tenant.repository.path =/repository/deployment/server/ +tenant.repository.path =/repository/tenants/ http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties new file mode 100644 index 0000000..485fd75 --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +connectionfactoryName=TopicConnectionFactory +java.naming.provider.url=failover:(tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)?initialReconnectDelay=100 +java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory +java.naming.security.principal=system +java.naming.security.credentials=manager +activemq.amqp.bind.ports=61617,61618,61619 +activemq.mqtt.bind.ports=1885,1886,1887 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini new file mode 100644 index 0000000..15cad9b --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +[formatters] +keys=default + +[formatter_default] +format=[%(asctime)s] %(levelname)s {%(filename)s:%(funcName)s} - %(message)s +class=logging.Formatter + +[handlers] +keys=console, error_file, log_file + +[handler_console] +class=logging.StreamHandler +formatter=default +args=tuple() + +[handler_log_file] +class=logging.FileHandler +level=DEBUG +formatter=default +args=("agent.log", "w") + +[handler_error_file] +class=logging.FileHandler +level=ERROR +formatter=default +args=("error.log", "w") + +[loggers] +keys=root + +[logger_root] +level=DEBUG +formatter=default +handlers=console,error_file,log_file \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params new file mode 100644 index 0000000..3498976 --- /dev/null +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params @@ -0,0 +1 @@ +APPLICATION_ID=application1,SERVICE_NAME=tomcat,HOST_NAME=tomcat.stratos.org,MULTITENANT=false,TENANT_ID=-1234,TENANT_RANGE=*,CARTRIDGE_ALIAS=tomcat,CLUSTER_ID=tomcat.domain,CLUSTER_INSTANCE_ID=cluster-1-instance-1,CARTRIDGE_KEY=PUjpXCLujDhYr5A6,DEPLOYMENT=default,PORTS=8080,PUPPET_IP=127.0.0.1,PUPPET_HOSTNAME=puppet.apache.stratos.org,PUPPET_ENV=false,MEMBER_ID=tomcat.member-1,LB_CLUSTER_ID=null,NETWORK_PARTITION_ID=network-p1,PARTITION_ID=p1,APPLICATION_PATH=/tmp/ADCTestCase,MIN_COUNT=1,INTERNAL=false,CLUSTERING_PRIMARY_KEY=A,LOG_FILE_PATHS=/tmp/temp.log,PERSISTENCE_MAPPING=null \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties ---------------------------------------------------------------------- diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties index 5a98dbe..4e5c66f 100755 --- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties +++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties @@ -18,8 +18,8 @@ # Stratos distribution properties added via filters during the build distribution.version=${project.version} distribution.name=${python.cartridge.agent.distribution.name}-${project.version} -activemq.amqp.bind.address=tcp://localhost:61617 -activemq.mqtt.bind.address=mqtt://localhost:1885 +activemq.amqp.bind.ports=61617 +activemq.mqtt.bind.ports=1885 cep.server.one.port=7612 cep.server.two.port=7613 cep.server.one.ssl.port=7712
