Repository: stratos
Updated Branches:
  refs/heads/master bfe230a36 -> d82fcc539


Fixed some issues in publishing to CEP


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/5e607550
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/5e607550
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/5e607550

Branch: refs/heads/master
Commit: 5e6075506c5dfd62bf8e5a1d9b34a9f265eaf16e
Parents: bfdec31
Author: Chamila de Alwis <[email protected]>
Authored: Wed Oct 15 13:54:33 2014 +0530
Committer: Chamila de Alwis <[email protected]>
Committed: Wed Oct 15 13:54:33 2014 +0530

----------------------------------------------------------------------
 .../cartridge-agent/agent.py                    | 28 +++++++++++--
 .../cartridge-agent/modules/databridge/agent.py | 41 ++++++++++++-------
 .../modules/event/topology/events.py            | 42 ++++++++++++++++++++
 .../modules/healthstatspublisher/healthstats.py |  2 -
 .../publisher/cartridgeagentpublisher.py        |  2 -
 5 files changed, 93 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/agent.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/agent.py b/tools/python-cartridge-agent/cartridge-agent/agent.py
index e8acc5f..9f1a972 100644
--- a/tools/python-cartridge-agent/cartridge-agent/agent.py
+++ b/tools/python-cartridge-agent/cartridge-agent/agent.py
@@ -54,7 +54,6 @@ class CartridgeAgent(threading.Thread):
             mb_port)
 
         self.__tenant_context_initialized = False
-        self.__topology_context_initialized = False
 
         self.log_publish_manager = None
 
@@ -198,12 +197,27 @@ class CartridgeAgent(threading.Thread):
         
self.__topology_event_subscriber.register_handler("MemberSuspendedEvent", 
self.on_member_suspended)
         
self.__topology_event_subscriber.register_handler("CompleteTopologyEvent", 
self.on_complete_topology)
         
self.__topology_event_subscriber.register_handler("MemberStartedEvent", 
self.on_member_started)
+        
self.__topology_event_subscriber.register_handler("InstanceSpawnedEvent", 
self.on_instance_spawned)
 
         self.__topology_event_subscriber.start()
         self.log.info("Cartridge Agent topology receiver thread started")
 
+    def on_instance_spawned(self, msg):
+        self.log.debug("Instance spawned event received: %r" % msg.payload)
+        if self.cartridge_agent_config.initialized:
+            return
+
+        event_obj = InstanceSpawnedEvent.create_from_json(msg.payload)
+        try:
+            
CartridgeAgent.extension_handler.on_instance_spawned_event(event_obj)
+        except:
+            self.log.exception("Error processing instance spawned event")
+
     def on_member_activated(self, msg):
         self.log.debug("Member activated event received: %r" % msg.payload)
+        if not self.cartridge_agent_config.initialized:
+            return
+
         event_obj = MemberActivatedEvent.create_from_json(msg.payload)
         try:
             
CartridgeAgent.extension_handler.on_member_activated_event(event_obj)
@@ -212,6 +226,9 @@ class CartridgeAgent(threading.Thread):
 
     def on_member_terminated(self, msg):
         self.log.debug("Member terminated event received: %r" % msg.payload)
+        if not self.cartridge_agent_config.initialized:
+            return
+
         event_obj = MemberTerminatedEvent.create_from_json(msg.payload)
         try:
             
CartridgeAgent.extension_handler.on_member_terminated_event(event_obj)
@@ -220,6 +237,9 @@ class CartridgeAgent(threading.Thread):
 
     def on_member_suspended(self, msg):
         self.log.debug("Member suspended event received: %r" % msg.payload)
+        if not self.cartridge_agent_config.initialized:
+            return
+
         event_obj = MemberSuspendedEvent.create_from_json(msg.payload)
         try:
             
CartridgeAgent.extension_handler.on_member_suspended_event(event_obj)
@@ -227,11 +247,10 @@ class CartridgeAgent(threading.Thread):
             self.log.exception("Error processing member suspended event")
 
     def on_complete_topology(self, msg):
-        if not self.__topology_context_initialized:
+        if not self.cartridge_agent_config.initialized:
             self.log.debug("Complete topology event received")
             event_obj = CompleteTopologyEvent.create_from_json(msg.payload)
             TopologyContext.update(event_obj.topology)
-            self.__topology_context_initialized = True
             try:
                 
CartridgeAgent.extension_handler.on_complete_topology_event(event_obj)
             except:
@@ -241,6 +260,9 @@ class CartridgeAgent(threading.Thread):
 
     def on_member_started(self, msg):
         self.log.debug("Member started event received: %r" % msg.payload)
+        if not self.cartridge_agent_config.initialized:
+            return
+
         event_obj = MemberStartedEvent.create_from_json(msg.payload)
         try:
             CartridgeAgent.extension_handler.on_member_started_event(event_obj)

http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
index 8e59dd3..e5700a0 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/databridge/agent.py
@@ -46,14 +46,14 @@ class StreamDefinition:
         self.payload_data = []
         """:type : list[str]"""
 
-    def add_metadata_attribute(self, name, attr_type):
-        self.meta_data.append({"name": name, "type": attr_type})
+    def add_metadata_attribute(self, attr_name, attr_type):
+        self.meta_data.append({"name": attr_name, "type": attr_type})
 
-    def add_payloaddata_attribute(self, name, attr_type):
-        self.payload_data.append({"name": name, "type": attr_type})
+    def add_payloaddata_attribute(self, attr_name, attr_type):
+        self.payload_data.append({"name": attr_name, "type": attr_type})
 
-    def add_correlationdata_attribute(self, name, attr_type):
-        self.correlation_data.append({"name": name, "type": attr_type})
+    def add_correlationdata_attribute(self, attr_name, attr_type):
+        self.correlation_data.append({"name": attr_name, "type": attr_type})
 
     def __str__(self):
         """
@@ -67,26 +67,26 @@ class StreamDefinition:
         json_str += "\"description\":\"" + self.description + "\","
 
         # add metadata attributes if exists
-        if len(self.meta_data > 0):
+        if len(self.meta_data) > 0:
             json_str += "\"metaData\":["
             for metadatum in self.meta_data:
-                json_str += "{\"name\":\"" + metadatum["name"] + ", \"type\": \"" + metadatum["type"] + "\"},"
+                json_str += "{\"name\":\"" + metadatum["name"] + "\", \"type\": \"" + metadatum["type"] + "\"},"
 
             json_str = json_str[:-1] + "],"
 
         # add correlationdata attributes if exists
-        if len(self.correlation_data > 0):
+        if len(self.correlation_data) > 0:
             json_str += "\"correlationData\":["
             for coredatum in self.correlation_data:
-                json_str += "{\"name\":\"" + coredatum["name"] + ", \"type\": \"" + coredatum["type"] + "\"},"
+                json_str += "{\"name\":\"" + coredatum["name"] + "\", \"type\": \"" + coredatum["type"] + "\"},"
 
             json_str = json_str[:-1] + "],"
 
         # add payloaddata attributes if exists
-        if len(self.payload_data > 0):
+        if len(self.payload_data) > 0:
             json_str += "\"payloadData\":["
             for payloaddatum in self.payload_data:
-                json_str += "{\"name\":\"" + payloaddatum["name"] + ", \"type\": \"" + payloaddatum["type"] + "\"},"
+                json_str += "{\"name\":\"" + payloaddatum["name"] + "\", \"type\": \"" + payloaddatum["type"] + "\"},"
 
             json_str = json_str[:-1] + "],"
 
@@ -129,10 +129,17 @@ class ThriftPublisher:
         :return: ThriftPublisher object
         :rtype: ThriftPublisher
         """
-        self.__publisher = Publisher(ip, port)
+        try:
+            port_number = int(port)
+        except ValueError:
+            raise RuntimeError("Port number for Thrift Publisher is invalid: %r" % port)
+
+        self.__publisher = Publisher(ip, port_number)
+        ThriftPublisher.log.debug("CREATED PUBLISHER.. CONNECTING WITH USERNAME AND PASSWORD %r:%r" % (username, password))
         self.__publisher.connect(username, password)
+        ThriftPublisher.log.debug("Connected THRIFT ")
         self.__publisher.defineStream(str(stream_definition))
-        ThriftPublisher.log.debug("Connected to %r:%r with stream definition %r" % (ip, port, str(stream_definition)))
+        ThriftPublisher.log.debug("DEFINED STREAM to %r:%r with stream definition %r" % (ip, port, str(stream_definition)))
 
     def publish(self, event):
         """
@@ -141,12 +148,15 @@ class ThriftPublisher:
         :param ThriftEvent event: The log event to be published
         :return: void
         """
+        ThriftPublisher.log.debug("ABOUT TO PUBLISH: POPULATING DATA")
         event_bundler = EventBundle()
         ThriftPublisher.assign_attributes(event.metaData, event_bundler)
         ThriftPublisher.assign_attributes(event.correlationData, event_bundler)
         ThriftPublisher.assign_attributes(event.payloadData, event_bundler)
+        ThriftPublisher.log.debug("ABOUT TO PUBLISH")
 
-        self.__publisher.publish(event)
+        self.__publisher.publish(event_bundler)
+        ThriftPublisher.log.debug("PUBLISHED EVENT BUNDLED TO THRIFT MODULE")
 
     def disconnect(self):
         """
@@ -154,6 +164,7 @@ class ThriftPublisher:
         :return: void
         """
         self.__publisher.disconnect()
+        ThriftPublisher.log.debug("DISCONNECTED FROM THRIFT PUBLISHER")
 
     @staticmethod
     def assign_attributes(attributes, event_bundler):

http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py b/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py
index 8a89c98..52c7c19 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/event/topology/events.py
@@ -97,6 +97,7 @@ class MemberTerminatedEvent:
         instance.network_partition_id = json_obj["networkPartitionId"] if "networkPartitionId" in json_obj else None
         instance.partition_id = json_obj["partitionId"] if "partitionId" in json_obj else None
         instance.member_id = json_obj["memberId"] if "memberId" in json_obj else None
+        instance.properties = json_obj["properties"] if "properties" in json_obj else None
 
         return instance
 
@@ -234,5 +235,46 @@ class MemberStartedEvent:
         instance.network_partition_id = json_obj["networkPartitionId"] if "networkPartitionId" in json_obj else None
         instance.partition_id = json_obj["partitionId"] if "partitionId" in json_obj else None
         instance.member_id = json_obj["memberId"] if "memberId" in json_obj else None
+        instance.properties = json_obj["properties"] if "properties" in json_obj else None
+
+        return instance
+
+
+class InstanceSpawnedEvent:
+
+    def __init__(self):
+        self.service_name = None
+        """ :type : str  """
+        self.cluster_id = None
+        """ :type : str  """
+        self.network_partition_id = None
+        """ :type : str  """
+        self.partition_id = None
+        """ :type : str  """
+        self.member_id = None
+        """ :type : str  """
+        self.lb_cluster_id = None
+        """ :type : str  """
+        self.member_public_ip = None
+        """ :type : str  """
+        self.member_ip = None
+        """ :type : str  """
+        self.properties = {}
+        """ :type : dict[str, str]  """
+
+    @staticmethod
+    def create_from_json(json_str):
+        json_obj = json.loads(json_str)
+        instance = InstanceSpawnedEvent()
+
+        instance.service_name = json_obj["serviceName"] if "serviceName" in json_obj else None
+        instance.cluster_id = json_obj["clusterId"] if "clusterId" in json_obj else None
+        instance.network_partition_id = json_obj["networkPartitionId"] if "networkPartitionId" in json_obj else None
+        instance.partition_id = json_obj["partitionId"] if "partitionId" in json_obj else None
+        instance.member_id = json_obj["memberId"] if "memberId" in json_obj else None
+        instance.lb_cluster_id = json_obj["lbClusterId"] if "lbClusterId" in json_obj else None
+        instance.member_public_ip = json_obj["memberPublicIp"] if "memberPublicIp" in json_obj else None
+        instance.member_ip = json_obj["memberIp"] if "memberIp" in json_obj else None
+        instance.properties = json_obj["properties"]
 
         return instance
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
index 578bceb..5dcad24 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/healthstatspublisher/healthstats.py
@@ -93,9 +93,7 @@ class HealthStatisticsPublisher:
             self.ports,
             
int(self.cartridge_agent_config.read_property("port.check.timeout", 
critical=False)))
         cep_active = cartridgeagentutils.check_ports_active(CEPPublisherConfiguration.get_instance().server_ip, self.ports)
-        HealthStatisticsPublisher.log.debug("CHECKED PORTS ACTIVITY")
         if not cep_active:
-            HealthStatisticsPublisher.log.debug("CEP PORTS NOT ACTIVE")
             raise CEPPublisherException("CEP server not active. Health statistics publishing aborted.")
 
         self.stream_definition = HealthStatisticsPublisher.create_stream_definition()

http://git-wip-us.apache.org/repos/asf/stratos/blob/5e607550/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
----------------------------------------------------------------------
diff --git a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
index c3225c6..1ce8ffb 100644
--- a/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
+++ b/tools/python-cartridge-agent/cartridge-agent/modules/publisher/cartridgeagentpublisher.py
@@ -86,8 +86,6 @@ def publish_instance_activated_event():
             else:
                 interval = interval_default
 
-            log.debug("interval 3: %r" % interval)
-
             health_stats_publisher = HealthStatisticsPublisherManager(interval)
             log.info("Starting Health statistics publisher with interval %r" % interval_default)
             health_stats_publisher.start()

Reply via email to