DISPATCH-437 - Work in progress - added AgentRequestAdapter and ordinality
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/e2ad8da8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/e2ad8da8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/e2ad8da8 Branch: refs/heads/DISPATCH-437-1 Commit: e2ad8da8acefd5d2f15f58d73330fe2c2037fd9e Parents: d7fcfef Author: Ganesh Murthy <[email protected]> Authored: Mon Aug 1 12:34:36 2016 -0400 Committer: Ganesh Murthy <[email protected]> Committed: Mon Aug 1 12:41:06 2016 -0400 ---------------------------------------------------------------------- .../management/__init__.py | 4 + .../qpid_dispatch_internal/management/agent.py | 207 ++++++++----- .../qpid_dispatch_internal/management/config.py | 4 +- .../qpid_dispatch_internal/management/schema.py | 39 ++- src/CMakeLists.txt | 3 +- src/dispatch.c | 8 + src/iterator.c | 3 + src/router_core/agent_config_address.c | 11 +- src/router_core/management_agent.c | 288 +++++++------------ src/router_core/router_core.c | 10 - src/router_core/router_core_private.h | 1 - 11 files changed, 302 insertions(+), 276 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/__init__.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/__init__.py b/python/qpid_dispatch_internal/management/__init__.py index 6ee5966..9d49f47 100644 --- a/python/qpid_dispatch_internal/management/__init__.py +++ b/python/qpid_dispatch_internal/management/__init__.py @@ -18,3 +18,7 @@ # """Qpid Dispatch internal management package.""" +# from .agent import ManagementAgent + +__all__ = ["ManagementAgent"] + http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/agent.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/agent.py b/python/qpid_dispatch_internal/management/agent.py index 6e4b7fc..7ada418 100644 --- a/python/qpid_dispatch_internal/management/agent.py +++ b/python/qpid_dispatch_internal/management/agent.py @@ -83,7 +83,7 @@ from .qdrouter import QdSchema from ..router.message import Message from ..router.address import Address from ..policy.policy_manager import PolicyManager - +from . import AgentRequestHandler def dictstr(d): """Stringify a dict in the form 'k=v, k=v ...' instead of '{k:v, ...}'""" @@ -334,6 +334,9 @@ class VhostStatsEntity(EntityAdapter): def __str__(self): return super(VhostStatsEntity, self).__str__().replace("Entity(", "VhostStatsEntity(") + def __str__(self): + return super(PolicyStatsEntity, self).__str__().replace("Entity(", "PolicyStatsEntity(") + def _host_port_name_identifier(entity): for attr in ['host', 'port', 'name']: # Set default values if need be @@ -741,30 +744,29 @@ class ManagementEntity(EntityAdapter): out.close() raise BadRequestStatus("Bad profile request %s" % (request)) -class Agent(object): - """AMQP managment agent. Manages entities, directs requests to the correct entity.""" +class ManagementAgent(object): + """ + AMQP managment agent. Manages entities, directs requests to the correct entity. + """ - def __init__(self, dispatch, qd): + def __init__(self, address, agent_adapter, dispatch, qd, schema=QdSchema()): self.qd = qd + self.address = address + self.agent_adapter = agent_adapter self.dispatch = dispatch - self.schema = QdSchema() + self.schema = schema self.entities = EntityCache(self) self.request_lock = Lock() self.log_adapter = LogAdapter("AGENT") self.policy = PolicyManager(self) self.management = self.create_entity({"type": "management"}) self.add_entity(self.management) + self.io = IoAdapter(self.receive, address, 'L', '0', TREATMENT_ANYCAST_CLOSEST) def log(self, level, text): info = traceback.extract_stack(limit=2)[0] # Caller frame info self.log_adapter.log(level, text, info[0], info[1]) - def activate(self, address): - """Register the management address to receive management requests""" - self.entities.refresh_from_c() - self.log(LOG_INFO, "Activating management agent on %s" % address) - self.io = IoAdapter(self.receive, address, 'L', '0', TREATMENT_ANYCAST_CLOSEST) - def entity_class(self, entity_type): """Return the class that implements entity_type""" class_name = camelcase(entity_type.short_name, capital=True) + 'Entity' @@ -784,69 +786,11 @@ class Agent(object): entity_type = self.schema.entity_type(attributes['type']) return self.entity_class(entity_type)(self, entity_type, attributes) - def respond(self, request, status=OK, description=None, body=None): - """Send a response to the client""" - if body is None: body = {} - description = description or STATUS_TEXT[status] - response = Message( - address=request.reply_to, - correlation_id=request.correlation_id, - properties={'statusCode': status, 'statusDescription': description}, - body=body) - self.log(LOG_DEBUG, "Agent response:\n %s\n Responding to: \n %s"%(response, request)) - try: - self.io.send(response) - except: - self.log(LOG_ERROR, "Can't respond to %s: %s"%(request, format_exc())) - - def receive(self, request, unused_link_id, unused_cost): - """Called when a management request is received.""" - def error(e, trace): - """Raise an error""" - self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace)) - self.respond(request, e.status, e.description) - - # If there's no reply_to, don't bother to process the request. - if not request.reply_to: - return - - # Coarse locking, handle one request at a time. - with self.request_lock: - try: - self.entities.refresh_from_c() - self.log(LOG_DEBUG, "Agent request %s"% request) - status, body = self.handle(request) - self.respond(request, status=status, body=body) - except ManagementError, e: - error(e, format_exc()) - except ValidationError, e: - error(BadRequestStatus(str(e)), format_exc()) - except Exception, e: - error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc()) - def entity_type(self, type): - try: return self.schema.entity_type(type) - except ValidationError, e: raise NotFoundStatus(str(e)) - - def handle(self, request): - """ - Handle a request. - Dispatch management node requests to self, entity requests to the entity. - @return: (response-code, body) - """ - operation = required_property('operation', request) - if operation.lower() == 'create': - # Create requests are entity requests but must be handled by the agent since - # the entity does not yet exist. - return self.create(request) - else: - target = self.find_entity(request) - target.entity_type.allowed(operation, request.body) - try: - method = getattr(target, operation.lower().replace("-", "_")) - except AttributeError: - not_implemented(operation, target.type) - return method(request) + try: + return self.schema.entity_type(type) + except ValidationError, e: + raise NotFoundStatus(str(e)) def _create(self, attributes): """Create an entity, called externally or from configuration file.""" @@ -901,12 +845,19 @@ class Agent(object): """Remove and internal python implementation object.""" self.entities.remove_implementation(id(implementation)) + def requested_entity_type(self, request): + entity_type = request.properties.get('entityType') + if entity_type: + return self.schema.entity_type(entity_type) + return None + def find_entity(self, request): """Find the entity addressed by request""" requested_type = request.properties.get('type') if requested_type: requested_type = self.schema.entity_type(requested_type) + # ids is a map of identifying attribute values ids = dict((k, request.properties.get(k)) for k in ['name', 'identity'] if k in request.properties) @@ -945,3 +896,113 @@ class Agent(object): def find_entity_by_type(self, type): return self.entities.map_type(None, type) + + def handle(self, request): + """ + Handle a request. + Dispatch management node requests to self, entity requests to the entity. + @return: (response-code, body) + """ + # Get the operation from the request. This could be CREATE or READ or UPDATE or DELETE or QUERY + operation = required_property('operation', request) + + # Get the entity type from the request. For e.g. entity_type could be a connector entity or a listener entity + entity_type = self.requested_entity_type(request) + + # Get the type from the request + requested_type = request.properties.get('type') # Should be org.amqp.management + if requested_type: + requested_type = self.schema.entity_type(requested_type) + if operation not in requested_type.operations: + # Is the operation allowed on the entity_type, if not throw an exception + entity_type.allowed(operation) + + # target = self.find_entity(request) + + # Parameters required to post the request to the work queue + reply_to = request.reply_to + correlation_id = request.correlation_id + entity_type_ordinality = entity_type.ordinality + #operation_ordinality = operation.ordinality + + print 'reply_to, correlation_id, entity_type_ordinality ', reply_to, correlation_id, entity_type_ordinality + + if operation: + operation = operation.lower() + if operation == 'create': + + # request.body is already a map with the create parameters + # request.properties has the operation, name, type + request_handler.qd_post_management_request(operation, entity_type_ordinality, correlation_id, reply_to, + request.body) + + request_type = request.properties.get('type') + if self.schema.is_long_name(request_type): + enum_type = self.schema.type_map.get(request_type) + else: + # Get the corresponding long name from short name + long_type = self.schema.short_long_type_map.get(request_type) + enum_type = self.schema.type_map.get(long_type) + + # post an entry into the work queue + #qd_dispatch_post_management_request(self.dispatch, oper, enum_type) + + else: + raise not_implemented(operation, target.type) + """ + if operation.lower() == 'create': + # Create requests are entity requests but must be handled by the agent since + # the entity does not yet exist. + return self.create(request) + else: + target = self.find_entity(request) + target.entity_type.allowed(operation, request.body) + try: + method = getattr(target, operation.lower().replace("-", "_")) + except AttributeError: + not_implemented(operation, target.type) + return method(request) + """ + + def respond(self, request, status=OK, description=None, body=None): + """Send a response to the client""" + if body is None: body = {} + description = description or STATUS_TEXT[status] + response = Message( + address=request.reply_to, + correlation_id=request.correlation_id, + properties={'statusCode': status, 'statusDescription': description}, + body=body) + self.log(LOG_DEBUG, "Agent response:\n %s\n Responding to: \n %s"%(response, request)) + try: + self.io.send(response) + except: + self.log(LOG_ERROR, "Can't respond to %s: %s"%(request, format_exc())) + + def receive(self, request, unused_link_id, unused_cost): + """ + Called when a management request is received. + The init function registers this function as the one to be called by creating an IOAdapter. + """ + # If there's no reply_to, don't bother to process the request. + if not request.reply_to: + return + + def error(e, trace): + """Raise an error""" + self.log(LOG_ERROR, "Error dispatching %s: %s\n%s"%(request, e, trace)) + self.respond(request, e.status, e.description) + + # Coarse locking, handle one request at a time. + with self.request_lock: + try: + self.entities.refresh_from_c() + self.log(LOG_DEBUG, "Agent request %s"% request) + status, body = self.handle(request) + self.respond(request, status=status, body=body) + except ManagementError, e: + error(e, format_exc()) + except ValidationError, e: + error(BadRequestStatus(str(e)), format_exc()) + except Exception, e: + error(InternalServerErrorStatus("%s: %s"%(type(e).__name__, e)), format_exc()) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/config.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/config.py b/python/qpid_dispatch_internal/management/config.py index a7b82c7..88823db 100644 --- a/python/qpid_dispatch_internal/management/config.py +++ b/python/qpid_dispatch_internal/management/config.py @@ -149,9 +149,11 @@ def configure_dispatch(dispatch, lib_handle, filename): # Configure and prepare container and router before we can activate the agent. configure(config.by_type('container')[0]) configure(config.by_type('router')[0]) + qd.qd_dispatch_prepare(dispatch) qd.qd_router_setup_late(dispatch) # Actions requiring active management agent. - agent.activate("$_management_internal") + + #agent.activate("$_management_internal") from qpid_dispatch_internal.display_name.display_name import DisplayNameService displayname_service = DisplayNameService("$displayname") http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/python/qpid_dispatch_internal/management/schema.py ---------------------------------------------------------------------- diff --git a/python/qpid_dispatch_internal/management/schema.py b/python/qpid_dispatch_internal/management/schema.py index 7559c73..78c77ad 100644 --- a/python/qpid_dispatch_internal/management/schema.py +++ b/python/qpid_dispatch_internal/management/schema.py @@ -208,7 +208,7 @@ class AttributeType(object): def __init__(self, name, type=None, defined_in=None, default=None, required=False, unique=False, hidden=False, deprecated=False, - value=None, description="", create=False, update=False, graph=False): + value=None, description="", create=False, update=False, graph=False, ordinality=0): """ See L{AttributeType} instance variables. """ @@ -224,6 +224,7 @@ class AttributeType(object): self.value = value self.unique = unique self.description = description + self.ordinality = ordinality if self.value is not None and self.default is not None: raise ValidationError("Attribute '%s' has default value and fixed value" % self.name) @@ -321,7 +322,7 @@ class EntityType(object): @ivar referential: True if an entity can be referred to by name from another entity. """ def __init__(self, name, schema, attributes=None, operations=None, operationDefs=None, description="", - fullName=True, singleton=False, deprecated=False, extends=None, referential=False, **kwargs): + fullName=True, singleton=False, deprecated=False, extends=None, referential=False, ordinality=0, **kwargs): """ @param name: name of the entity type. @param schema: schema for this type. @@ -341,8 +342,9 @@ class EntityType(object): self.short_name = self.short_name.replace("router.config.", "") else: self.name = self.short_name = name + self.attributes = OrderedDict((k, AttributeType(k, defined_in=self, **v)) - for k, v in (attributes or {}).iteritems()) + for k, v in (attributes or {}).iteritems()) self.operations = operations or [] # Bases are resolved in self.init() self.base = extends @@ -352,6 +354,7 @@ class EntityType(object): self.singleton = singleton self.deprecated = deprecated self.referential = referential + self.ordinality = ordinality self._init = False # Have not yet initialized from base and attributes. # Operation definitions self.operation_defs = dict((name, OperationDef(name, **op)) @@ -381,13 +384,21 @@ class EntityType(object): self.operations += other.operations check(self.attributes.iterkeys(), other.attributes.itervalues(), "attributes") self.attributes.update(other.attributes) + if other.name == 'entity': # Fill in entity "type" attribute automatically. self.attributes["type"]["value"] = self.name - def extends(self, base): return base in self.all_bases + ordinality = 0 + for attrib in self.attributes.values(): + attrib.ordinality = ordinality + ordinality += 1 - def is_a(self, type): return type == self or self.extends(type) + def extends(self, base): + return base in self.all_bases + + def is_a(self, type): + return type == self or self.extends(type) def attribute(self, name): """Get the AttributeType for name""" @@ -433,7 +444,7 @@ class EntityType(object): return attributes - def allowed(self, op, body): + def allowed(self, op): """Raise exception if op is not a valid operation on entity.""" op = op.upper() if not op in self.operations: @@ -505,24 +516,36 @@ class Schema(object): self.all_attributes = set() + ordinality = 0 for e in self.entity_types.itervalues(): e.init() + e.ordinality = ordinality self.all_attributes.update(e.attributes.keys()) + ordinality += 1 def short_name(self, name): """Remove prefix from name if present""" - if not name: return name + if not name: + return name if name.startswith(self.prefixdot): name = name[len(self.prefixdot):] return name def long_name(self, name): """Add prefix to unqualified name""" - if not name: return name + if not name: + return name if not name.startswith(self.prefixdot): name = self.prefixdot + name return name + def is_long_name(self, name): + if not name: + return False + if self.prefixdot in name: + return True + return False + def dump(self): """Return json-friendly representation""" return OrderedDict([ http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index c5aa589..3a78801 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -83,6 +83,7 @@ set(qpid_dispatch_SOURCES router_core/transfer.c router_node.c router_pynode.c + agent_adapter.c schema_enum.c server.c timer.c @@ -94,7 +95,7 @@ if(USE_MEMORY_POOL) endif() set_property( - SOURCE python_embedded.c router_pynode.c + SOURCE python_embedded.c router_pynode.c agent_adapter.c PROPERTY COMPILE_FLAGS -Wno-strict-aliasing ) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/dispatch.c ---------------------------------------------------------------------- diff --git a/src/dispatch.c b/src/dispatch.c index 1b39228..cb5c34e 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -174,6 +174,14 @@ qd_error_t qd_dispatch_configure_auto_link(qd_dispatch_t *qd, qd_entity_t *entit return qd_error_code(); } +/*qd_error_t qd_dispatch_post_management_request(qd_dispatch_t *qd, + qd_schema_entity_operation_t operation, + qd_schema_entity_type_t entity_type) { + if (!qd->router) return qd_error(QD_ERROR_NOT_FOUND, "No router available"); + qd_router_configure_auto_link(qd->router, entity); + return qd_error_code(); +}*/ + qd_error_t qd_dispatch_configure_policy(qd_dispatch_t *qd, qd_entity_t *entity) { qd_error_t err; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/iterator.c ---------------------------------------------------------------------- diff --git a/src/iterator.c b/src/iterator.c index 75d0196..f4483cc 100644 --- a/src/iterator.c +++ b/src/iterator.c @@ -390,6 +390,9 @@ qd_field_iterator_t* qd_address_iterator_binary(const char *text, int length, qd qd_field_iterator_t *qd_address_iterator_buffer(qd_buffer_t *buffer, int offset, int length, qd_iterator_view_t view) { + if (!buffer) + return 0; + qd_field_iterator_t *iter = new_qd_field_iterator_t(); if (!iter) return 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/agent_config_address.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_config_address.c b/src/router_core/agent_config_address.c index edb9ba5..f30f3fa 100644 --- a/src/router_core/agent_config_address.c +++ b/src/router_core/agent_config_address.c @@ -19,6 +19,7 @@ #include <qpid/dispatch/ctools.h> #include "agent_config_address.h" +#include "schema_enum.h" #include <inttypes.h> #include <stdio.h> @@ -322,11 +323,11 @@ void qdra_config_address_create_CT(qdr_core_t *core, // // Extract the fields from the request // - qd_parsed_field_t *prefix_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_PREFIX]); - qd_parsed_field_t *distrib_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_DISTRIBUTION]); - qd_parsed_field_t *waypoint_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_WAYPOINT]); - qd_parsed_field_t *in_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_IN_PHASE]); - qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QDR_CONFIG_ADDRESS_OUT_PHASE]); + qd_parsed_field_t *prefix_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QD_SCHEMA_FIXEDADDRESS_ATTRIBUTES_PREFIX]); + qd_parsed_field_t *distrib_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_DISTRIBUTION]); + qd_parsed_field_t *waypoint_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_WAYPOINT]); + qd_parsed_field_t *in_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_INGRESSPHASE]); + qd_parsed_field_t *out_phase_field = qd_parse_value_by_key(in_body, qdr_config_address_columns[QD_SCHEMA_ADDRESS_ATTRIBUTES_EGRESSPHASE]); // // Prefix field is mandatory. Fail if it is not here. http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/management_agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c index 030499a..e1cf1cb 100644 --- a/src/router_core/management_agent.c +++ b/src/router_core/management_agent.c @@ -27,6 +27,7 @@ #include "router_core_private.h" #include "dispatch_private.h" #include "agent_link.h" +#include "schema_enum.h" #include "alloc.h" const char *ENTITY = "entityType"; @@ -72,14 +73,15 @@ typedef enum { typedef struct qd_management_context_t { - qd_message_t *msg; - qd_message_t *source; - qd_composed_field_t *field; - qdr_query_t *query; - qdr_core_t *core; - int count; - int current_count; - qd_router_operation_type_t operation_type; + qd_message_t *response; + qd_composed_field_t *out_body; + qd_field_iterator_t *reply_to; + qd_field_iterator_t *correlation_id; + qdr_query_t *query; + qdr_core_t *core; + int count; + int current_count; + qd_schema_entity_operation_t operation_type; } qd_management_context_t ; ALLOC_DECLARE(qd_management_context_t); @@ -88,22 +90,22 @@ ALLOC_DEFINE(qd_management_context_t); /** * Convenience function to create and initialize context (qd_management_context_t) */ -static qd_management_context_t* qd_management_context(qd_message_t *msg, - qd_message_t *source, - qd_composed_field_t *field, - qdr_query_t *query, - qdr_core_t *core, - qd_router_operation_type_t operation_type, - int count) +static qd_management_context_t* qd_management_context(qd_field_iterator_t *reply_to, + qd_field_iterator_t *correlation_id, + qdr_query_t *query, + qdr_core_t *core, + qd_schema_entity_operation_t operation_type, + int count) { qd_management_context_t *ctx = new_qd_management_context_t(); - ctx->count = count; - ctx->field = field; - ctx->msg = msg; - ctx->source = qd_message_copy(source); - ctx->query = query; - ctx->current_count = 0; - ctx->core = core; + ctx->response = qd_message(); + ctx->out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + ctx->reply_to = reply_to; + ctx->correlation_id = correlation_id; + ctx->query = query; + ctx->core = core; + ctx->count = count; + ctx->current_count = 0; ctx->operation_type = operation_type; return ctx; @@ -131,18 +133,16 @@ static void qd_set_response_status(const qd_amqp_error_t *error, qd_composed_fie } -static void qd_set_properties(qd_message_t *msg, - qd_field_iterator_t **reply_to, +static void qd_set_properties(qd_field_iterator_t *correlation_id, + qd_field_iterator_t *reply_to, qd_composed_field_t **fld) { - qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID); - // Grab the reply_to field from the incoming message. This is the address we will send the response to. - *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO); + // Set the correlation_id and reply_to on fld *fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); qd_compose_start_list(*fld); qd_compose_insert_null(*fld); // message-id qd_compose_insert_null(*fld); // user-id - qd_compose_insert_string_iterator(*fld, *reply_to); // to + qd_compose_insert_string_iterator(*fld, reply_to); // to qd_compose_insert_null(*fld); // subject qd_compose_insert_null(*fld); qd_compose_insert_typed_iterator(*fld, correlation_id); @@ -153,34 +153,34 @@ static void qd_set_properties(qd_message_t *msg, static void qd_manage_response_handler(void *context, const qd_amqp_error_t *status, bool more) { - qd_management_context_t *ctx = (qd_management_context_t*) context; + qd_management_context_t *mgmt_ctx = (qd_management_context_t*) context; - if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) { + if (mgmt_ctx->operation_type == QD_SCHEMA_ENTITY_OPERATION_QUERY) { if (status->status / 100 == 2) { // There is no error, proceed to conditionally call get_next if (more) { - ctx->current_count++; // Increment how many you have at hand - if (ctx->count != ctx->current_count) { - qdr_query_get_next(ctx->query); + mgmt_ctx->current_count++; // Increment how many you have at hand + if (mgmt_ctx->count != mgmt_ctx->current_count) { + qdr_query_get_next(mgmt_ctx->query); return; } else // // This is the one case where the core agent won't free the query itself. // - qdr_query_free(ctx->query); + qdr_query_free(mgmt_ctx->query); } } - qd_compose_end_list(ctx->field); - qd_compose_end_map(ctx->field); + qd_compose_end_list(mgmt_ctx->out_body); + qd_compose_end_map(mgmt_ctx->out_body); } - else if (ctx->operation_type == QD_ROUTER_OPERATION_DELETE) { + else if (mgmt_ctx->operation_type == QD_SCHEMA_ENTITY_OPERATION_DELETE) { // The body of the delete response message MUST consist of an amqp-value section containing a Map with zero entries. - qd_compose_start_map(ctx->field); - qd_compose_end_map(ctx->field); + qd_compose_start_map(mgmt_ctx->out_body); + qd_compose_end_map(mgmt_ctx->out_body); } - else if (ctx->operation_type == QD_ROUTER_OPERATION_READ) { + else if (mgmt_ctx->operation_type == QD_SCHEMA_ENTITY_OPERATION_READ) { if (status->status / 100 != 2) { - qd_compose_start_map(ctx->field); - qd_compose_end_map(ctx->field); + qd_compose_start_map(mgmt_ctx->out_body); + qd_compose_end_map(mgmt_ctx->out_body); } } @@ -189,14 +189,14 @@ static void qd_manage_response_handler(void *context, const qd_amqp_error_t *sta // Start composing the message. // First set the properties on the message like reply_to, correlation-id etc. - qd_set_properties(ctx->source, &reply_to, &fld); + qd_set_properties(mgmt_ctx->correlation_id, mgmt_ctx->reply_to, &fld); // Second, set the status on the message, QD_AMQP_OK or QD_AMQP_BAD_REQUEST and so on. qd_set_response_status(status, &fld); // Finally, compose and send the message. - qd_message_compose_3(ctx->msg, fld, ctx->field); - qdr_send_to1(ctx->core, ctx->msg, reply_to, true, false); + qd_message_compose_3(mgmt_ctx->response, fld, mgmt_ctx->out_body); + qdr_send_to1(mgmt_ctx->core, mgmt_ctx->response, reply_to, true, false); // We have come to the very end. Free the appropriate memory. // Just go over this with Ted to see if I freed everything. @@ -204,25 +204,31 @@ static void qd_manage_response_handler(void *context, const qd_amqp_error_t *sta qd_field_iterator_free(reply_to); qd_compose_free(fld); - qd_message_free(ctx->msg); - qd_message_free(ctx->source); - qd_compose_free(ctx->field); + qd_message_free(mgmt_ctx->response); + qd_compose_free(mgmt_ctx->out_body); - free_qd_management_context_t(ctx); + free_qd_management_context_t(mgmt_ctx); } -static void qd_core_agent_query_handler(qdr_core_t *core, - qd_router_entity_type_t entity_type, - qd_router_operation_type_t operation_type, - qd_message_t *msg, - int *count, - int *offset) +void qd_core_agent_query_handler(void *ctx, + qd_field_iterator_t *reply_to, + qd_field_iterator_t *correlation_id, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + int *count, + int *offset, + qd_parsed_field_t *in_body) { + qdr_core_t *core = (qdr_core_t*)ctx; + + // Call local function that creates and returns a local qd_management_context_t object containing the values passed in. + qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, operation_type, (*count)); + // // Add the Body. // - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qd_composed_field_t *field = mgmt_ctx->out_body; // Start a map in the body. Look for the end map in the callback function, qd_manage_response_handler. qd_compose_start_map(field); @@ -230,127 +236,107 @@ static void qd_core_agent_query_handler(qdr_core_t *core, //add a "attributeNames" key qd_compose_insert_string(field, ATTRIBUTE_NAMES); - // Call local function that creates and returns a local qd_management_context_t object containing the values passed in. - qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, core, operation_type, (*count)); - // Grab the attribute names from the incoming message body. The attribute names will be used later on in the response. qd_parsed_field_t *attribute_names_parsed_field = 0; - qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); - - qd_parsed_field_t *body = qd_parse(body_iter); - if (body != 0 && qd_parse_is_map(body)) { - attribute_names_parsed_field = qd_parse_value_by_key(body, ATTRIBUTE_NAMES); + if (in_body != 0 && qd_parse_is_map(in_body)) { + attribute_names_parsed_field = qd_parse_value_by_key(in_body, ATTRIBUTE_NAMES); } // Set the callback function. qdr_manage_handler(core, qd_manage_response_handler); - ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field); + mgmt_ctx->query = qdr_manage_query(core, mgmt_ctx, entity_type, attribute_names_parsed_field, field); //Add the attribute names - qdr_query_add_attribute_names(ctx->query); //this adds a list of attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",] + qdr_query_add_attribute_names(mgmt_ctx->query); //this adds a list of attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",] qd_compose_insert_string(field, results); //add a "results" key qd_compose_start_list(field); //start the list for results - qdr_query_get_first(ctx->query, (*offset)); + qdr_query_get_first(mgmt_ctx->query, (*offset)); - qd_field_iterator_free(body_iter); - qd_parse_free(body); + qd_parse_free(in_body); } -static void qd_core_agent_read_handler(qdr_core_t *core, - qd_message_t *msg, - qd_router_entity_type_t entity_type, - qd_router_operation_type_t operation_type, - qd_field_iterator_t *identity_iter, - qd_field_iterator_t *name_iter) +void qd_core_agent_read_handler(void *ctx, + qd_field_iterator_t *reply_to, + qd_field_iterator_t *correlation_id, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + qd_field_iterator_t *identity_iter, + qd_field_iterator_t *name_iter) { - // - // Add the Body - // - qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qdr_core_t *core = (qdr_core_t*)ctx; // Set the callback function. qdr_manage_handler(core, qd_manage_response_handler); // Call local function that creates and returns a qd_management_context_t containing the values passed in. - qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0); + qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, operation_type, 0); //Call the read API function - qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, body); + qdr_manage_read(core, mgmt_ctx, entity_type, name_iter, identity_iter, mgmt_ctx->out_body); } -static void qd_core_agent_create_handler(qdr_core_t *core, - qd_message_t *msg, - qd_router_entity_type_t entity_type, - qd_router_operation_type_t operation_type, - qd_field_iterator_t *name_iter) +void qd_core_agent_create_handler(void *ctx, + qd_field_iterator_t *reply_to, + qd_field_iterator_t *correlation_id, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + qd_field_iterator_t *name_iter, + qd_parsed_field_t *in_body) { - // - // Add the Body - // - qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); - + qdr_core_t *core = (qdr_core_t*)ctx; // Set the callback function. qdr_manage_handler(core, qd_manage_response_handler); // Call local function that creates and returns a qd_management_context_t containing the values passed in. - qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0); - - qd_field_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); + qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, operation_type, 0); - qd_parsed_field_t *in_body = qd_parse(body_iter); + qdr_manage_create(core, mgmt_ctx, entity_type, name_iter, in_body, mgmt_ctx->out_body); - qdr_manage_create(core, ctx, entity_type, name_iter, in_body, out_body); - - qd_field_iterator_free(body_iter); } -static void qd_core_agent_update_handler(qdr_core_t *core, - qd_message_t *msg, - qd_router_entity_type_t entity_type, - qd_router_operation_type_t operation_type, - qd_field_iterator_t *identity_iter, - qd_field_iterator_t *name_iter) +void qd_core_agent_update_handler(void *ctx, + qd_field_iterator_t *reply_to, + qd_field_iterator_t *correlation_id, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + qd_field_iterator_t *identity_iter, + qd_field_iterator_t *name_iter, + qd_parsed_field_t *in_body) { - qd_composed_field_t *out_body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qdr_core_t *core = (qdr_core_t*)ctx; // Set the callback function. qdr_manage_handler(core, qd_manage_response_handler); - qd_management_context_t *ctx = qd_management_context(qd_message(), msg, out_body, 0, core, operation_type, 0); - - qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_BODY); - qd_parsed_field_t *in_body= qd_parse(iter); - qd_field_iterator_free(iter); - - qdr_manage_update(core, ctx, entity_type, name_iter, identity_iter, in_body, out_body); + qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, operation_type, 0); + qdr_manage_update(core, mgmt_ctx, entity_type, name_iter, identity_iter, in_body, mgmt_ctx->out_body); } -static void qd_core_agent_delete_handler(qdr_core_t *core, - qd_message_t *msg, - qd_router_entity_type_t entity_type, - qd_router_operation_type_t operation_type, - qd_field_iterator_t *identity_iter, - qd_field_iterator_t *name_iter) +void qd_core_agent_delete_handler(void *ctx, + qd_field_iterator_t *reply_to, + qd_field_iterator_t *correlation_id, + qd_message_t *msg, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + qd_field_iterator_t *identity_iter, + qd_field_iterator_t *name_iter) { - // - // Add the Body - // - qd_composed_field_t *body = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + qdr_core_t *core = (qdr_core_t*)ctx; // Set the callback function. qdr_manage_handler(core, qd_manage_response_handler); // Call local function that creates and returns a qd_management_context_t containing the values passed in. - qd_management_context_t *ctx = qd_management_context(qd_message(), msg, body, 0, core, operation_type, 0); + qd_management_context_t *mgmt_ctx = qd_management_context(reply_to, correlation_id, 0, core, operation_type, 0); - qdr_manage_delete(core, ctx, entity_type, name_iter, identity_iter); + qdr_manage_delete(core, mgmt_ctx, entity_type, name_iter, identity_iter); } @@ -358,7 +344,7 @@ static void qd_core_agent_delete_handler(qdr_core_t *core, * Checks the content of the message to see if this can be handled by the C-management agent. If this agent cannot handle it, it will be * forwarded to the Python agent. */ -static bool qd_can_handle_request(qd_parsed_field_t *properties_fld, +bool qd_can_handle_request(qd_parsed_field_t *properties_fld, qd_router_entity_type_t *entity_type, qd_router_operation_type_t *operation_type, qd_field_iterator_t **identity_iter, @@ -445,55 +431,3 @@ static bool qd_can_handle_request(qd_parsed_field_t *properties_fld, return true; } - -/** - * - * Handler for the management agent. - * - */ -void qdr_management_agent_on_message(void *context, qd_message_t *msg, int unused_link_id, int unused_cost) -{ - qdr_core_t *core = (qdr_core_t*) context; - qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); - - qd_router_entity_type_t entity_type = 0; - qd_router_operation_type_t operation_type = 0; - - qd_field_iterator_t *identity_iter = 0; - qd_field_iterator_t *name_iter = 0; - - int32_t count = 0; - int32_t offset = 0; - - qd_parsed_field_t *properties_fld = qd_parse(app_properties_iter); - - if (qd_can_handle_request(properties_fld, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) { - switch (operation_type) { - case QD_ROUTER_OPERATION_QUERY: - qd_core_agent_query_handler(core, entity_type, operation_type, msg, &count, &offset); - break; - case QD_ROUTER_OPERATION_CREATE: - qd_core_agent_create_handler(core, msg, entity_type, operation_type, name_iter); - break; - case QD_ROUTER_OPERATION_READ: - qd_core_agent_read_handler(core, msg, entity_type, operation_type, identity_iter, name_iter); - break; - case QD_ROUTER_OPERATION_UPDATE: - qd_core_agent_update_handler(core, msg, entity_type, operation_type, identity_iter, name_iter); - break; - case QD_ROUTER_OPERATION_DELETE: - qd_core_agent_delete_handler(core, msg, entity_type, operation_type, identity_iter, name_iter); - break; - } - } else { - // - // The C management agent is not going to handle this request. Forward it off to Python. - // - qdr_send_to2(core, msg, MANAGEMENT_INTERNAL, false, false); - } - - qd_field_iterator_free(app_properties_iter); - qd_parse_free(properties_fld); - -} - http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/router_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index e8cc06b..b3c3986 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -71,16 +71,6 @@ qdr_core_t *qdr_core(qd_dispatch_t *qd, qd_router_mode_t mode, const char *area, // core->thread = sys_thread(router_core_thread, core); - // - // Perform outside-of-thread setup for the management agent - // - core->agent_subscription_mobile = qdr_core_subscribe(core, "$management", 'M', '0', - QD_TREATMENT_ANYCAST_CLOSEST, - qdr_management_agent_on_message, core); - core->agent_subscription_local = qdr_core_subscribe(core, "$management", 'L', '0', - QD_TREATMENT_ANYCAST_CLOSEST, - qdr_management_agent_on_message, core); - return core; } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/e2ad8da8/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index a578810..a59abb5 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -583,7 +583,6 @@ struct qdr_core_t { void *router_core_thread(void *arg); uint64_t qdr_identifier(qdr_core_t* core); -void qdr_management_agent_on_message(void *context, qd_message_t *msg, int link_id, int cost); void qdr_route_table_setup_CT(qdr_core_t *core); void qdr_agent_setup_CT(qdr_core_t *core); void qdr_forwarder_setup_CT(qdr_core_t *core); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
