Repository: stratos Updated Branches: refs/heads/master bfe230a36 -> d82fcc539
Fixed some issues in publishing to CEP Project: http://git-wip-us.apache.org/repos/asf/stratos/repo Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5e607550 Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5e607550 Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5e607550 Branch: refs/heads/master Commit: 5e6075506c5dfd62bf8e5a1d9b34a9f265eaf16e Parents: bfdec31 Author: Chamila de Alwis <[email protected]> Authored: Wed Oct 15 13:54:33 2014 +0530 Committer: Chamila de Alwis <[email protected]> Committed: Wed Oct 15 13:54:33 2014 +0530 ---------------------------------------------------------------------- .../cartridge-agent/agent.py | 28 +++++++++++-- .../cartridge-agent/modules/databridge/agent.py | 41 ++++++++++++------- .../modules/event/topology/events.py | 42 ++++++++++++++++++++ .../modules/healthstatspublisher/healthstats.py | 2 - .../publisher/cartridgeagentpublisher.py | 2 - 5 files changed, 93 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/agent.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/agent.py b/tools/python-cartridge-agent/cartridge-agent/agent.py index e8acc5f..9f1a972 100644 --- a/tools/python-cartridge-agent/cartridge-agent/agent.py +++ b/tools/python-cartridge-agent/cartridge-agent/agent.py @@ -54,7 +54,6 @@ class CartridgeAgent(threading.Thread): mb_port) self.__tenant_context_initialized = False - self.__topology_context_initialized = False self.log_publish_manager = None @@ -198,12 +197,27 @@ class CartridgeAgent(threading.Thread): self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", self.on_member_suspended) self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", self.on_complete_topology) self.__topology_event_subscriber.register_handler("MemberStartedEvent", self.on_member_started) + self.__topology_event_subscriber.register_handler("InstanceSpawnedEvent", self.on_instance_spawned) self.__topology_event_subscriber.start() self.log.info("Cartridge Agent topology receiver thread started") + def on_instance_spawned(self, msg): + self.log.debug("Instance spawned event received: %r" % msg.payload) + if self.cartridge_agent_config.initialized: + return + + event_obj = InstanceSpawnedEvent.create_from_json(msg.payload) + try: + CartridgeAgent.extension_handler.on_instance_spawned_event(event_obj) + except: + self.log.exception("Error processing instance spawned event") + def on_member_activated(self, msg): self.log.debug("Member activated event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + event_obj = MemberActivatedEvent.create_from_json(msg.payload) try: CartridgeAgent.extension_handler.on_member_activated_event(event_obj) @@ -212,6 +226,9 @@ class CartridgeAgent(threading.Thread): def on_member_terminated(self, msg): self.log.debug("Member terminated event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + event_obj = MemberTerminatedEvent.create_from_json(msg.payload) try: CartridgeAgent.extension_handler.on_member_terminated_event(event_obj) @@ -220,6 +237,9 @@ class CartridgeAgent(threading.Thread): def on_member_suspended(self, msg): self.log.debug("Member suspended event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + event_obj = MemberSuspendedEvent.create_from_json(msg.payload) try: CartridgeAgent.extension_handler.on_member_suspended_event(event_obj) @@ -227,11 +247,10 @@ class CartridgeAgent(threading.Thread): self.log.exception("Error processing member suspended event") def on_complete_topology(self, msg): - if not self.__topology_context_initialized: + if not self.cartridge_agent_config.initialized: self.log.debug("Complete topology event received") event_obj = CompleteTopologyEvent.create_from_json(msg.payload) TopologyContext.update(event_obj.topology) - self.__topology_context_initialized = True try: CartridgeAgent.extension_handler.on_complete_topology_event(event_obj) except: @@ -241,6 +260,9 @@ class CartridgeAgent(threading.Thread): def on_member_started(self, msg): self.log.debug("Member started event received: %r" % msg.payload) + if not self.cartridge_agent_config.initialized: + return + event_obj = MemberStartedEvent.create_from_json(msg.payload) try: CartridgeAgent.extension_handler.on_member_started_event(event_obj) http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py index 8e59dd3..e5700a0 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py @@ -46,14 +46,14 @@ class StreamDefinition: self.payload_data = [] """:type : list[str]""" - def add_metadata_attribute(self, name, attr_type): - self.meta_data.append({"name": name, "type": attr_type}) + def add_metadata_attribute(self, attr_name, attr_type): + self.meta_data.append({"name": attr_name, "type": attr_type}) - def add_payloaddata_attribute(self, name, attr_type): - self.payload_data.append({"name": name, "type": attr_type}) + def add_payloaddata_attribute(self, attr_name, attr_type): + self.payload_data.append({"name": attr_name, "type": attr_type}) - def add_correlationdata_attribute(self, name, attr_type): - self.correlation_data.append({"name": name, "type": attr_type}) + def add_correlationdata_attribute(self, attr_name, attr_type): + self.correlation_data.append({"name": attr_name, "type": attr_type}) def __str__(self): """ @@ -67,26 +67,26 @@ class StreamDefinition: json_str += "\"description\":\"" + self.description + "\"," # add metadata attributes if exists - if len(self.meta_data > 0): + if len(self.meta_data) > 0: json_str += "\"metaData\":[" for metadatum in self.meta_data: - json_str += "{\"name\":\"" + metadatum["name"] + ", \"type\": \"" + metadatum["type"] + "\"}," + json_str += "{\"name\":\"" + metadatum["name"] + "\", \"type\": \"" + metadatum["type"] + "\"}," json_str = json_str[:-1] + "]," # add correlationdata attributes if exists - if len(self.correlation_data > 0): + if len(self.correlation_data) > 0: json_str += "\"correlationData\":[" for coredatum in self.correlation_data: - json_str += "{\"name\":\"" + coredatum["name"] + ", \"type\": \"" + coredatum["type"] + "\"}," + json_str += "{\"name\":\"" + coredatum["name"] + "\", \"type\": \"" + coredatum["type"] + "\"}," json_str = json_str[:-1] + "]," # add payloaddata attributes if exists - if len(self.payload_data > 0): + if len(self.payload_data) > 0: json_str += "\"payloadData\":[" for payloaddatum in self.payload_data: - json_str += "{\"name\":\"" + payloaddatum["name"] + ", \"type\": \"" + payloaddatum["type"] + "\"}," + json_str += "{\"name\":\"" + payloaddatum["name"] + "\", \"type\": \"" + payloaddatum["type"] + "\"}," json_str = json_str[:-1] + "]," @@ -129,10 +129,17 @@ class ThriftPublisher: :return: ThriftPublisher object :rtype: ThriftPublisher """ - self.__publisher = Publisher(ip, port) + try: + port_number = int(port) + except ValueError: + raise RuntimeError("Port number for Thrift Publisher is invalid: %r" % port) + + self.__publisher = Publisher(ip, port_number) + ThriftPublisher.log.debug("CREATED PUBLISHER.. CONNECTING WITH USERNAME AND PASSWORD %r:%r" % (username, password)) self.__publisher.connect(username, password) + ThriftPublisher.log.debug("Connected THRIFT ") self.__publisher.defineStream(str(stream_definition)) - ThriftPublisher.log.debug("Connected to %r:%r with stream definition %r" % (ip, port, str(stream_definition))) + ThriftPublisher.log.debug("DEFINED STREAM to %r:%r with stream definition %r" % (ip, port, str(stream_definition))) def publish(self, event): """ @@ -141,12 +148,15 @@ class ThriftPublisher: :param ThriftEvent event: The log event to be published :return: void """ + ThriftPublisher.log.debug("ABOUT TO PUBLISH: POPULATING DATA") event_bundler = EventBundle() ThriftPublisher.assign_attributes(event.metaData, event_bundler) ThriftPublisher.assign_attributes(event.correlationData, event_bundler) ThriftPublisher.assign_attributes(event.payloadData, event_bundler) + ThriftPublisher.log.debug("ABOUT TO PUBLISH") - self.__publisher.publish(event) + self.__publisher.publish(event_bundler) + ThriftPublisher.log.debug("PUBLISHED EVENT BUNDLED TO THRIFT MODULE") def disconnect(self): """ @@ -154,6 +164,7 @@ class ThriftPublisher: :return: void """ self.__publisher.disconnect() + ThriftPublisher.log.debug("DISCONNECTED FROM THRIFT PUBLISHER") @staticmethod def assign_attributes(attributes, event_bundler): http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py index 8a89c98..52c7c19 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py @@ -97,6 +97,7 @@ class MemberTerminatedEvent: instance.network_partition_id = json_obj["networkPartitionId"] if "networkPartitionId" in json_obj else None instance.partition_id = json_obj["partitionId"] if "partitionId" in json_obj else None instance.member_id = json_obj["memberId"] if "memberId" in json_obj else None + instance.properties = json_obj["properties"] if "properties" in json_obj else None return instance @@ -234,5 +235,46 @@ class MemberStartedEvent: instance.network_partition_id = json_obj["networkPartitionId"] if "networkPartitionId" in json_obj else None instance.partition_id = json_obj["partitionId"] if "partitionId" in json_obj else None instance.member_id = json_obj["memberId"] if "memberId" in json_obj else None + instance.properties = json_obj["properties"] if "properties" in json_obj else None + + return instance + + +class InstanceSpawnedEvent: + + def __init__(self): + self.service_name = None + """ :type : str """ + self.cluster_id = None + """ :type : str """ + self.network_partition_id = None + """ :type : str """ + self.partition_id = None + """ :type : str """ + self.member_id = None + """ :type : str """ + self.lb_cluster_id = None + """ :type : str """ + self.member_public_ip = None + """ :type : str """ + self.member_ip = None + """ :type : str """ + self.properties = {} + """ :type : dict[str, str] """ + + @staticmethod + def create_from_json(json_str): + json_obj = json.loads(json_str) + instance = InstanceSpawnedEvent() + + instance.service_name = json_obj["serviceName"] if "serviceName" in json_obj else None + instance.cluster_id = json_obj["clusterId"] if "clusterId" in json_obj else None + instance.network_partition_id = json_obj["networkPartitionId"] if "networkPartitionId" in json_obj else None + instance.partition_id = json_obj["partitionId"] if "partitionId" in json_obj else None + instance.member_id = json_obj["memberId"] if "memberId" in json_obj else None + instance.lb_cluster_id = json_obj["lbClusterId"] if "lbClusterId" in json_obj else None + instance.member_public_ip = json_obj["memberPublicIp"] if "memberPublicIp" in json_obj else None + instance.member_ip = json_obj["memberIp"] if "memberIp" in json_obj else None + instance.properties = json_obj["properties"] return instance \ No newline at end of file http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py index 578bceb..5dcad24 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py @@ -93,9 +93,7 @@ class HealthStatisticsPublisher: self.ports, int(self.cartridge_agent_config.read_property("port.check.timeout", critical=False))) cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports) - HealthStatisticsPublisher.log.debug("CHECKED PORTS ACTIVITY") if not cep_active: - HealthStatisticsPublisher.log.debug("CEP PORTS NOT ACTIVE") raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.") self.stream_definition = HealthStatisticsPublisher.create_stream_definition() http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py ---------------------------------------------------------------------- diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py index c3225c6..1ce8ffb 100644 --- a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py +++ b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py @@ -86,8 +86,6 @@ def publish_instance_activated_event(): else: interval = interval_default - log.debug("interval 3: %r" % interval) - health_stats_publisher = HealthStatisticsPublisherManager(interval) log.info("Starting Health statistics publisher with interval %r" % interval_default) health_stats_publisher.start()
