Changing python agent to publish thrift events to multiple CEP servers
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/82ac4d72 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/82ac4d72 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/82ac4d72 Branch: refs/heads/stratos-4.1.x Commit: 82ac4d72f009677e915c9fc52b35bd1213703850 Parents: 8ad1f6e Author: anuruddhal <[email protected]> Authored: Wed Sep 16 14:53:05 2015 +0530 Committer: anuruddhal <[email protected]> Committed: Fri Sep 18 09:53:18 2015 +0530 ---------------------------------------------------------------------- .../cartridge.agent/cartridge.agent/agent.conf | 3 +- .../cartridge.agent/constants.py | 1 + .../cartridge.agent/healthstats.py | 67 ++++++++++++-------- 3 files changed, 42 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/82ac4d72/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf index 22d1deb..926ea96 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/agent.conf @@ -19,8 +19,7 @@ mb.ip =MB-IP mb.port =MB-PORT listen.address =LISTEN_ADDR -thrift.receiver.ip =CEP-IP -thrift.receiver.port =CEP-PORT +thrift.receiver.urls =CEP-URLS thrift.server.admin.username =CEP-ADMIN-USERNAME thrift.server.admin.password =CEP-ADMIN-PASSWORD cep.stats.publisher.enabled =ENABLE_HEALTH_PUBLISHER http://git-wip-us.apache.org/repos/asf/stratos/blob/82ac4d72/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py index 5510659..e840057 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/constants.py @@ -123,6 +123,7 @@ PORT_CHECK_TIMEOUT = "port.check.timeout" CEP_PUBLISHER_ENABLED = "cep.stats.publisher.enabled" CEP_RECEIVER_IP = "thrift.receiver.ip" CEP_RECEIVER_PORT = "thrift.receiver.port" +CEP_RECEIVER_URLS = "thrift.receiver.urls" CEP_SERVER_ADMIN_USERNAME = "thrift.server.admin.username" CEP_SERVER_ADMIN_PASSWORD = "thrift.server.admin.password" http://git-wip-us.apache.org/repos/asf/stratos/blob/82ac4d72/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py ---------------------------------------------------------------------- diff --git a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py index f661294..71e6d2f 100644 --- a/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py +++ b/components/org.apache.stratos.python.cartridge.agent/src/main/python/cartridge.agent/cartridge.agent/healthstats.py @@ -69,7 +69,7 @@ class HealthStatisticsPublisherManager(Thread): self.log.exception("Couldn't publish health statistics to CEP. Thrift Receiver offline. Reconnecting...") self.publisher = HealthStatisticsPublisher() - self.publisher.publisher.disconnect() + self.publisher.disconnect_publisher() class HealthStatisticsPublisher: @@ -94,38 +94,44 @@ class HealthStatisticsPublisher: return conf_value def __init__(self): - self.ports = [] - cep_port = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_PORT) - self.ports.append(cep_port) - cep_ip = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_IP) + self.publishers = [] + cep_admin_username = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME) + cep_admin_password = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD) + # 1.1.1.1:1883,2.2.2.2:1883 + cep_urls = HealthStatisticsPublisher.read_config(constants.CEP_RECEIVER_URLS); + cep_urls = cep_urls.split(',') + for cep_url in cep_urls: + self.ports = [] + cep_ip = cep_url.split(':')[0] + cep_port = cep_url.split(':')[1] + self.ports.append(cep_port) + cartridgeagentutils.wait_until_ports_active( + cep_ip, + self.ports, + int(Config.read_property("port.check.timeout", critical=False))) - cartridgeagentutils.wait_until_ports_active( - cep_ip, - self.ports, - int(Config.read_property("port.check.timeout", critical=False))) + cep_active = cartridgeagentutils.check_ports_active( + cep_ip, + self.ports) - cep_active = cartridgeagentutils.check_ports_active( - cep_ip, - self.ports) + if not cep_active: + raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") - if not cep_active: - raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") - cep_admin_username = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_USERNAME) - cep_admin_password = HealthStatisticsPublisher.read_config(constants.CEP_SERVER_ADMIN_PASSWORD) + self.stream_definition = HealthStatisticsPublisher.create_stream_definition() + HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) - self.stream_definition = HealthStatisticsPublisher.create_stream_definition() - HealthStatisticsPublisher.log.debug("Stream definition created: %r" % str(self.stream_definition)) + publisher = ThriftPublisher( + cep_ip, + cep_port, + cep_admin_username, + cep_admin_password, + self.stream_definition) - self.publisher = ThriftPublisher( - cep_ip, - cep_port, - cep_admin_username, - cep_admin_password, - self.stream_definition) + self.publishers.append(publisher) - HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") + HealthStatisticsPublisher.log.debug("HealthStatisticsPublisher initialized") @staticmethod def create_stream_definition(): @@ -171,7 +177,7 @@ class HealthStatisticsPublisher: event.payloadData, self.stream_definition.version)) - self.publisher.publish(event) + self.publish_event(self.publishers, event) def publish_load_average(self, load_avg): """ @@ -195,8 +201,15 @@ class HealthStatisticsPublisher: event.payloadData, self.stream_definition.version)) - self.publisher.publish(event) + self.publish_event(event) + + def publish_event(self, event): + for publisher in self.publishers: + publisher.publish(event) + def disconnect_publisher(self, publishers): + for publisher in self.publishers: + publisher.disconnect() class DefaultHealthStatisticsReader: """
