Changing python agent to publish thrift events to multiple CEP servers

Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/82ac4d72
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/82ac4d72
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/82ac4d72

Branch: refs/heads/stratos-4.1.x
Commit: 82ac4d72f009677e915c9fc52b35bd1213703850
Parents: 8ad1f6e
Author: anuruddhal <[email protected]>
Authored: Wed Sep 16 14:53:05 2015 +0530
Committer: anuruddhal <[email protected]>
Committed: Fri Sep 18 09:53:18 2015 +0530

----------------------------------------------------------------------
 .../cartridge.agent/cartridge.agent/agent.conf  |  3 +-
 .../cartridge.agent/constants.py                |  1 +
 .../cartridge.agent/healthstats.py              | 67 ++++++++++++--------
 3 files changed, 42 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/82ac4d72/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
index 22d1deb..926ea96 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf
@@ -19,8 +19,7 @@
 mb.ip                                 =MB-IP
 mb.port                               =MB-PORT
 listen.address                        =LISTEN_ADDR
-thrift.receiver.ip                    =CEP-IP
-thrift.receiver.port                  =CEP-PORT
+thrift.receiver.urls                  =CEP-URLS
 thrift.server.admin.username          =CEP-ADMIN-USERNAME
 thrift.server.admin.password          =CEP-ADMIN-PASSWORD
 cep.stats.publisher.enabled           =ENABLE_HEALTH_PUBLISHER

http://git-wip-us.apache.org/repos/asf/stratos/blob/82ac4d72/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
index 5510659..e840057 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py
@@ -123,6 +123,7 @@ PORT_CHECK_TIMEOUT = "port.check.timeout"
 CEP_PUBLISHER_ENABLED = "cep.stats.publisher.enabled"
 CEP_RECEIVER_IP = "thrift.receiver.ip"
 CEP_RECEIVER_PORT = "thrift.receiver.port"
+CEP_RECEIVER_URLS = "thrift.receiver.urls"
 CEP_SERVER_ADMIN_USERNAME = "thrift.server.admin.username"
 CEP_SERVER_ADMIN_PASSWORD = "thrift.server.admin.password"
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/82ac4d72/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
----------------------------------------------------------------------
diff --git 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
index f661294..71e6d2f 100644
--- 
a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
+++ 
b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py
@@ -69,7 +69,7 @@ class HealthStatisticsPublisherManager(Thread):
                 self.log.exception("Couldn't publish health statistics to CEP. 
Thrift Receiver offline. Reconnecting...")
                 self.publisher = HealthStatisticsPublisher()
 
-        self.publisher.publisher.disconnect()
+        self.publisher.disconnect_publisher()
 
 
 class HealthStatisticsPublisher:
@@ -94,38 +94,44 @@ class HealthStatisticsPublisher:
         return conf_value
 
     def __init__(self):
-        self.ports = []
-        cep_port = 
HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_PORT)
-        self.ports.append(cep_port)
 
-        cep_ip = 
HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_IP)
+        self.publishers = []
+        cep_admin_username = 
HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME)
+        cep_admin_password = 
HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD)
+        # 1.1.1.1:1883,2.2.2.2:1883
+        cep_urls = 
HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_URLS);
+        cep_urls = cep_urls.split(',')
+        for cep_url in cep_urls:
+            self.ports = []
+            cep_ip = cep_url.split(':')[0]
+            cep_port = cep_url.split(':')[1]
+            self.ports.append(cep_port)
+            cartridgeagentutils.wait_until_ports_active(
+                cep_ip,
+                self.ports,
+                int(Config.read_property("port.check.timeout", 
critical=False)))
 
-        cartridgeagentutils.wait_until_ports_active(
-            cep_ip,
-            self.ports,
-            int(Config.read_property("port.check.timeout", critical=False)))
+            cep_active = cartridgeagentutils.check_ports_active(
+                cep_ip,
+                self.ports)
 
-        cep_active = cartridgeagentutils.check_ports_active(
-            cep_ip,
-            self.ports)
+            if not cep_active:
+                raise CEPPublisherException("CEP server not active. Health 
statistics publishing aborted.")
 
-        if not cep_active:
-            raise CEPPublisherException("CEP server not active. Health 
statistics publishing aborted.")
 
-        cep_admin_username = 
HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME)
-        cep_admin_password = 
HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD)
+            self.stream_definition = 
HealthStatisticsPublisher.create_stream_definition()
+            HealthStatisticsPublisher.log.debug("Stream definition created: 
%r" % str(self.stream_definition))
 
-        self.stream_definition = 
HealthStatisticsPublisher.create_stream_definition()
-        HealthStatisticsPublisher.log.debug("Stream definition created: %r" % 
str(self.stream_definition))
+            publisher = ThriftPublisher(
+                cep_ip,
+                cep_port,
+                cep_admin_username,
+                cep_admin_password,
+                self.stream_definition)
 
-        self.publisher = ThriftPublisher(
-            cep_ip,
-            cep_port,
-            cep_admin_username,
-            cep_admin_password,
-            self.stream_definition)
+            self.publishers.append(publisher)
 
-        HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher 
initialized")
+            HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher 
initialized")
 
     @staticmethod
     def create_stream_definition():
@@ -171,7 +177,7 @@ class HealthStatisticsPublisher:
                                                 event.payloadData,
                                                 
self.stream_definition.version))
 
-        self.publisher.publish(event)
+        self.publish_event(self.publishers, event)
 
     def publish_load_average(self, load_avg):
         """
@@ -195,8 +201,15 @@ class HealthStatisticsPublisher:
                                                 event.payloadData,
                                                 
self.stream_definition.version))
 
-        self.publisher.publish(event)
+        self.publish_event(event)
+
+    def publish_event(self, event):
+        for publisher in self.publishers:
+            publisher.publish(event)
 
+    def disconnect_publisher(self, publishers):
+        for publisher in self.publishers:
+            publisher.disconnect()
 
 class DefaultHealthStatisticsReader:
     """

Reply via email to