DISPATCH-437 - Work in progress - added AgentRequestAdapter and ordinality

Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e2ad8da8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e2ad8da8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e2ad8da8

Branch: refs/heads/DISPATCH-437-1
Commit: e2ad8da8acefd5d2f15f58d73330fe2c2037fd9e
Parents: d7fcfef
Author: Ganesh Murthy <[email protected]>
Authored: Mon Aug 1 12:34:36 2016 -0400
Committer: Ganesh Murthy <[email protected]>
Committed: Mon Aug 1 12:41:06 2016 -0400

----------------------------------------------------------------------
 .../management/__init__.py                      |   4 +
 .../qpid_dispatch_internal/management/agent.py  | 207 ++++++++-----
 .../qpid_dispatch_internal/management/config.py |   4 +-
 .../qpid_dispatch_internal/management/schema.py |  39 ++-
 src/CMakeLists.txt                              |   3 +-
 src/dispatch.c                                  |   8 +
 src/iterator.c                                  |   3 +
 src/router_core/agent_config_address.c          |  11 +-
 src/router_core/management_agent.c              | 288 +++++++------------
 src/router_core/router_core.c                   |  10 -
 src/router_core/router_core_private.h           |   1 -
 11 files changed, 302 insertions(+), 276 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/__init__.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/__init__.py 
b/python/qpid_dispatch_internal/management/__init__.py
index 6ee5966..9d49f47 100644
--- a/python/qpid_dispatch_internal/management/__init__.py
+++ b/python/qpid_dispatch_internal/management/__init__.py
@@ -18,3 +18,7 @@
 #
 """Qpid Dispatch internal management package."""
 
+#   from .agent import ManagementAgent
+
+__all__ = ["ManagementAgent"]
+

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/agent.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/agent.py 
b/python/qpid_dispatch_internal/management/agent.py
index 6e4b7fc..7ada418 100644
--- a/python/qpid_dispatch_internal/management/agent.py
+++ b/python/qpid_dispatch_internal/management/agent.py
@@ -83,7 +83,7 @@ from .qdrouter import QdSchema
 from ..router.message import Message
 from ..router.address import Address
 from ..policy.policy_manager import PolicyManager
-
+from . import AgentRequestHandler
 
 def dictstr(d):
     """Stringify a dict in the form 'k=v, k=v ...' instead of '{k:v, ...}'"""
@@ -334,6 +334,9 @@ class VhostStatsEntity(EntityAdapter):
     def __str__(self):
         return super(VhostStatsEntity, self).__str__().replace("Entity(", 
"VhostStatsEntity(")
 
+    def __str__(self):
+        return super(PolicyStatsEntity, self).__str__().replace("Entity(", 
"PolicyStatsEntity(")
+
 
 def _host_port_name_identifier(entity):
     for attr in ['host', 'port', 'name']: # Set default values if need be
@@ -741,30 +744,29 @@ class ManagementEntity(EntityAdapter):
                 out.close()
         raise BadRequestStatus("Bad profile request %s" % (request))
 
-class Agent(object):
-    """AMQP managment agent. Manages entities, directs requests to the correct 
entity."""
+class ManagementAgent(object):
+    """
+    AMQP managment agent. Manages entities, directs requests to the correct 
entity.
+    """
 
-    def __init__(self, dispatch, qd):
+    def __init__(self, address, agent_adapter, dispatch, qd, 
schema=QdSchema()):
         self.qd = qd
+        self.address = address
+        self.agent_adapter = agent_adapter
         self.dispatch = dispatch
-        self.schema = QdSchema()
+        self.schema = schema
         self.entities = EntityCache(self)
         self.request_lock = Lock()
         self.log_adapter = LogAdapter("AGENT")
         self.policy = PolicyManager(self)
         self.management = self.create_entity({"type": "management"})
         self.add_entity(self.management)
+        self.io = IoAdapter(self.receive, address, 'L', '0', 
TREATMENT_ANYCAST_CLOSEST)
 
     def log(self, level, text):
         info = traceback.extract_stack(limit=2)[0] # Caller frame info
         self.log_adapter.log(level, text, info[0], info[1])
 
-    def activate(self, address):
-        """Register the management address to receive management requests"""
-        self.entities.refresh_from_c()
-        self.log(LOG_INFO, "Activating management agent on %s" % address)
-        self.io = IoAdapter(self.receive, address, 'L', '0', 
TREATMENT_ANYCAST_CLOSEST)
-
     def entity_class(self, entity_type):
         """Return the class that implements entity_type"""
         class_name = camelcase(entity_type.short_name, capital=True) + 'Entity'
@@ -784,69 +786,11 @@ class Agent(object):
         entity_type = self.schema.entity_type(attributes['type'])
         return self.entity_class(entity_type)(self, entity_type, attributes)
 
-    def respond(self, request, status=OK, description=None, body=None):
-        """Send a response to the client"""
-        if body is None: body = {}
-        description = description or STATUS_TEXT[status]
-        response = Message(
-            address=request.reply_to,
-            correlation_id=request.correlation_id,
-            properties={'statusCode': status, 'statusDescription': 
description},
-            body=body)
-        self.log(LOG_DEBUG, "Agent response:\n  %s\n  Responding to: \n  
%s"%(response, request))
-        try:
-            self.io.send(response)
-        except:
-            self.log(LOG_ERROR, "Can't respond to %s: %s"%(request, 
format_exc()))
-
-    def receive(self, request, unused_link_id, unused_cost):
-        """Called when a management request is received."""
-        def error(e, trace):
-            """Raise an error"""
-            self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, 
trace))
-            self.respond(request, e.status, e.description)
-
-        # If there's no reply_to, don't bother to process the request.
-        if not request.reply_to:
-            return
-
-        # Coarse locking, handle one request at a time.
-        with self.request_lock:
-            try:
-                self.entities.refresh_from_c()
-                self.log(LOG_DEBUG, "Agent request %s"% request)
-                status, body = self.handle(request)
-                self.respond(request, status=status, body=body)
-            except ManagementError, e:
-                error(e, format_exc())
-            except ValidationError, e:
-                error(BadRequestStatus(str(e)), format_exc())
-            except Exception, e:
-                error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, 
e)), format_exc())
-
     def entity_type(self, type):
-        try: return self.schema.entity_type(type)
-        except ValidationError, e: raise NotFoundStatus(str(e))
-
-    def handle(self, request):
-        """
-        Handle a request.
-        Dispatch management node requests to self, entity requests to the 
entity.
-        @return: (response-code, body)
-        """
-        operation = required_property('operation', request)
-        if operation.lower() == 'create':
-            # Create requests are entity requests but must be handled by the 
agent since
-            # the entity does not yet exist.
-            return self.create(request)
-        else:
-            target = self.find_entity(request)
-            target.entity_type.allowed(operation, request.body)
-            try:
-                method = getattr(target, operation.lower().replace("-", "_"))
-            except AttributeError:
-                not_implemented(operation, target.type)
-            return method(request)
+        try:
+            return self.schema.entity_type(type)
+        except ValidationError, e:
+            raise NotFoundStatus(str(e))
 
     def _create(self, attributes):
         """Create an entity, called externally or from configuration file."""
@@ -901,12 +845,19 @@ class Agent(object):
         """Remove and internal python implementation object."""
         self.entities.remove_implementation(id(implementation))
 
+    def requested_entity_type(self, request):
+        entity_type = request.properties.get('entityType')
+        if entity_type:
+            return self.schema.entity_type(entity_type)
+        return None
+
     def find_entity(self, request):
         """Find the entity addressed by request"""
 
         requested_type = request.properties.get('type')
         if requested_type:
             requested_type = self.schema.entity_type(requested_type)
+
         # ids is a map of identifying attribute values
         ids = dict((k, request.properties.get(k))
                    for k in ['name', 'identity'] if k in request.properties)
@@ -945,3 +896,113 @@ class Agent(object):
 
     def find_entity_by_type(self, type):
         return self.entities.map_type(None, type)
+
+    def handle(self, request):
+        """
+        Handle a request.
+        Dispatch management node requests to self, entity requests to the 
entity.
+        @return: (response-code, body)
+        """
+        # Get the operation from the request. This could be CREATE or READ or 
UPDATE or DELETE or QUERY
+        operation = required_property('operation', request)
+
+        # Get the entity type from the request. For e.g. entity_type could be 
a connector entity or a listener entity
+        entity_type = self.requested_entity_type(request)
+
+        # Get the type from the request
+        requested_type = request.properties.get('type') # Should be 
org.amqp.management
+        if requested_type:
+            requested_type = self.schema.entity_type(requested_type)
+            if operation not in requested_type.operations:
+                # Is the operation allowed on the entity_type, if not throw an 
exception
+                entity_type.allowed(operation)
+
+        # target = self.find_entity(request)
+
+        # Parameters required to post the request to the work queue
+        reply_to = request.reply_to
+        correlation_id = request.correlation_id
+        entity_type_ordinality = entity_type.ordinality
+        #operation_ordinality = operation.ordinality
+
+        print 'reply_to, correlation_id, entity_type_ordinality ', reply_to, 
correlation_id, entity_type_ordinality
+
+        if operation:
+            operation = operation.lower()
+            if operation == 'create':
+
+                # request.body is already a map with the create parameters
+                # request.properties has the operation, name, type
+                request_handler.qd_post_management_request(operation, 
entity_type_ordinality, correlation_id, reply_to,
+                                                           request.body)
+
+            request_type = request.properties.get('type')
+            if self.schema.is_long_name(request_type):
+                enum_type = self.schema.type_map.get(request_type)
+            else:
+                # Get the corresponding long name from short name
+                long_type = self.schema.short_long_type_map.get(request_type)
+                enum_type = self.schema.type_map.get(long_type)
+
+            # post an entry into the work queue
+            #qd_dispatch_post_management_request(self.dispatch, oper, 
enum_type)
+
+        else:
+            raise not_implemented(operation, target.type)
+        """
+        if operation.lower() == 'create':
+            # Create requests are entity requests but must be handled by the 
agent since
+            # the entity does not yet exist.
+            return self.create(request)
+        else:
+            target = self.find_entity(request)
+            target.entity_type.allowed(operation, request.body)
+            try:
+                method = getattr(target, operation.lower().replace("-", "_"))
+            except AttributeError:
+                not_implemented(operation, target.type)
+            return method(request)
+        """
+
+    def respond(self, request, status=OK, description=None, body=None):
+        """Send a response to the client"""
+        if body is None: body = {}
+        description = description or STATUS_TEXT[status]
+        response = Message(
+            address=request.reply_to,
+            correlation_id=request.correlation_id,
+            properties={'statusCode': status, 'statusDescription': 
description},
+            body=body)
+        self.log(LOG_DEBUG, "Agent response:\n  %s\n  Responding to: \n  
%s"%(response, request))
+        try:
+            self.io.send(response)
+        except:
+            self.log(LOG_ERROR, "Can't respond to %s: %s"%(request, 
format_exc()))
+
+    def receive(self, request, unused_link_id, unused_cost):
+        """
+        Called when a management request is received.
+        The init function registers this function as the one to be called by 
creating an IOAdapter.
+        """
+        # If there's no reply_to, don't bother to process the request.
+        if not request.reply_to:
+            return
+
+        def error(e, trace):
+            """Raise an error"""
+            self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, 
trace))
+            self.respond(request, e.status, e.description)
+
+        # Coarse locking, handle one request at a time.
+        with self.request_lock:
+            try:
+                self.entities.refresh_from_c()
+                self.log(LOG_DEBUG, "Agent request %s"% request)
+                status, body = self.handle(request)
+                self.respond(request, status=status, body=body)
+            except ManagementError, e:
+                error(e, format_exc())
+            except ValidationError, e:
+                error(BadRequestStatus(str(e)), format_exc())
+            except Exception, e:
+                error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, 
e)), format_exc())

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/config.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/config.py 
b/python/qpid_dispatch_internal/management/config.py
index a7b82c7..88823db 100644
--- a/python/qpid_dispatch_internal/management/config.py
+++ b/python/qpid_dispatch_internal/management/config.py
@@ -149,9 +149,11 @@ def configure_dispatch(dispatch, lib_handle, filename):
     # Configure and prepare container and router before we can activate the 
agent.
     configure(config.by_type('container')[0])
     configure(config.by_type('router')[0])
+
     qd.qd_dispatch_prepare(dispatch)
     qd.qd_router_setup_late(dispatch) # Actions requiring active management 
agent.
-    agent.activate("$_management_internal")
+
+    #agent.activate("$_management_internal")
 
     from qpid_dispatch_internal.display_name.display_name import 
DisplayNameService
     displayname_service = DisplayNameService("$displayname")

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/schema.py
----------------------------------------------------------------------
diff --git a/python/qpid_dispatch_internal/management/schema.py 
b/python/qpid_dispatch_internal/management/schema.py
index 7559c73..78c77ad 100644
--- a/python/qpid_dispatch_internal/management/schema.py
+++ b/python/qpid_dispatch_internal/management/schema.py
@@ -208,7 +208,7 @@ class AttributeType(object):
 
     def __init__(self, name, type=None, defined_in=None, default=None,
                  required=False, unique=False, hidden=False, deprecated=False,
-                 value=None, description="", create=False, update=False, 
graph=False):
+                 value=None, description="", create=False, update=False, 
graph=False, ordinality=0):
         """
         See L{AttributeType} instance variables.
         """
@@ -224,6 +224,7 @@ class AttributeType(object):
             self.value = value
             self.unique = unique
             self.description = description
+            self.ordinality = ordinality
             if self.value is not None and self.default is not None:
                 raise ValidationError("Attribute '%s' has default value and 
fixed value" %
                                       self.name)
@@ -321,7 +322,7 @@ class EntityType(object):
     @ivar referential: True if an entity can be referred to by name from 
another entity.
     """
     def __init__(self, name, schema, attributes=None, operations=None, 
operationDefs=None, description="",
-                 fullName=True, singleton=False, deprecated=False, 
extends=None, referential=False, **kwargs):
+                 fullName=True, singleton=False, deprecated=False, 
extends=None, referential=False, ordinality=0, **kwargs):
         """
         @param name: name of the entity type.
         @param schema: schema for this type.
@@ -341,8 +342,9 @@ class EntityType(object):
                     self.short_name = 
self.short_name.replace("router.config.", "")
             else:
                 self.name = self.short_name = name
+
             self.attributes = OrderedDict((k, AttributeType(k, 
defined_in=self, **v))
-                                              for k, v in (attributes or 
{}).iteritems())
+                                          for k, v in (attributes or 
{}).iteritems())
             self.operations = operations or []
             # Bases are resolved in self.init()
             self.base = extends
@@ -352,6 +354,7 @@ class EntityType(object):
             self.singleton = singleton
             self.deprecated = deprecated
             self.referential = referential
+            self.ordinality = ordinality
             self._init = False      # Have not yet initialized from base and 
attributes.
             # Operation definitions
             self.operation_defs = dict((name, OperationDef(name, **op))
@@ -381,13 +384,21 @@ class EntityType(object):
         self.operations += other.operations
         check(self.attributes.iterkeys(), other.attributes.itervalues(), 
"attributes")
         self.attributes.update(other.attributes)
+
         if other.name == 'entity':
             # Fill in entity "type" attribute automatically.
             self.attributes["type"]["value"] = self.name
 
-    def extends(self, base): return base in self.all_bases
+        ordinality = 0
+        for attrib in self.attributes.values():
+            attrib.ordinality = ordinality
+            ordinality += 1
 
-    def is_a(self, type): return type == self or self.extends(type)
+    def extends(self, base):
+        return base in self.all_bases
+
+    def is_a(self, type):
+        return type == self or self.extends(type)
 
     def attribute(self, name):
         """Get the AttributeType for name"""
@@ -433,7 +444,7 @@ class EntityType(object):
 
         return attributes
 
-    def allowed(self, op, body):
+    def allowed(self, op):
         """Raise exception if op is not a valid operation on entity."""
         op = op.upper()
         if not op in self.operations:
@@ -505,24 +516,36 @@ class Schema(object):
 
         self.all_attributes = set()
 
+        ordinality = 0
         for e in self.entity_types.itervalues():
             e.init()
+            e.ordinality = ordinality
             self.all_attributes.update(e.attributes.keys())
+            ordinality += 1
 
     def short_name(self, name):
         """Remove prefix from name if present"""
-        if not name: return name
+        if not name:
+            return name
         if name.startswith(self.prefixdot):
             name = name[len(self.prefixdot):]
         return name
 
     def long_name(self, name):
         """Add prefix to unqualified name"""
-        if not name: return name
+        if not name:
+            return name
         if not name.startswith(self.prefixdot):
             name = self.prefixdot + name
         return name
 
+    def is_long_name(self, name):
+        if not name:
+            return False
+        if self.prefixdot in name:
+            return True
+        return False
+
     def dump(self):
         """Return json-friendly representation"""
         return OrderedDict([

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index c5aa589..3a78801 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -83,6 +83,7 @@ set(qpid_dispatch_SOURCES
   router_core/transfer.c
   router_node.c
   router_pynode.c
+  agent_adapter.c
   schema_enum.c
   server.c
   timer.c
@@ -94,7 +95,7 @@ if(USE_MEMORY_POOL)
 endif()
 
 set_property(
-  SOURCE python_embedded.c router_pynode.c
+  SOURCE python_embedded.c router_pynode.c agent_adapter.c
   PROPERTY COMPILE_FLAGS -Wno-strict-aliasing
   )
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/dispatch.c
----------------------------------------------------------------------
diff --git a/src/dispatch.c b/src/dispatch.c
index 1b39228..cb5c34e 100644
--- a/src/dispatch.c
+++ b/src/dispatch.c
@@ -174,6 +174,14 @@ qd_error_t qd_dispatch_configure_auto_link(qd_dispatch_t 
*qd, qd_entity_t *entit
     return qd_error_code();
 }
 
+/*qd_error_t qd_dispatch_post_management_request(qd_dispatch_t *qd,
+                                               qd_schema_entity_operation_t 
operation,
+                                               qd_schema_entity_type_t 
entity_type) {
+    if (!qd->router) return qd_error(QD_ERROR_NOT_FOUND, "No router 
available");
+    qd_router_configure_auto_link(qd->router, entity);
+    return qd_error_code();
+}*/
+
 qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity)
 {
     qd_error_t err;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/iterator.c
----------------------------------------------------------------------
diff --git a/src/iterator.c b/src/iterator.c
index 75d0196..f4483cc 100644
--- a/src/iterator.c
+++ b/src/iterator.c
@@ -390,6 +390,9 @@ qd_field_iterator_t* qd_address_iterator_binary(const char 
*text, int length, qd
 
 qd_field_iterator_t *qd_address_iterator_buffer(qd_buffer_t *buffer, int 
offset, int length, qd_iterator_view_t view)
 {
+    if (!buffer)
+        return 0;
+
     qd_field_iterator_t *iter = new_qd_field_iterator_t();
     if (!iter)
         return 0;

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/agent_config_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_config_address.c 
b/src/router_core/agent_config_address.c
index edb9ba5..f30f3fa 100644
--- a/src/router_core/agent_config_address.c
+++ b/src/router_core/agent_config_address.c
@@ -19,6 +19,7 @@
 
 #include <qpid/dispatch/ctools.h>
 #include "agent_config_address.h"
+#include "schema_enum.h"
 #include <inttypes.h>
 #include <stdio.h>
 
@@ -322,11 +323,11 @@ void qdra_config_address_create_CT(qdr_core_t          
*core,
         //
         // Extract the fields from the request
         //
-        qd_parsed_field_t *prefix_field    = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_PREFIX]);
-        qd_parsed_field_t *distrib_field   = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_DISTRIBUTION]);
-        qd_parsed_field_t *waypoint_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]);
-        qd_parsed_field_t *in_phase_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]);
-        qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]);
+        qd_parsed_field_t *prefix_field    = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QD_SCHEMA_FIXEDADDRESS_ATTRIBUTES_PREFIX]);
+        qd_parsed_field_t *distrib_field   = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_DISTRIBUTION]);
+        qd_parsed_field_t *waypoint_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_WAYPOINT]);
+        qd_parsed_field_t *in_phase_field  = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_INGRESSPHASE]);
+        qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, 
qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_EGRESSPHASE]);
 
         //
         // Prefix field is mandatory.  Fail if it is not here.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c 
b/src/router_core/management_agent.c
index 030499a..e1cf1cb 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -27,6 +27,7 @@
 #include "router_core_private.h"
 #include "dispatch_private.h"
 #include "agent_link.h"
+#include "schema_enum.h"
 #include "alloc.h"
 
 const char *ENTITY = "entityType";
@@ -72,14 +73,15 @@ typedef enum {
 
 
 typedef struct qd_management_context_t {
-    qd_message_t               *msg;
-    qd_message_t               *source;
-    qd_composed_field_t        *field;
-    qdr_query_t                *query;
-    qdr_core_t                 *core;
-    int                         count;
-    int                         current_count;
-    qd_router_operation_type_t  operation_type;
+    qd_message_t                *response;
+    qd_composed_field_t         *out_body;
+    qd_field_iterator_t         *reply_to;
+    qd_field_iterator_t         *correlation_id;
+    qdr_query_t                 *query;
+    qdr_core_t                  *core;
+    int                          count;
+    int                          current_count;
+    qd_schema_entity_operation_t operation_type;
 } qd_management_context_t ;
 
 ALLOC_DECLARE(qd_management_context_t);
@@ -88,22 +90,22 @@ ALLOC_DEFINE(qd_management_context_t);
 /**
  * Convenience function to create and initialize context 
(qd_management_context_t)
  */
-static qd_management_context_t* qd_management_context(qd_message_t             
  *msg,
-                                                      qd_message_t             
  *source,
-                                                      qd_composed_field_t      
  *field,
-                                                      qdr_query_t              
  *query,
-                                                      qdr_core_t               
  *core,
-                                                      
qd_router_operation_type_t operation_type,
-                                                      int                      
  count)
+static qd_management_context_t* qd_management_context(qd_field_iterator_t      
    *reply_to,
+                                                      qd_field_iterator_t      
    *correlation_id,
+                                                      qdr_query_t              
    *query,
+                                                      qdr_core_t               
    *core,
+                                                      
qd_schema_entity_operation_t operation_type,
+                                                      int                      
    count)
 {
     qd_management_context_t *ctx = new_qd_management_context_t();
-    ctx->count  = count;
-    ctx->field  = field;
-    ctx->msg    = msg;
-    ctx->source = qd_message_copy(source);
-    ctx->query  = query;
-    ctx->current_count = 0;
-    ctx->core   = core;
+    ctx->response       = qd_message();
+    ctx->out_body       = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    ctx->reply_to       = reply_to;
+    ctx->correlation_id = correlation_id;
+    ctx->query          = query;
+    ctx->core           = core;
+    ctx->count          = count;
+    ctx->current_count  = 0;
     ctx->operation_type = operation_type;
 
     return ctx;
@@ -131,18 +133,16 @@ static void qd_set_response_status(const qd_amqp_error_t 
*error, qd_composed_fie
 }
 
 
-static void qd_set_properties(qd_message_t        *msg,
-                              qd_field_iterator_t **reply_to,
+static void qd_set_properties(qd_field_iterator_t *correlation_id,
+                              qd_field_iterator_t *reply_to,
                               qd_composed_field_t **fld)
 {
-    qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, 
QD_FIELD_CORRELATION_ID);
-    // Grab the reply_to field from the incoming message. This is the address 
we will send the response to.
-    *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+    // Set the correlation_id and reply_to on fld
     *fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
     qd_compose_start_list(*fld);
     qd_compose_insert_null(*fld);                           // message-id
     qd_compose_insert_null(*fld);                           // user-id
-    qd_compose_insert_string_iterator(*fld, *reply_to);     // to
+    qd_compose_insert_string_iterator(*fld, reply_to);     // to
     qd_compose_insert_null(*fld);                           // subject
     qd_compose_insert_null(*fld);
     qd_compose_insert_typed_iterator(*fld, correlation_id);
@@ -153,34 +153,34 @@ static void qd_set_properties(qd_message_t        *msg,
 
 static void qd_manage_response_handler(void *context, const qd_amqp_error_t 
*status, bool more)
 {
-    qd_management_context_t *ctx = (qd_management_context_t*) context;
+    qd_management_context_t *mgmt_ctx = (qd_management_context_t*) context;
 
-    if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) {
+    if (mgmt_ctx->operation_type == QD_SCHEMA_ENTITY_OPERATION_QUERY) {
         if (status->status / 100 == 2) { // There is no error, proceed to 
conditionally call get_next
             if (more) {
-               ctx->current_count++; // Increment how many you have at hand
-               if (ctx->count != ctx->current_count) {
-                   qdr_query_get_next(ctx->query);
+               mgmt_ctx->current_count++; // Increment how many you have at 
hand
+               if (mgmt_ctx->count != mgmt_ctx->current_count) {
+                   qdr_query_get_next(mgmt_ctx->query);
                    return;
                } else
                    //
                    // This is the one case where the core agent won't free the 
query itself.
                    //
-                   qdr_query_free(ctx->query);
+                   qdr_query_free(mgmt_ctx->query);
             }
         }
-        qd_compose_end_list(ctx->field);
-        qd_compose_end_map(ctx->field);
+        qd_compose_end_list(mgmt_ctx->out_body);
+        qd_compose_end_map(mgmt_ctx->out_body);
     }
-    else if (ctx->operation_type == QD_ROUTER_OPERATION_DELETE) {
+    else if (mgmt_ctx->operation_type == QD_SCHEMA_ENTITY_OPERATION_DELETE) {
         // The body of the delete response message MUST consist of an 
amqp-value section containing a Map with zero entries.
-        qd_compose_start_map(ctx->field);
-        qd_compose_end_map(ctx->field);
+        qd_compose_start_map(mgmt_ctx->out_body);
+        qd_compose_end_map(mgmt_ctx->out_body);
     }
-    else if (ctx->operation_type == QD_ROUTER_OPERATION_READ) {
+    else if (mgmt_ctx->operation_type == QD_SCHEMA_ENTITY_OPERATION_READ) {
         if (status->status / 100 != 2) {
-            qd_compose_start_map(ctx->field);
-            qd_compose_end_map(ctx->field);
+            qd_compose_start_map(mgmt_ctx->out_body);
+            qd_compose_end_map(mgmt_ctx->out_body);
         }
     }
 
@@ -189,14 +189,14 @@ static void qd_manage_response_handler(void *context, 
const qd_amqp_error_t *sta
 
     // Start composing the message.
     // First set the properties on the message like reply_to, correlation-id 
etc.
-    qd_set_properties(ctx->source, &reply_to, &fld);
+    qd_set_properties(mgmt_ctx->correlation_id, mgmt_ctx->reply_to, &fld);
 
     // Second, set the status on the message, QD_AMQP_OK or 
QD_AMQP_BAD_REQUEST and so on.
     qd_set_response_status(status, &fld);
 
     // Finally, compose and send the message.
-    qd_message_compose_3(ctx->msg, fld, ctx->field);
-    qdr_send_to1(ctx->core, ctx->msg, reply_to, true, false);
+    qd_message_compose_3(mgmt_ctx->response, fld, mgmt_ctx->out_body);
+    qdr_send_to1(mgmt_ctx->core, mgmt_ctx->response, reply_to, true, false);
 
     // We have come to the very end. Free the appropriate memory.
     // Just go over this with Ted to see if I freed everything.
@@ -204,25 +204,31 @@ static void qd_manage_response_handler(void *context, 
const qd_amqp_error_t *sta
     qd_field_iterator_free(reply_to);
     qd_compose_free(fld);
 
-    qd_message_free(ctx->msg);
-    qd_message_free(ctx->source);
-    qd_compose_free(ctx->field);
+    qd_message_free(mgmt_ctx->response);
+    qd_compose_free(mgmt_ctx->out_body);
 
-    free_qd_management_context_t(ctx);
+    free_qd_management_context_t(mgmt_ctx);
 }
 
 
-static void qd_core_agent_query_handler(qdr_core_t                 *core,
-                                        qd_router_entity_type_t     
entity_type,
-                                        qd_router_operation_type_t  
operation_type,
-                                        qd_message_t               *msg,
-                                        int                        *count,
-                                        int                        *offset)
+void qd_core_agent_query_handler(void                       *ctx,
+                                 qd_field_iterator_t        *reply_to,
+                                 qd_field_iterator_t        *correlation_id,
+                                 qd_router_entity_type_t     entity_type,
+                                 qd_router_operation_type_t  operation_type,
+                                 int                        *count,
+                                 int                        *offset,
+                                 qd_parsed_field_t          *in_body)
 {
+    qdr_core_t *core = (qdr_core_t*)ctx;
+
+    // Call local function that creates and returns a local 
qd_management_context_t object containing the values passed in.
+    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, 
correlation_id, 0, core, operation_type, (*count));
+
     //
     // Add the Body.
     //
-    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 
0);
+    qd_composed_field_t *field = mgmt_ctx->out_body;
 
     // Start a map in the body. Look for the end map in the callback function, 
qd_manage_response_handler.
     qd_compose_start_map(field);
@@ -230,127 +236,107 @@ static void qd_core_agent_query_handler(qdr_core_t      
           *core,
     //add a "attributeNames" key
     qd_compose_insert_string(field, ATTRIBUTE_NAMES);
 
-    // Call local function that creates and returns a local 
qd_management_context_t object containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
field, 0, core, operation_type, (*count));
-
     // Grab the attribute names from the incoming message body. The attribute 
names will be used later on in the response.
     qd_parsed_field_t *attribute_names_parsed_field = 0;
 
-    qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, 
QD_FIELD_BODY);
-
-    qd_parsed_field_t *body = qd_parse(body_iter);
-    if (body != 0 && qd_parse_is_map(body)) {
-        attribute_names_parsed_field = qd_parse_value_by_key(body, 
ATTRIBUTE_NAMES);
+    if (in_body != 0 && qd_parse_is_map(in_body)) {
+        attribute_names_parsed_field = qd_parse_value_by_key(in_body, 
ATTRIBUTE_NAMES);
     }
 
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
-    ctx->query = qdr_manage_query(core, ctx, entity_type, 
attribute_names_parsed_field, field);
+    mgmt_ctx->query = qdr_manage_query(core, mgmt_ctx, entity_type, 
attribute_names_parsed_field, field);
 
     //Add the attribute names
-    qdr_query_add_attribute_names(ctx->query); //this adds a list of attribute 
names like ["attribute1", "attribute2", "attribute3", "attribute4",]
+    qdr_query_add_attribute_names(mgmt_ctx->query); //this adds a list of 
attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",]
     qd_compose_insert_string(field, results); //add a "results" key
     qd_compose_start_list(field); //start the list for results
 
-    qdr_query_get_first(ctx->query, (*offset));
+    qdr_query_get_first(mgmt_ctx->query, (*offset));
 
-    qd_field_iterator_free(body_iter);
-    qd_parse_free(body);
+    qd_parse_free(in_body);
 }
 
 
-static void qd_core_agent_read_handler(qdr_core_t                 *core,
-                                       qd_message_t               *msg,
-                                       qd_router_entity_type_t     entity_type,
-                                       qd_router_operation_type_t  
operation_type,
-                                       qd_field_iterator_t        
*identity_iter,
-                                       qd_field_iterator_t        *name_iter)
+void qd_core_agent_read_handler(void                       *ctx,
+                                qd_field_iterator_t        *reply_to,
+                                qd_field_iterator_t        *correlation_id,
+                                qd_router_entity_type_t     entity_type,
+                                qd_router_operation_type_t  operation_type,
+                                qd_field_iterator_t        *identity_iter,
+                                qd_field_iterator_t        *name_iter)
 {
-    //
-    // Add the Body
-    //
-    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    qdr_core_t *core = (qdr_core_t*)ctx;
 
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
 
     // Call local function that creates and returns a qd_management_context_t 
containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
body, 0, core, operation_type, 0);
+    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, 
correlation_id, 0, core, operation_type, 0);
 
     //Call the read API function
-    qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body);
+    qdr_manage_read(core, mgmt_ctx, entity_type, name_iter, identity_iter, 
mgmt_ctx->out_body);
 }
 
 
-static void qd_core_agent_create_handler(qdr_core_t                 *core,
-                                         qd_message_t               *msg,
-                                         qd_router_entity_type_t     
entity_type,
-                                         qd_router_operation_type_t  
operation_type,
-                                         qd_field_iterator_t        *name_iter)
+void qd_core_agent_create_handler(void                       *ctx,
+                                  qd_field_iterator_t        *reply_to,
+                                  qd_field_iterator_t        *correlation_id,
+                                  qd_router_entity_type_t     entity_type,
+                                  qd_router_operation_type_t  operation_type,
+                                  qd_field_iterator_t        *name_iter,
+                                  qd_parsed_field_t          *in_body)
 {
-    //
-    // Add the Body
-    //
-    qd_composed_field_t *out_body = 
qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
-
+    qdr_core_t *core = (qdr_core_t*)ctx;
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
 
     // Call local function that creates and returns a qd_management_context_t 
containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
out_body, 0, core, operation_type, 0);
-
-    qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, 
QD_FIELD_BODY);
+    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, 
correlation_id, 0, core, operation_type, 0);
 
-    qd_parsed_field_t *in_body = qd_parse(body_iter);
+    qdr_manage_create(core, mgmt_ctx, entity_type, name_iter, in_body, 
mgmt_ctx->out_body);
 
-    qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body);
-
-    qd_field_iterator_free(body_iter);
 }
 
 
-static void qd_core_agent_update_handler(qdr_core_t                 *core,
-                                         qd_message_t               *msg,
-                                         qd_router_entity_type_t     
entity_type,
-                                         qd_router_operation_type_t  
operation_type,
-                                         qd_field_iterator_t        
*identity_iter,
-                                         qd_field_iterator_t        *name_iter)
+void qd_core_agent_update_handler(void                       *ctx,
+                                  qd_field_iterator_t        *reply_to,
+                                  qd_field_iterator_t        *correlation_id,
+                                  qd_router_entity_type_t     entity_type,
+                                  qd_router_operation_type_t  operation_type,
+                                  qd_field_iterator_t        *identity_iter,
+                                  qd_field_iterator_t        *name_iter,
+                                  qd_parsed_field_t          *in_body)
 {
-    qd_composed_field_t *out_body = 
qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    qdr_core_t *core = (qdr_core_t*)ctx;
 
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
 
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
out_body, 0, core, operation_type, 0);
-
-    qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_BODY);
-    qd_parsed_field_t *in_body= qd_parse(iter);
-    qd_field_iterator_free(iter);
-
-    qdr_manage_update(core, ctx, entity_type, name_iter, identity_iter, 
in_body, out_body);
+    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, 
correlation_id, 0, core, operation_type, 0);
 
+    qdr_manage_update(core, mgmt_ctx, entity_type, name_iter, identity_iter, 
in_body, mgmt_ctx->out_body);
 }
 
 
-static void qd_core_agent_delete_handler(qdr_core_t                 *core,
-                                         qd_message_t               *msg,
-                                         qd_router_entity_type_t     
entity_type,
-                                         qd_router_operation_type_t  
operation_type,
-                                         qd_field_iterator_t        
*identity_iter,
-                                         qd_field_iterator_t        *name_iter)
+void qd_core_agent_delete_handler(void                       *ctx,
+                                  qd_field_iterator_t        *reply_to,
+                                  qd_field_iterator_t        *correlation_id,
+                                  qd_message_t               *msg,
+                                  qd_router_entity_type_t     entity_type,
+                                  qd_router_operation_type_t  operation_type,
+                                  qd_field_iterator_t        *identity_iter,
+                                  qd_field_iterator_t        *name_iter)
 {
-    //
-    // Add the Body
-    //
-    qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0);
+    qdr_core_t *core = (qdr_core_t*)ctx;
 
     // Set the callback function.
     qdr_manage_handler(core, qd_manage_response_handler);
 
     // Call local function that creates and returns a qd_management_context_t 
containing the values passed in.
-    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
body, 0, core, operation_type, 0);
+    qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, 
correlation_id, 0, core, operation_type, 0);
 
-    qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter);
+    qdr_manage_delete(core, mgmt_ctx, entity_type, name_iter, identity_iter);
 }
 
 
@@ -358,7 +344,7 @@ static void qd_core_agent_delete_handler(qdr_core_t         
        *core,
  * Checks the content of the message to see if this can be handled by the 
C-management agent. If this agent cannot handle it, it will be
  * forwarded to the Python agent.
  */
-static bool qd_can_handle_request(qd_parsed_field_t           *properties_fld,
+bool qd_can_handle_request(qd_parsed_field_t           *properties_fld,
                                   qd_router_entity_type_t     *entity_type,
                                   qd_router_operation_type_t  *operation_type,
                                   qd_field_iterator_t        **identity_iter,
@@ -445,55 +431,3 @@ static bool qd_can_handle_request(qd_parsed_field_t        
   *properties_fld,
     return true;
 }
 
-
-/**
- *
- * Handler for the management agent.
- *
- */
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int 
unused_link_id, int unused_cost)
-{
-    qdr_core_t *core = (qdr_core_t*) context;
-    qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES);
-
-    qd_router_entity_type_t    entity_type = 0;
-    qd_router_operation_type_t operation_type = 0;
-
-    qd_field_iterator_t *identity_iter = 0;
-    qd_field_iterator_t *name_iter = 0;
-
-    int32_t count = 0;
-    int32_t offset = 0;
-
-    qd_parsed_field_t *properties_fld = qd_parse(app_properties_iter);
-
-    if (qd_can_handle_request(properties_fld, &entity_type, &operation_type, 
&identity_iter, &name_iter, &count, &offset)) {
-        switch (operation_type) {
-        case QD_ROUTER_OPERATION_QUERY:
-            qd_core_agent_query_handler(core, entity_type, operation_type, 
msg, &count, &offset);
-            break;
-        case QD_ROUTER_OPERATION_CREATE:
-            qd_core_agent_create_handler(core, msg, entity_type, 
operation_type, name_iter);
-            break;
-        case QD_ROUTER_OPERATION_READ:
-            qd_core_agent_read_handler(core, msg, entity_type, operation_type, 
identity_iter, name_iter);
-            break;
-        case QD_ROUTER_OPERATION_UPDATE:
-            qd_core_agent_update_handler(core, msg, entity_type, 
operation_type, identity_iter, name_iter);
-            break;
-        case QD_ROUTER_OPERATION_DELETE:
-            qd_core_agent_delete_handler(core, msg, entity_type, 
operation_type, identity_iter, name_iter);
-            break;
-        }
-    } else {
-        //
-        // The C management agent is not going to handle this request. Forward 
it off to Python.
-        //
-        qdr_send_to2(core, msg, MANAGEMENT_INTERNAL, false, false);
-    }
-
-    qd_field_iterator_free(app_properties_iter);
-    qd_parse_free(properties_fld);
-
-}
-

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index e8cc06b..b3c3986 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -71,16 +71,6 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t 
mode, const char *area,
     //
     core->thread = sys_thread(router_core_thread, core);
 
-    //
-    // Perform outside-of-thread setup for the management agent
-    //
-    core->agent_subscription_mobile = qdr_core_subscribe(core, "$management", 
'M', '0',
-                                                         
QD_TREATMENT_ANYCAST_CLOSEST,
-                                                         
qdr_management_agent_on_message, core);
-    core->agent_subscription_local = qdr_core_subscribe(core, "$management", 
'L', '0',
-                                                        
QD_TREATMENT_ANYCAST_CLOSEST,
-                                                        
qdr_management_agent_on_message, core);
-
     return core;
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index a578810..a59abb5 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -583,7 +583,6 @@ struct qdr_core_t {
 
 void *router_core_thread(void *arg);
 uint64_t qdr_identifier(qdr_core_t* core);
-void qdr_management_agent_on_message(void *context, qd_message_t *msg, int 
link_id, int cost);
 void  qdr_route_table_setup_CT(qdr_core_t *core);
 void  qdr_agent_setup_CT(qdr_core_t *core);
 void  qdr_forwarder_setup_CT(qdr_core_t *core);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to