Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 44f527018 -> 9c57c7d80


PCA (Python Cartridge Agent) - Message broker HA support. The list of message
brokers can be provided via the mb.urls property in the agent.conf file.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/9c57c7d8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/9c57c7d8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/9c57c7d8

Branch: refs/heads/stratos-4.1.x
Commit: 9c57c7d803f790078963861e553e5b79290d8f2d
Parents: 44f5270
Author: Chamila de Alwis <[email protected]>
Authored: Mon Nov 16 20:40:23 2015 +0530
Committer: Chamila de Alwis <[email protected]>
Committed: Mon Nov 16 20:41:01 2015 +0530

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.conf  |   4 +-
 .../cartridge.agent/cartridge.agent/agent.py    |  24 +-
 .../cartridge.agent/cartridge.agent/config.py   |  14 +
 .../cartridge.agent/constants.py                |   4 +-
 .../modules/util/asyncscheduledtask.py          |   2 +-
 .../modules/util/cartridgeagentutils.py         |  43 ++-
 .../cartridge.agent/modules/util/log.py         |   2 +-
 .../cartridge.agent/publisher.py                |  60 +++-
 .../cartridge.agent/subscriber.py               | 152 ++++++++--
 .../tests/MessageBrokerHATestCase.java          | 304 +++++++++++++++++++
 .../tests/PythonAgentIntegrationTest.java       | 126 ++++++--
 .../resources/ADCExtensionTestCase/agent.conf   |   3 +-
 .../ADCMTAppTenantUserTestCase/agent.conf       |   3 +-
 .../test/resources/ADCMTAppTestCase/agent.conf  |   3 +-
 .../src/test/resources/ADCTestCase/agent.conf   |   3 +-
 .../resources/AgentStartupTestCase/agent.conf   |   4 +-
 .../test/resources/CEPHAModeTestCase/agent.conf |   4 +-
 .../MessageBrokerHATestCase/agent.conf          |  46 +++
 .../MessageBrokerHATestCase/jndi.properties     |  26 ++
 .../MessageBrokerHATestCase/logging.ini         |  52 ++++
 .../payload/launch-params                       |   1 +
 .../test-conf/integration-test.properties       |   4 +-
 22 files changed, 780 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
index f16aa43..a8c8a19 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
@@ -16,10 +16,10 @@
 # under the License.
 
 [agent]
-mb.ip                                 =MB-IP
-mb.port                               =MB-PORT
+mb.urls                               =MB-URLS
 mb.username                           =MB-USERNAME
 mb.password                           =MB-PASSWORD
+mb.publisher.timeout                  =MB-PUBLISHER-TIMEOUT
 listen.address                        =LISTEN_ADDR
 thrift.receiver.urls                  =CEP-URLS
 thrift.server.admin.username          =CEP-ADMIN-USERNAME

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
index 9c1159c..959568b 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.py
@@ -38,22 +38,14 @@ class CartridgeAgent(threading.Thread):
         self.__terminated = False
         self.__log = LogFactory().get_log(__name__)
 
-        mb_ip = Config.read_property(constants.MB_IP)
-        mb_port = Config.read_property(constants.MB_PORT)
-        mb_username = Config.read_property(constants.MB_USERNAME, False)
-        mb_password = Config.read_property(constants.MB_PASSWORD, False)
-
-        self.__inst_topic_subscriber = \
-            EventSubscriber(constants.INSTANCE_NOTIFIER_TOPIC, mb_ip, mb_port, 
mb_username, mb_password)
-
-        self.__tenant_topic_subscriber = \
-            EventSubscriber(constants.TENANT_TOPIC, mb_ip, mb_port, 
mb_username, mb_password)
-
-        self.__app_topic_subscriber = \
-            EventSubscriber(constants.APPLICATION_SIGNUP, mb_ip, mb_port, 
mb_username, mb_password)
-
-        self.__topology_event_subscriber = \
-            EventSubscriber(constants.TOPOLOGY_TOPIC, mb_ip, mb_port, 
mb_username, mb_password)
+        mb_urls = Config.mb_urls
+        mb_uname = Config.mb_username
+        mb_pwd = Config.mb_password
+
+        self.__inst_topic_subscriber = 
EventSubscriber(constants.INSTANCE_NOTIFIER_TOPIC, mb_urls, mb_uname, mb_pwd)
+        self.__tenant_topic_subscriber = 
EventSubscriber(constants.TENANT_TOPIC, mb_urls, mb_uname, mb_pwd)
+        self.__app_topic_subscriber = 
EventSubscriber(constants.APPLICATION_SIGNUP, mb_urls, mb_uname, mb_pwd)
+        self.__topology_event_subscriber = 
EventSubscriber(constants.TOPOLOGY_TOPIC, mb_urls, mb_uname, mb_pwd)
 
         self.__event_handler = EventHandler()
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
index e50c47d..f1a70ec 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/config.py
@@ -140,6 +140,14 @@ class Config:
     """ :type : bool """
     maintenance = False
     """ :type : bool """
+    mb_urls = []
+    """ :type : list """
+    mb_username = None
+    """ :type : str """
+    mb_password = None
+    """ :type : str """
+    mb_publisher_timeout = None
+    """ :type : int """
 
     @staticmethod
     def read_conf_file():
@@ -171,6 +179,7 @@ class Config:
     def read_payload_file(param_file_path):
         """
         Reads the payload file of the cartridge and stores the values in a 
dictionary
+        :param param_file_path: payload parameter file path
         :return: Payload parameter dictionary of values
         :rtype: dict
         """
@@ -235,6 +244,7 @@ class Config:
     def read_property(property_key, critical=True):
         """
         Returns the value of the provided property
+        :param critical: If absence of this value should throw an error
         :param str property_key: the name of the property to be read
         :return: Value of the property
         :exception: ParameterNotFoundException if the provided property cannot 
be found
@@ -339,6 +349,10 @@ class Config:
 
             Config.is_primary = 
Config.read_property(constants.CLUSTERING_PRIMARY_KEY, False)
 
+            Config.mb_username = Config.read_property(constants.MB_USERNAME, 
False)
+            Config.mb_password = Config.read_property(constants.MB_PASSWORD, 
False)
+            Config.mb_urls = Config.read_property(constants.MB_URLS).split(",")
+            Config.mb_publisher_timeout = 
int(Config.read_property(constants.MB_PUBLISHER_TIMEOUT))
         except ParameterNotFoundException as ex:
             raise RuntimeError(ex)
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
index 8c7a7b0..cd2ce36 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
@@ -19,10 +19,10 @@ PARAM_FILE_PATH = "param.file.path"
 PLUGINS_DIR = "plugins.dir"
 EXTENSIONS_DIR = "extensions.dir"
 
-MB_IP = "mb.ip"
-MB_PORT = "mb.port"
+MB_URLS = "mb.urls"
 MB_USERNAME = "mb.username"
 MB_PASSWORD = "mb.password"
+MB_PUBLISHER_TIMEOUT = "mb.publisher.timeout"
 
 CARTRIDGE_KEY = "CARTRIDGE_KEY"
 APPLICATION_ID = "APPLICATION_ID"

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
index d470d6e..f727414 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/asyncscheduledtask.py
@@ -69,4 +69,4 @@ class ScheduledExecutor(Thread):
         Terminate the scheduled task. Allow a maximum of 'delay' seconds to be 
terminated.
         :return: void
         """
-        self.terminated = True
\ No newline at end of file
+        self.terminated = True

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
index f9a9bbb..ebd6889 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/cartridgeagentutils.py
@@ -31,6 +31,7 @@ current_milli_time = lambda: int(round(time.time() * 1000))
 BS = 16
 pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
 
+
 def decrypt_password(pass_str, secret):
     """
     Decrypts the given password using the given secret. The encryption is 
assumed to be done
@@ -74,18 +75,19 @@ def wait_until_ports_active(ip_address, ports, 
ports_check_timeout=600000):
     if ports_check_timeout is None:
         ports_check_timeout = 1000 * 60 * 10
 
-    log.debug("Port check timeout: %r" % ports_check_timeout)
+    log.debug("Port check timeout: %s" % ports_check_timeout)
 
     active = False
     start_time = current_milli_time()
     while not active:
-        log.info("Waiting for ports to be active: [ip] %r [ports] %r" % 
(ip_address, ports))
+        log.info("Waiting for ports to be active: [ip] %s [ports] %s" % 
(ip_address, ports))
         active = check_ports_active(ip_address, ports)
         end_time = current_milli_time()
         duration = end_time - start_time
 
         if duration > ports_check_timeout:
-            log.info("Port check timeout reached: [ip] %r [ports] %r [timeout] 
%r" % (ip_address, ports, ports_check_timeout))
+            log.info("Port check timeout reached: [ip] %s [ports] %s [timeout] 
%s"
+                     % (ip_address, ports, ports_check_timeout))
             return False
 
         time.sleep(5)
@@ -110,10 +112,39 @@ def check_ports_active(ip_address, ports):
         s.settimeout(5)
         try:
             s.connect((ip_address, int(port)))
-            log.debug("Port %r is active" % port)
+            log.debug("Port %s is active" % port)
             s.close()
         except socket.error:
-            log.debug("Port %r is not active" % port)
+            log.debug("Port %s is not active" % port)
             return False
 
-    return True
\ No newline at end of file
+    return True
+
+
+class IncrementalCeilingListIterator(object):
+    """
+    Iterates through a given list and returns elements. At the end of the list 
if terminate_at_end is set to false,
+    the last element will be returned repeatedly. If terminate_at_end is set 
to true, an IndexError will be thrown.
+    """
+
+    def __init__(self, intervals, terminate_at_end):
+        self.__intervals = intervals
+        self.__index = 0
+        self.__terminate_at_end = terminate_at_end
+
+    def get_next_retry_interval(self):
+        """
+        Retrieves the next element in the list.
+        :return:
+        :rtype: int
+        """
+        if self.__index < len(self.__intervals):
+            next_interval = self.__intervals[self.__index]
+            self.__index += 1
+        else:
+            if self.__terminate_at_end:
+                raise IndexError("Reached the end of the list")
+            else:
+                next_interval = self.__intervals[-1]
+
+        return next_interval

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py
index 6a0804e..44c8655 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/modules/util/log.py
@@ -52,4 +52,4 @@ class LogFactory(object):
         :return: The logger class
         :rtype: RootLogger
         """
-        return self.instance.get_log(name)
\ No newline at end of file
+        return self.instance.get_log(name)

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
index 0994048..dcafe47 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/publisher.py
@@ -15,13 +15,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import threading
 import paho.mqtt.publish as publish
+import time
 
 import constants
 import healthstats
 from config import Config
 from modules.event.instance.status.events import *
 from modules.util import cartridgeagentutils
+from modules.util.cartridgeagentutils import IncrementalCeilingListIterator
 from modules.util.log import *
 
 log = LogFactory().get_log(__name__)
@@ -211,20 +214,55 @@ class EventPublisher:
 
     def __init__(self, topic):
         self.__topic = topic
+        self.__log = LogFactory().get_log(__name__)
+        self.__start_time = int(time.time())
 
     def publish(self, event):
-        mb_ip = Config.read_property(constants.MB_IP)
-        mb_port = Config.read_property(constants.MB_PORT)
-        mb_username = Config.read_property(constants.MB_USERNAME, False)
-        mb_password = Config.read_property(constants.MB_PASSWORD, False)
-        if mb_username is None:
+        publisher_thread = threading.Thread(target=self.__publish_event, 
args=(event,))
+        publisher_thread.start()
+
+    def __publish_event(self, event):
+        """
+        Publishes the given event to the message broker.
+
+        When a list of message brokers are given the event is published to the 
first message broker
+        available. Therefore the message brokers should share the data (ex: 
Sharing the KahaDB in ActiveMQ).
+
+        When the event cannot be published, it will be retried until the 
mb_publisher_timeout is exceeded.
+        This value is set in the agent.conf.
+
+        :param event:
+        :return: True if the event was published.
+        """
+        if Config.mb_username is None:
             auth = None
         else:
-            auth = {"username": mb_username, "password": mb_password}
+            auth = {"username": Config.mb_username, "password": 
Config.mb_password}
 
         payload = event.to_json()
-        publish.single(self.__topic,
-                       payload,
-                       hostname=mb_ip,
-                       port=mb_port,
-                       auth=auth)
+
+        retry_iterator = IncrementalCeilingListIterator([2, 2, 5, 5, 10, 10, 
20, 20, 30, 30, 40, 40, 50, 50, 60], False)
+
+        # Retry to publish the event until the timeout exceeds
+        while int(time.time()) - self.__start_time < 
(Config.mb_publisher_timeout * 1000):
+            retry_interval = retry_iterator.get_next_retry_interval()
+
+            for mb_url in Config.mb_urls:
+                mb_ip, mb_port = mb_url.split(":")
+
+                try:
+                    publish.single(self.__topic, payload, hostname=mb_ip, 
port=mb_port, auth=auth)
+                    self.__log.debug("Event published to %s:%s" % (mb_ip, 
mb_port))
+                    return True
+                except:
+                    self.__log.debug("Could not publish event to message 
broker %s:%s." % (mb_ip, mb_port))
+
+            self.__log.debug(
+                "Could not publish event to any of the provided message 
brokers. Retrying in %s seconds."
+                % retry_interval)
+
+            time.sleep(retry_interval)
+
+        self.__log.warn("Could not publish even to any of the provided message 
brokers before "
+                        "the timeout [%s] exceeded. The event will be 
dropped." % Config.mb_publisher_timeout)
+        return False

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
index 908a44c..90a9771 100644
--- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
+++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/subscriber.py
@@ -20,6 +20,8 @@ import threading
 import paho.mqtt.client as mqtt
 
 from modules.util.log import LogFactory
+from modules.util.asyncscheduledtask import *
+from modules.util.cartridgeagentutils import IncrementalCeilingListIterator
 
 
 class EventSubscriber(threading.Thread):
@@ -28,36 +30,75 @@ class EventSubscriber(threading.Thread):
     register event handlers for various events.
     """
 
-    def __init__(self, topic, ip, port, username, password):
+    log = LogFactory().get_log(__name__)
+
+    def __init__(self, topic, urls, username, password):
         threading.Thread.__init__(self)
 
         self.__event_queue = Queue(maxsize=0)
         self.__event_executor = EventExecutor(self.__event_queue)
 
-        self.log = LogFactory().get_log(__name__)
-
         self.__mb_client = None
         self.__topic = topic
         self.__subscribed = False
-        self.__ip = ip
-        self.__port = port
+        self.__urls = urls
         self.__username = username
         self.__password = password
 
     def run(self):
         #  Start the event executor thread
         self.__event_executor.start()
-        self.__mb_client = mqtt.Client()
-        self.__mb_client.on_connect = self.on_connect
-        self.__mb_client.on_message = self.on_message
-        if self.__username is not None:
-            self.log.debug("Message broker credentials are... %s:%s" % 
(self.__username, self.__password))
-            self.__mb_client.username_pw_set(self.__username, self.__password)
 
-        self.log.debug("Connecting to the message broker with address %r:%r" % 
(self.__ip, self.__port))
-        self.__mb_client.connect(self.__ip, self.__port, 60)
-        self.__subscribed = True
-        self.__mb_client.loop_forever()
+        """
+        The following loop will iterate forever.
+
+        When a successful connection is made and the failover() method 
returns, a job will start
+        which will periodically check the availability of the connected 
message broker. Then the
+        blocking method loop_forever() will be called on the connected mqtt 
client. This will only
+        return if disconnect() is called on the same client. If the connected 
message broker goes
+        down, the periodical job will call disconnect() on the connected 
client and the
+        loop_forever() method will return. The parent loop will be called 
again and this repeats
+        every time the message brokers are disconnected.
+
+        This behavior guarantees that the subscriber is always subscribed to 
an available message
+        broker.
+
+        """
+        while True:
+            self.__mb_client = mqtt.Client()
+            self.__mb_client.on_connect = self.on_connect
+            self.__mb_client.on_message = self.on_message
+            if self.__username is not None:
+                EventSubscriber.log.info("Message broker credentials are 
provided.")
+                self.__mb_client.username_pw_set(self.__username, 
self.__password)
+
+            # Select an online message broker and connect
+            self.__mb_client, connected_mb_ip, connected_mb_port = \
+                EventSubscriber.failover(self.__urls, self.__mb_client)
+
+            EventSubscriber.log.info(
+                "Connected to the message broker with address %r:%r" % 
(connected_mb_ip, connected_mb_port))
+
+            self.__subscribed = True
+
+            # Start a job to periodically check the online status of the 
connected message broker
+            heartbeat_task = MessageBrokerHeartBeatChecker(
+                                                            self.__mb_client,
+                                                            connected_mb_ip,
+                                                            connected_mb_port,
+                                                            self.__username,
+                                                            self.__password)
+
+            heartbeat_job = ScheduledExecutor(5, heartbeat_task)
+            heartbeat_job.start()
+
+            # Start blocking loop method
+            self.__mb_client.loop_forever()
+
+            # Disconnected when the heart beat checker detected an offline 
message broker
+            self.__subscribed = False
+            heartbeat_job.terminate()
+            EventSubscriber.log.debug("Disconnected from the message broker 
%s:%s. Reconnecting..." % (connected_mb_ip, connected_mb_port))
 
     def register_handler(self, event, handler):
         """
@@ -68,15 +109,15 @@ class EventSubscriber(threading.Thread):
         :rtype: void
         """
         self.__event_executor.register_event_handler(event, handler)
-        self.log.debug("Registered handler for event %r" % event)
+        EventSubscriber.log.debug("Registered handler for event %r" % event)
 
     def on_connect(self, client, userdata, flags, rc):
-        self.log.debug("Connected to message broker.")
+        EventSubscriber.log.debug("Connected to message broker.")
         self.__mb_client.subscribe(self.__topic)
-        self.log.debug("Subscribed to %r" % self.__topic)
+        EventSubscriber.log.debug("Subscribed to %r" % self.__topic)
 
     def on_message(self, client, userdata, msg):
-        self.log.debug("Message received: %s:\n%s" % (msg.topic, msg.payload))
+        EventSubscriber.log.debug("Message received: %s:\n%s" % (msg.topic, 
msg.payload))
         self.__event_queue.put(msg)
 
     def is_subscribed(self):
@@ -87,6 +128,43 @@ class EventSubscriber(threading.Thread):
         """
         return self.__subscribed
 
+    @staticmethod
+    def failover(mb_urls, mb_client):
+        """
+        Iterate through the list of message brokers provided and connect to 
the first available server. This will not
+        return until a message broker connection is established.
+
+        :param mb_urls: the list of message broker URLS of format [host:port, 
host:port]
+        :param mb_client: the initialized message broker client object
+        :return: a tuple of the connected message broker client, connected 
message broker IP address and connected
+        message broker port
+
+        """
+        # Connection retry interval incrementer
+        message_broker_retry_timer = IncrementalCeilingListIterator(
+                                                            [2, 2, 5, 5, 10, 
10, 20, 20, 30, 30, 40, 40, 50, 50, 60],
+                                                            False)
+
+        # Cycling through the provided mb urls until forever
+        while True:
+            retry_interval = 
message_broker_retry_timer.get_next_retry_interval()
+
+            for mb_url in mb_urls:
+                mb_ip, mb_port = mb_url.split(":")
+                EventSubscriber.log.debug(
+                    "Trying to connect to the message broker with address 
%r:%r" % (mb_ip, mb_port))
+                try:
+                    mb_client.connect(mb_ip, mb_port, 60)
+                    return mb_client, mb_ip, mb_port
+                except:
+                    # The message broker didn't respond well
+                    EventSubscriber.log.debug("Could not connect to the 
message broker at %s:%s." % (mb_ip, mb_port))
+
+            EventSubscriber.log.debug(
+                "Could not connect to any of the message brokers provided. 
Retrying in %s seconds." % retry_interval)
+
+            time.sleep(retry_interval)
+
 
 class EventExecutor(threading.Thread):
     """
@@ -97,7 +175,7 @@ class EventExecutor(threading.Thread):
         self.__event_queue = event_queue
         # TODO: several handlers for one event
         self.__event_handlers = {}
-        self.log = LogFactory().get_log(__name__)
+        EventSubscriber.log = LogFactory().get_log(__name__)
 
     def run(self):
         while True:
@@ -106,16 +184,44 @@ class EventExecutor(threading.Thread):
             if event in self.__event_handlers:
                 handler = self.__event_handlers[event]
                 try:
-                    self.log.debug("Executing handler for event %r" % event)
+                    EventSubscriber.log.debug("Executing handler for event %r" 
% event)
                     handler(event_msg)
                 except:
-                    self.log.exception("Error processing %r event" % event)
+                    EventSubscriber.log.exception("Error processing %r event" 
% event)
             else:
 
-                self.log.debug("Event handler not found for event : %r" % 
event)
+                EventSubscriber.log.debug("Event handler not found for event : 
%r" % event)
 
     def register_event_handler(self, event, handler):
         self.__event_handlers[event] = handler
 
     def terminate(self):
         self.terminate()
+
+
+class MessageBrokerHeartBeatChecker(AbstractAsyncScheduledTask):
+    """
+    A scheduled task to periodically check if the connected message broker is 
online.
+    If the message broker goes offline, it will disconnect the currently 
connected
+    client object and it will return from the loop_forever() method.
+    """
+
+    def __init__(self, connected_client, mb_ip, mb_port, username=None, 
password=None):
+        self.__mb_client = mqtt.Client()
+
+        if username is not None:
+            self.__mb_client.username_pw_set(username, password)
+
+        self.__mb_ip = mb_ip
+        self.__mb_port = mb_port
+        self.__connected_client = connected_client
+        self.__log = LogFactory().get_log(__name__)
+
+    def execute_task(self):
+        try:
+            self.__mb_client.connect(self.__mb_ip, self.__mb_port, 60)
+            self.__mb_client.disconnect()
+        except:
+            self.__log.info(
+                "Message broker %s:%s cannot be reached. Disconnecting 
client..." % (self.__mb_ip, self.__mb_port))
+            self.__connected_client.disconnect()

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
new file mode 100644
index 0000000..30ff703
--- /dev/null
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/MessageBrokerHATestCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.python.cartridge.agent.integration.tests;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.common.domain.LoadBalancingIPType;
+import org.apache.stratos.messaging.domain.topology.*;
+import 
org.apache.stratos.messaging.event.instance.notifier.ArtifactUpdatedEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberInitializedEvent;
+import org.testng.annotations.Test;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Created by chamilad on 11/11/15.
+ */
+public class MessageBrokerHATestCase extends PythonAgentIntegrationTest {
+    public MessageBrokerHATestCase() throws IOException {
+    }
+
+    private static final Log log = 
LogFactory.getLog(MessageBrokerHATestCase.class);
+    private static final int ADC_TIMEOUT = 300000;
+    private static final String CLUSTER_ID = "tomcat.domain";
+    private static final String DEPLOYMENT_POLICY_NAME = "deployment-policy-6";
+    private static final String AUTOSCALING_POLICY_NAME = 
"autoscaling-policy-6";
+    private static final String APP_ID = "application-6";
+    private static final String MEMBER_ID = "tomcat.member-1";
+    private static final String INSTANCE_ID = "instance-1";
+    private static final String CLUSTER_INSTANCE_ID = "cluster-1-instance-1";
+    private static final String NETWORK_PARTITION_ID = "network-partition-1";
+    private static final String PARTITION_ID = "partition-1";
+    private static final String TENANT_ID = "6";
+    private static final String SERVICE_NAME = "tomcat";
+
+
+    @BeforeMethod(alwaysRun = true)
+    public void setup() throws Exception {
+        System.setProperty("jndi.properties.dir", getTestCaseResourcesPath());
+//        integrationTestPropertiesPath = new FileInputStream(new 
File(getTestCaseResourcesPath() + PATH_SEP + "integration-test.properties"));
+
+        super.setup(ADC_TIMEOUT);
+        startServerSocket(8080);
+    }
+    
+    @AfterMethod(alwaysRun = true)
+    public void tearDownBrokerHATest(){
+        tearDown();
+    }
+
+    @Test(groups = {"test"})
+    public void testBrokerFailoverHeartbeat(){
+        startCommunicatorThread();
+        sleep(10000);
+//        assertAgentActivation();
+
+        // take down the default broker
+        log.info("Stopping subscribed message broker: DEFAULT");
+        stopActiveMQInstance("testBroker-" + amqpBindPorts[0] + "-" + 
mqttBindPorts[0]);
+
+        List<String> outputLines = new ArrayList<>();
+        boolean exit = false;
+        while (!exit) {
+            List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+            if (newLines.size() > 0) {
+                for (String line : newLines) {
+                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[0] + " cannot be reached. Disconnecting client...")) {
+                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is trying the next option.");
+                        exit = true;
+                    }
+                }
+            }
+            sleep(1000);
+        }
+
+        sleep(10000);
+        log.info("Stopping subscribed message broker");
+        stopActiveMQInstance("testBroker-" + amqpBindPorts[1] + "-" + 
mqttBindPorts[1]);
+
+        exit = false;
+        while (!exit) {
+            List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+            if (newLines.size() > 0) {
+                for (String line : newLines) {
+                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[1] + " cannot be reached. Disconnecting client...")) {
+                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is trying the next option.");
+                        exit = true;
+                    }
+                }
+            }
+            sleep(1000);
+        }
+
+        sleep(20000);
+        log.info("Stopping subscribed message broker");
+        stopActiveMQInstance("testBroker-" + amqpBindPorts[2] + "-" + 
mqttBindPorts[2]);
+
+        exit = false;
+        while (!exit) {
+            List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+            if (newLines.size() > 0) {
+                for (String line : newLines) {
+                    if (line.contains("Message broker localhost:" + 
mqttBindPorts[2] + " cannot be reached. Disconnecting client...")) {
+                        log.info("Message Broker Heartbeat checker has 
detected message broker node termination and is trying the next option.");
+                    }
+                    if (line.contains("Could not connect to any of the message 
brokers provided. Retrying in 2 seconds")) {
+                        log.info("Failover went through all the options and 
will be retrying.");
+                        exit = true;
+                    }
+                }
+            }
+            sleep(1000);
+        }
+    }
+    
+    @Test(groups = {"smoke"})
+    public void testBrokerFailoverForPublisher(){
+        startCommunicatorThread();
+
+
+        List<String> outputLines = new ArrayList<>();
+        boolean exit = false;
+        while (!exit) {
+            List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+            if (newLines.size() > 0) {
+                for (String line : newLines) {
+                    if (line.contains("Subscribed to 'topology/#'")) {
+                        // take down the default broker
+                        stopActiveMQInstance("testBroker-" + amqpBindPorts[0] 
+ "-" + mqttBindPorts[0]);
+                    }
+
+                    if (line.contains("Waiting for complete topology event")) {
+
+                        sleep(4000);
+
+//                        stopActiveMQInstance("testBroker2");
+//                        stopActiveMQInstance("testBroker3");
+                        // Send complete topology event
+                        log.info("Publishing complete topology event...");
+                        Topology topology = createTestTopology();
+                        CompleteTopologyEvent completeTopologyEvent = new 
CompleteTopologyEvent(topology);
+                        publishEvent(completeTopologyEvent);
+                        log.info("Complete topology event published");
+                    }
+
+                    if (line.contains("Waiting for cartridge agent to be 
initialized")) {
+                        // Publish member initialized event
+                        log.info("Publishing member initialized event...");
+                        MemberInitializedEvent memberInitializedEvent = new 
MemberInitializedEvent(
+                                SERVICE_NAME, CLUSTER_ID, CLUSTER_INSTANCE_ID, 
MEMBER_ID, NETWORK_PARTITION_ID,
+                                PARTITION_ID, INSTANCE_ID
+                        );
+                        publishEvent(memberInitializedEvent);
+                        log.info("Member initialized event published");
+                    }
+
+
+                    // Send artifact updated event to activate the instance 
first
+                    if (line.contains("Artifact repository found")) {
+                        publishEvent(getArtifactUpdatedEventForPublicRepo());
+                        log.info("Artifact updated event published");
+                    }
+
+                    if (line.contains("Could not publish event to message 
broker localhost:1885.")) {
+                        log.info("Event publishing to default message broker 
failed and the next option is tried.");
+                        exit = true;
+                    }
+
+//                    if (line.contains("The event will be dropped.")) {
+//                        log.info("Event publishing failed after timeout 
exceeded and the event was dropped.");
+//                        exit = true;
+//                    }
+                }
+            }
+            sleep(1000);
+        }
+
+//        assertAgentActivation();
+    }
+
+    private void assertAgentActivation() {
+        Thread startupTestThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                while (!eventReceiverInitiated) {
+                    sleep(1000);
+                }
+                List<String> outputLines = new ArrayList<>();
+                boolean completeTopologyPublished = false;
+                boolean memberInitPublished = false;
+                while (!outputStream.isClosed()) {
+                    List<String> newLines = getNewLines(outputLines, 
outputStream.toString());
+                    if (newLines.size() > 0) {
+                        for (String line : newLines) {
+                            if (line.contains("Waiting for complete topology 
event") && !completeTopologyPublished) {
+                                sleep(2000);
+                                // Send complete topology event
+                                log.info("Publishing complete topology 
event...");
+                                Topology topology = createTestTopology();
+                                CompleteTopologyEvent completeTopologyEvent = 
new CompleteTopologyEvent(topology);
+                                publishEvent(completeTopologyEvent);
+                                log.info("Complete topology event published");
+                                completeTopologyPublished = true;
+                            }
+
+                            if (line.contains("Waiting for cartridge agent to 
be initialized") && !memberInitPublished) {
+                                // Publish member initialized event
+                                log.info("Publishing member initialized 
event...");
+                                MemberInitializedEvent memberInitializedEvent 
= new MemberInitializedEvent(
+                                        SERVICE_NAME, CLUSTER_ID, 
CLUSTER_INSTANCE_ID, MEMBER_ID, NETWORK_PARTITION_ID,
+                                        PARTITION_ID, INSTANCE_ID
+                                );
+                                publishEvent(memberInitializedEvent);
+                                log.info("Member initialized event published");
+                                memberInitPublished = true;
+                            }
+
+                            // Send artifact updated event to activate the 
instance first
+                            if (line.contains("Artifact repository found")) {
+                                
publishEvent(getArtifactUpdatedEventForPublicRepo());
+                                log.info("Artifact updated event published");
+                            }
+                        }
+                    }
+                    sleep(1000);
+                }
+            }
+        });
+        startupTestThread.start();
+
+        while (!instanceStarted || !instanceActivated) {
+            // wait until the instance activated event is received.
+            // this will assert whether instance got activated within timeout 
period; no need for explicit assertions
+            sleep(2000);
+        }
+    }
+
+    private ArtifactUpdatedEvent getArtifactUpdatedEventForPublicRepo() {
+        ArtifactUpdatedEvent publicRepoEvent = 
createTestArtifactUpdatedEvent();
+        
publicRepoEvent.setRepoURL("https://bitbucket.org/testapache2211/opentestrepo1.git";);
+        return publicRepoEvent;
+    }
+
+    private static ArtifactUpdatedEvent createTestArtifactUpdatedEvent() {
+        ArtifactUpdatedEvent artifactUpdatedEvent = new ArtifactUpdatedEvent();
+        artifactUpdatedEvent.setClusterId(CLUSTER_ID);
+        artifactUpdatedEvent.setTenantId(TENANT_ID);
+        return artifactUpdatedEvent;
+    }
+
+    /**
+     * Create test topology
+     *
+     * @return
+     */
+    private Topology createTestTopology() {
+        Topology topology = new Topology();
+        Service service = new Service(SERVICE_NAME, ServiceType.SingleTenant);
+        topology.addService(service);
+
+        Cluster cluster = new Cluster(service.getServiceName(), CLUSTER_ID, 
DEPLOYMENT_POLICY_NAME,
+                AUTOSCALING_POLICY_NAME, APP_ID);
+        service.addCluster(cluster);
+
+        Member member = new Member(service.getServiceName(), 
cluster.getClusterId(), MEMBER_ID,
+                CLUSTER_INSTANCE_ID, NETWORK_PARTITION_ID, PARTITION_ID, 
LoadBalancingIPType.Private,
+                System.currentTimeMillis());
+
+        member.setDefaultPrivateIP("10.0.0.1");
+        member.setDefaultPublicIP("20.0.0.1");
+        Properties properties = new Properties();
+        properties.setProperty("prop1", "value1");
+        member.setProperties(properties);
+        member.setStatus(MemberStatus.Created);
+        cluster.addMember(member);
+
+        return topology;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
index 3c7661b..fb12db6 100644
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/java/org/apache/stratos/python/cartridge/agent/integration/tests/PythonAgentIntegrationTest.java
@@ -49,27 +49,30 @@ import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
 
 public class PythonAgentIntegrationTest {
-    protected final Properties integrationProperties = new Properties();
-    public static final String PATH_SEP = File.separator;
-    private static final Log log = 
LogFactory.getLog(PythonAgentIntegrationTest.class);
-    protected BrokerService broker;
 
+    public static final String PATH_SEP = File.separator;
     public static final String NEW_LINE = System.getProperty("line.separator");
-    public static final String ACTIVEMQ_AMQP_BIND_ADDRESS = 
"activemq.amqp.bind.address";
-    public static final String ACTIVEMQ_MQTT_BIND_ADDRESS = 
"activemq.mqtt.bind.address";
+
+    public static final String ACTIVEMQ_AMQP_BIND_PORTS = 
"activemq.amqp.bind.ports";
+    public static final String ACTIVEMQ_MQTT_BIND_PORTS = 
"activemq.mqtt.bind.ports";
     public static final String CEP_PORT = "cep.server.one.port";
     public static final String CEP_SSL_PORT = "cep.server.one.ssl.port";
     public static final String DISTRIBUTION_NAME = "distribution.name";
+
+    private static final Log log = 
LogFactory.getLog(PythonAgentIntegrationTest.class);
+
     public static final String TEST_THREAD_POOL_SIZE = "test.thread.pool.size";
     protected final UUID PYTHON_AGENT_DIR_NAME = UUID.randomUUID();
+//    protected final String defaultBrokerName = "testBrokerDefault";
+    protected final Properties integrationProperties = new Properties();
 
     protected Map<Integer, ServerSocket> serverSocketMap = new HashMap<>();
     protected Map<String, Executor> executorList = new HashMap<>();
 
     protected int cepPort;
     protected int cepSSLPort;
-    protected String amqpBindAddress;
-    protected String mqttBindAddress;
+    protected String[] amqpBindPorts;
+    protected String[] mqttBindPorts;
     protected String distributionName;
     protected int testThreadPoolSize;
 
@@ -82,12 +85,38 @@ public class PythonAgentIntegrationTest {
     protected ByteArrayOutputStreamLocal outputStream;
     protected ThriftTestServer thriftTestServer;
 
+    private Map<String, BrokerService> messageBrokers;
+
+
     /**
      * Setup method for test method testPythonCartridgeAgent
      */
     protected void setup(int timeout) throws Exception {
+        messageBrokers = new HashMap<>();
+
+        distributionName = 
integrationProperties.getProperty(DISTRIBUTION_NAME);
+
+        cepPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_PORT));
+        cepSSLPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT));
+
+        Properties jndiProperties = new Properties();
+        jndiProperties.load(new FileInputStream(new 
File(System.getProperty("jndi.properties.dir") + PATH_SEP + 
"jndi.properties")));
+        if (!jndiProperties.containsKey(ACTIVEMQ_AMQP_BIND_PORTS) || 
!jndiProperties.containsKey(ACTIVEMQ_MQTT_BIND_PORTS)) {
+            amqpBindPorts = 
integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(",");
+            mqttBindPorts = 
integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(",");
+        }else{
+            amqpBindPorts = 
jndiProperties.getProperty(ACTIVEMQ_AMQP_BIND_PORTS).split(",");
+            mqttBindPorts = 
jndiProperties.getProperty(ACTIVEMQ_MQTT_BIND_PORTS).split(",");
+        }
+
+        if (amqpBindPorts.length != mqttBindPorts.length) {
+            throw new RuntimeException("The number of AMQP ports and MQTT 
ports should be equal in integration-test.properties.");
+        }
+
         // start ActiveMQ test server
-        startBroker();
+        for (int i = 0; i < amqpBindPorts.length; i++){
+            startActiveMQInstance(Integer.parseInt(amqpBindPorts[i]), 
Integer.parseInt(mqttBindPorts[i]), true);
+        }
 
         if (!this.eventReceiverInitiated) {
             ExecutorService executorService = 
StratosThreadPool.getExecutorService("TEST_THREAD_POOL", testThreadPoolSize);
@@ -193,45 +222,86 @@ public class PythonAgentIntegrationTest {
 
         this.instanceActivated = false;
         this.instanceStarted = false;
-        try {
-            broker.stop();
-            broker = null;
-        } catch (Exception ignore) {
+
+        // stop the broker services
+        for (Map.Entry<String, BrokerService> entry : 
this.messageBrokers.entrySet()) {
+            try {
+                    log.debug("Stopping broker service [" + entry.getKey() + 
"]");
+                entry.getValue().stop();
+            } catch (Exception ignore) {
+            }
         }
+
+        this.messageBrokers = null;
+
         // TODO: use thread synchronization and assert all connections are properly closed
         // leave some room to clear up active connections
         sleep(1000);
     }
 
     public PythonAgentIntegrationTest() throws IOException {
-        integrationProperties
-                
.load(PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP + 
"integration-test.properties"));
+        integrationProperties.load(
+                PythonAgentIntegrationTest.class.getResourceAsStream(PATH_SEP 
+ "integration-test.properties"));
         distributionName = 
integrationProperties.getProperty(DISTRIBUTION_NAME);
-        amqpBindAddress = 
integrationProperties.getProperty(ACTIVEMQ_AMQP_BIND_ADDRESS);
-        mqttBindAddress = 
integrationProperties.getProperty(ACTIVEMQ_MQTT_BIND_ADDRESS);
         cepPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_PORT));
         cepSSLPort = 
Integer.parseInt(integrationProperties.getProperty(CEP_SSL_PORT));
         testThreadPoolSize = 
Integer.parseInt(integrationProperties.getProperty(TEST_THREAD_POOL_SIZE));
         log.info("PCA integration properties: " + 
integrationProperties.toString());
     }
 
-    protected void startBroker() throws Exception {
+    protected String startActiveMQInstance(int amqpPort, int mqttPort, boolean 
secured) throws Exception {
+
+        try {
+            ServerSocket serverSocket = new ServerSocket(amqpPort);
+            serverSocket.close();
+        } catch (IOException e) {
+            throw new RuntimeException("AMQP port " + amqpPort + " is already 
in use.", e);
+        }
+
+        try {
+            ServerSocket serverSocket = new ServerSocket(mqttPort);
+            serverSocket.close();
+        } catch (IOException e) {
+            throw new RuntimeException("MQTT port " + mqttPort + " is already 
in use.", e);
+        }
+
         System.setProperty("mb.username", "system");
         System.setProperty("mb.password", "manager");
 
-        broker = new BrokerService();
-        broker.addConnector(amqpBindAddress);
-        broker.addConnector(mqttBindAddress);
-        AuthenticationUser authenticationUser = new 
AuthenticationUser("system", "manager", "users,admins");
-        List<AuthenticationUser> authUserList = new ArrayList<>();
-        authUserList.add(authenticationUser);
-        broker.setPlugins(new BrokerPlugin[]{new 
SimpleAuthenticationPlugin(authUserList)});
-        broker.setBrokerName("testBroker");
+        String brokerName = "testBroker-" + amqpPort + "-" + mqttPort;
+
+        log.info("Starting an ActiveMQ instance");
+        BrokerService broker = new BrokerService();
+        broker.addConnector("tcp://localhost:" + (amqpPort));
+        broker.addConnector("mqtt://localhost:" + (mqttPort));
+
+        if (secured) {
+            AuthenticationUser authenticationUser = new 
AuthenticationUser("system", "manager", "users,admins");
+            List<AuthenticationUser> authUserList = new ArrayList<>();
+            authUserList.add(authenticationUser);
+            broker.setPlugins(new BrokerPlugin[]{new 
SimpleAuthenticationPlugin(authUserList)});
+        }
+
+        broker.setBrokerName(brokerName);
         broker.setDataDirectory(
                 
PythonAgentIntegrationTest.class.getResource(PATH_SEP).getPath() + PATH_SEP + 
".." + PATH_SEP +
-                        PYTHON_AGENT_DIR_NAME + PATH_SEP + "activemq-data");
+                        PYTHON_AGENT_DIR_NAME + PATH_SEP + "activemq-data-" + 
brokerName);
         broker.start();
-        log.info("Broker service started!");
+        this.messageBrokers.put(brokerName, broker);
+        log.info("ActiveMQ Broker service [" + brokerName + "] started! [AMQP] 
" + amqpPort + " [MQTT] " + mqttPort);
+
+        return brokerName;
+    }
+
+    protected void stopActiveMQInstance(String brokerName){
+        if (this.messageBrokers.containsKey(brokerName)){
+            log.debug("Stopping broker service [" + brokerName + "]");
+            BrokerService broker = this.messageBrokers.get(brokerName);
+            try {
+                broker.stop();
+            } catch (Exception ignore) {
+            }
+        }
     }
 
     protected void startCommunicatorThread() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf
index d01a246..894a254 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCExtensionTestCase/agent.conf
@@ -16,8 +16,7 @@
 # under the License.
 
 [agent]
-mb.ip                                 =localhost
-mb.port                               =1885
+mb.urls                               =localhost:1885
 mb.username                           =system
 mb.password                           =manager
 listen.address                        =localhost

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf
index b5efb1c..00eb956 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTenantUserTestCase/agent.conf
@@ -16,8 +16,7 @@
 # under the License.
 
 [agent]
-mb.ip                                 =localhost
-mb.port                               =1885
+mb.urls                               =localhost:1885
 mb.username                           =system
 mb.password                           =manager
 listen.address                        =localhost

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf
index 7362697..644cd13 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCMTAppTestCase/agent.conf
@@ -16,8 +16,7 @@
 # under the License.
 
 [agent]
-mb.ip                                 =localhost
-mb.port                               =1885
+mb.urls                               =localhost:1885
 mb.username                           =system
 mb.password                           =manager
 listen.address                        =localhost

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf
index d01a246..894a254 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/ADCTestCase/agent.conf
@@ -16,8 +16,7 @@
 # under the License.
 
 [agent]
-mb.ip                                 =localhost
-mb.port                               =1885
+mb.urls                               =localhost:1885
 mb.username                           =system
 mb.password                           =manager
 listen.address                        =localhost

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf
index 35d462b..afcd2f2 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/AgentStartupTestCase/agent.conf
@@ -16,10 +16,10 @@
 # under the License.
 
 [agent]
-mb.ip                                 =localhost
-mb.port                               =1885
+mb.urls                               =localhost:1885
 mb.username                           =system
 mb.password                           =manager
+mb.publisher.timeout                  =900
 listen.address                        =localhost
 thrift.receiver.urls                  =localhost:7712
 thrift.server.admin.username          =admin

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf
index e982bc3..e0a8c23 100755
--- 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/CEPHAModeTestCase/agent.conf
@@ -16,10 +16,10 @@
 # under the License.
 
 [agent]
-mb.ip                                 =localhost
-mb.port                               =1885
+mb.urls                               =localhost:1885
 mb.username                           =system
 mb.password                           =manager
+mb.publisher.timeout                  =900
 listen.address                        =localhost
 thrift.receiver.urls                  =localhost:7712,localhost:7713
 thrift.server.admin.username          =admin

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
----------------------------------------------------------------------
diff --git 
a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
new file mode 100644
index 0000000..8395bfd
--- /dev/null
+++ 
b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/agent.conf
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[agent]
+mb.urls                               =localhost:1885,localhost:1886,localhost:1887
+mb.username                           =system
+mb.password                           =manager
+mb.publisher.timeout                  =20
+listen.address                        =localhost
+thrift.receiver.urls                  =localhost:7712
+thrift.server.admin.username          =admin
+thrift.server.admin.password          =admin
+cep.stats.publisher.enabled           =true
+lb.private.ip                         =
+lb.public.ip                          =
+enable.artifact.update                =true
+auto.commit                           =true
+auto.checkout                         =true
+artifact.update.interval              =15
+artifact.clone.retries                =5
+artifact.clone.interval               =10
+port.check.timeout                    =600000
+enable.data.publisher                 =false
+monitoring.server.ip                  =localhost
+monitoring.server.port                =7612
+monitoring.server.secure.port         =7712
+monitoring.server.admin.username      =admin
+monitoring.server.admin.password      =admin
+log.file.paths                        =/tmp/agent.screen-adc-test.log
+metadata.service.url                  =https://localhost:9443
+super.tenant.repository.path          =/repository/deployment/server/
+tenant.repository.path                =/repository/tenants/

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties
new file mode 100644
index 0000000..485fd75
--- /dev/null
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/jndi.properties
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+connectionfactoryName=TopicConnectionFactory
+java.naming.provider.url=failover:(tcp://localhost:61617,tcp://localhost:61618,tcp://localhost:61619)?initialReconnectDelay=100
+java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory
+java.naming.security.principal=system
+java.naming.security.credentials=manager
+activemq.amqp.bind.ports=61617,61618,61619
+activemq.mqtt.bind.ports=1885,1886,1887
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini
new file mode 100644
index 0000000..15cad9b
--- /dev/null
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/logging.ini
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+[formatters]
+keys=default
+
+[formatter_default]
+format=[%(asctime)s] %(levelname)s {%(filename)s:%(funcName)s} - %(message)s
+class=logging.Formatter
+
+[handlers]
+keys=console, error_file, log_file
+
+[handler_console]
+class=logging.StreamHandler
+formatter=default
+args=tuple()
+
+[handler_log_file]
+class=logging.FileHandler
+level=DEBUG
+formatter=default
+args=("agent.log", "w")
+
+[handler_error_file]
+class=logging.FileHandler
+level=ERROR
+formatter=default
+args=("error.log", "w")
+
+[loggers]
+keys=root
+
+[logger_root]
+level=DEBUG
+formatter=default
+handlers=console,error_file,log_file
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params
new file mode 100644
index 0000000..3498976
--- /dev/null
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/MessageBrokerHATestCase/payload/launch-params
@@ -0,0 +1 @@
+APPLICATION_ID=application1,SERVICE_NAME=tomcat,HOST_NAME=tomcat.stratos.org,MULTITENANT=false,TENANT_ID=-1234,TENANT_RANGE=*,CARTRIDGE_ALIAS=tomcat,CLUSTER_ID=tomcat.domain,CLUSTER_INSTANCE_ID=cluster-1-instance-1,CARTRIDGE_KEY=PUjpXCLujDhYr5A6,DEPLOYMENT=default,PORTS=8080,PUPPET_IP=127.0.0.1,PUPPET_HOSTNAME=puppet.apache.stratos.org,PUPPET_ENV=false,MEMBER_ID=tomcat.member-1,LB_CLUSTER_ID=null,NETWORK_PARTITION_ID=network-p1,PARTITION_ID=p1,APPLICATION_PATH=/tmp/ADCTestCase,MIN_COUNT=1,INTERNAL=false,CLUSTERING_PRIMARY_KEY=A,LOG_FILE_PATHS=/tmp/temp.log,PERSISTENCE_MAPPING=null
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/9c57c7d8/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties
----------------------------------------------------------------------
diff --git a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties
index 5a98dbe..4e5c66f 100755
--- a/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties
+++ b/products/python-cartridge-agent/modules/integration/test-integration/src/test/resources/test-conf/integration-test.properties
@@ -18,8 +18,8 @@
 # Stratos distribution properties added via filters during the build
 distribution.version=${project.version}
 distribution.name=${python.cartridge.agent.distribution.name}-${project.version}
-activemq.amqp.bind.address=tcp://localhost:61617
-activemq.mqtt.bind.address=mqtt://localhost:1885
+activemq.amqp.bind.ports=61617
+activemq.mqtt.bind.ports=1885
 cep.server.one.port=7612
 cep.server.two.port=7613
 cep.server.one.ssl.port=7712

Reply via email to