Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 58d9f3fc4 -> 38e550b7a
DISPATCH-179 - Added code to support the READ management operation for addresses Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/38e550b7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/38e550b7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/38e550b7 Branch: refs/heads/tross-DISPATCH-179-1 Commit: 38e550b7a872eee98b7e6ccfecf3a66ec455750e Parents: 58d9f3f Author: ganeshmurthy <[email protected]> Authored: Tue Nov 17 10:48:03 2015 -0500 Committer: ganeshmurthy <[email protected]> Committed: Tue Nov 17 10:48:03 2015 -0500 ---------------------------------------------------------------------- src/router_core/agent.c | 106 ++++++++--- src/router_core/agent_address.c | 81 ++++++++- src/router_core/agent_address.h | 9 + src/router_core/agent_link.c | 4 +- src/router_core/agent_link.h | 4 +- src/router_core/management_agent.c | 283 ++++++++++++++++++----------- src/router_core/router_core_private.h | 13 +- 7 files changed, 347 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent.c b/src/router_core/agent.c index 80c6f9d..e8e5761 100644 --- a/src/router_core/agent.c +++ b/src/router_core/agent.c @@ -54,9 +54,11 @@ static const char *qdr_link_columns[] = "identity", 0}; -#define QDR_ADDRESS_COLUMN_COUNT 13 + #define QDR_LINK_COLUMN_COUNT 10 +static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard); + //================================================================================== // Internal Functions @@ -99,6 +101,27 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query) qd_timer_schedule(core->agent_timer, 0); } +qdr_query_t *qdr_query(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_parsed_field_t *attribute_names, + qd_composed_field_t *body) +{ + qdr_query_t *query = new_qdr_query_t(); + + DEQ_ITEM_INIT(query); + query->core = core; + query->entity_type = type; + query->context = context; + query->body = body; + query->next_key = 0; + query->next_offset = 0; + query->more = false; + query->status = 0; + + return query; +} + static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count); @@ -119,9 +142,20 @@ void qdr_manage_delete(qdr_core_t *core, void *context, qd_router_entity_type_t } -void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t type, - qd_field_iterator_t *name, qd_field_iterator_t *identity, qd_composed_field_t *body) +void qdr_manage_read(qdr_core_t *core, void *context, + qd_router_entity_type_t entity_type, + qd_field_iterator_t *name, + qd_field_iterator_t *identity, + qd_composed_field_t *body) { + qdr_action_t *action = qdr_action(qdr_manage_read_CT); + + // Create a query object here + action->args.agent.query = qdr_query(core, context, entity_type, 0, body); + action->args.agent.identity = identity; + action->args.agent.name = name; + + qdr_action_enqueue(core, action); } @@ -132,20 +166,14 @@ void qdr_manage_update(qdr_core_t *core, void *context, qd_router_entity_type_t } -qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, qd_router_entity_type_t type, - qd_parsed_field_t *attribute_names, qd_composed_field_t *body) +qdr_query_t *qdr_manage_query(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_parsed_field_t *attribute_names, + qd_composed_field_t *body) { - qdr_query_t *query = new_qdr_query_t(); - DEQ_ITEM_INIT(query); - query->core = core; - query->entity_type = type; - query->context = context; - query->body = body; - query->next_key = 0; - query->next_offset = 0; - query->more = false; - query->status = 0; + qdr_query_t* query = qdr_query(core, context, type, attribute_names, body); switch (query->entity_type) { case QD_ROUTER_CONNECTION: break; @@ -172,7 +200,6 @@ void qdr_query_add_attribute_names(qdr_query_t *query) } } - void qdr_query_get_first(qdr_query_t *query, int offset) { qdr_action_t *action = qdr_action(qdrh_query_get_first_CT); @@ -269,20 +296,38 @@ void qdr_agent_setup_CT(qdr_core_t *core) } -static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { - qdr_query_t *query = action->args.agent.query; - int offset = action->args.agent.offset; + qd_field_iterator_t *identity = action->args.agent.identity; + qd_field_iterator_t *name = action->args.agent.name; + qdr_query_t *query = action->args.agent.query; - if (!discard) - switch (query->entity_type) { + switch (query->entity_type) { case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: qdrl_link_get_first_CT(core, query, offset); break; - case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break; + case QD_ROUTER_LINK: break; + case QD_ROUTER_ADDRESS: qdra_address_get(core, name, identity, query, qdr_address_columns); break; case QD_ROUTER_WAYPOINT: break; case QD_ROUTER_EXCHANGE: break; case QD_ROUTER_BINDING: break; + } +} + + +static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qdr_query_t *query = action->args.agent.query; + int offset = action->args.agent.offset; + + if (!discard) { + switch (query->entity_type) { + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdra_link_get_first_CT(core, query, offset); break; + case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break; + case QD_ROUTER_WAYPOINT: break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; } + } } @@ -290,15 +335,16 @@ static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool { qdr_query_t *query = action->args.agent.query; - if (!discard) + if (!discard) { switch (query->entity_type) { - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; - case QD_ROUTER_ADDRESS: qdra_address_get_next_CT(core, query); break; - case QD_ROUTER_WAYPOINT: break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdra_link_get_next_CT(core, query); break; + case QD_ROUTER_ADDRESS: qdra_address_get_next_CT(core, query); break; + case QD_ROUTER_WAYPOINT: break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; } + } } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_address.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c index 08af140..fbbd7a0 100644 --- a/src/router_core/agent_address.c +++ b/src/router_core/agent_address.c @@ -18,6 +18,7 @@ */ #include "agent_address.h" +#include "router_core_private.h" #define QDR_ADDRESS_NAME 0 #define QDR_ADDRESS_IDENTITY 1 @@ -33,14 +34,11 @@ #define QDR_ADDRESS_DELIVERIES_TO_CONTAINER 11 #define QDR_ADDRESS_DELIVERIES_FROM_CONTAINER 12 -static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr) -{ - qd_composed_field_t *body = query->body; - qd_compose_start_list(body); - int i = 0; - while (query->columns[i] >= 0) { - switch(query->columns[i]) { +static void qdr_insert_address_columns_CT(qdr_address_t *addr, + qd_composed_field_t *body, + int column_index) { + switch(column_index) { case QDR_ADDRESS_NAME: case QDR_ADDRESS_IDENTITY: case QDR_ADDRESS_KEY: @@ -93,9 +91,40 @@ static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t *addr) default: qd_compose_insert_null(body); break; - } + } + +} + +static void qdr_manage_write_address_map_CT(qdr_address_t *addr, + qd_composed_field_t *body, + const char *qdr_address_columns[]) +{ + qd_compose_start_map(body); + + for(int i = 0; i < QDR_ADDRESS_COLUMN_COUNT; i++) { + qd_compose_insert_string(body, qdr_address_columns[i]); + qdr_insert_address_columns_CT(addr, body, i); + } + + qd_compose_end_map(body); +} + + +static void qdr_manage_write_address_list_CT(qdr_query_t *query, qdr_address_t *addr) +{ + qd_composed_field_t *body = query->body; + + qd_compose_start_list(body); + + if (!addr) + return; + + int i = 0; + while (query->columns[i] >= 0) { + qdr_insert_address_columns_CT(addr, body, query->columns[i]); i++; } + qd_compose_end_list(body); } @@ -111,6 +140,38 @@ static void qdr_manage_advance_address_CT(qdr_query_t *query, qdr_address_t *add query->more = false; } +void qdra_address_get(qdr_core_t *core, + qd_field_iterator_t *name, + qd_field_iterator_t *identity, + qdr_query_t *query, + const char *qdr_address_columns[]) +{ + qdr_address_t *addr; + + if (identity) //If there is identity, ignore the name + qd_hash_retrieve(core->addr_hash, identity, (void*) &addr); + else if (name) + qd_hash_retrieve(core->addr_hash, name, (void*) &addr); + + if (addr == 0) { + // Send back a 404 + query->status = &QD_AMQP_NOT_FOUND; + } + else { + // + // Write the columns of the address entity into the response body. + // + qdr_manage_write_address_map_CT(addr, query->body, qdr_address_columns); + query->status = &QD_AMQP_OK; + } + + // + // Enqueue the response. + // + qdr_agent_enqueue_response_CT(core, query); + +} + void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) { @@ -139,7 +200,7 @@ void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) // // Write the columns of the address entity into the response body. // - qdr_manage_write_address_CT(query, addr); + qdr_manage_write_address_list_CT(query, addr); // // Advance to the next address @@ -182,7 +243,7 @@ void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query) // // Write the columns of the address entity into the response body. // - qdr_manage_write_address_CT(query, addr); + qdr_manage_write_address_list_CT(query, addr); // // Advance to the next address http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_address.h ---------------------------------------------------------------------- diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h index 753c37d..5a1a4f7 100644 --- a/src/router_core/agent_address.h +++ b/src/router_core/agent_address.h @@ -24,4 +24,13 @@ void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_address_get(qdr_core_t *core, + qd_field_iterator_t *name, + qd_field_iterator_t *identity, + qdr_query_t *query, + const char *qdr_address_columns[]); + + +#define QDR_ADDRESS_COLUMN_COUNT 13 + #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_link.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c index 0cced7b..61aaa48 100644 --- a/src/router_core/agent_link.c +++ b/src/router_core/agent_link.c @@ -116,7 +116,7 @@ static void qdr_manage_advance_link_CT(qdr_query_t *query, qdr_link_t *link) } -void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) +void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) { // // Queries that get this far will always succeed. @@ -158,7 +158,7 @@ void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset) } -void qdrl_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) +void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query) { qdr_link_t *link = 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_link.h ---------------------------------------------------------------------- diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h index aaafa06..cad84bf 100644 --- a/src/router_core/agent_link.h +++ b/src/router_core/agent_link.h @@ -21,7 +21,7 @@ #include "router_core_private.h" -void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); -void qdrl_link_get_next_CT(qdr_core_t *core, qdr_query_t *query); +void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset); +void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query); #endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/management_agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/management_agent.c b/src/router_core/management_agent.c index 2d93cb5..b45d7cd 100644 --- a/src/router_core/management_agent.c +++ b/src/router_core/management_agent.c @@ -25,12 +25,17 @@ #include <qpid/dispatch/compose.h> #include <qpid/dispatch/dispatch.h> #include "management_agent_private.h" +#include "router_core_private.h" #include "dispatch_private.h" #include "alloc.h" const char *entity_type_key = "entityType"; +const char *type_key = "type"; const char *count_key = "count"; const char *offset_key = "offset"; +const char *name_key = "name"; +const char *identity_key = "identity"; + const char *operation_type_key = "operation"; const char *attribute_names_key = "attributeNames"; @@ -64,12 +69,13 @@ typedef enum { typedef struct qd_management_context_t { qd_message_t *msg; + qd_message_t *source; qd_composed_field_t *field; qdr_query_t *query; qd_dispatch_t *qd; - qd_field_iterator_t *to; int count; int current_count; + qd_router_operation_type_t operation_type; } qd_management_context_t ; ALLOC_DECLARE(qd_management_context_t); @@ -78,169 +84,214 @@ ALLOC_DEFINE(qd_management_context_t); /** * Convenience function to create and initialize context (qd_management_context_t) */ -static qd_management_context_t* qd_management_context(qd_message_t *msg, - qd_composed_field_t *field, - qdr_query_t *query, - qd_field_iterator_t *to, - qd_dispatch_t *qd, - int count) +static qd_management_context_t* qd_management_context(qd_message_t *msg, + qd_message_t *source, + qd_composed_field_t *field, + qdr_query_t *query, + qd_dispatch_t *qd, + qd_router_operation_type_t operation_type, + int count) { qd_management_context_t *ctx = new_qd_management_context_t(); - ctx->count = count; - ctx->field = field; - ctx->msg = msg; + ctx->count = count; + ctx->field = field; + ctx->msg = msg; + ctx->source = source; if (query) ctx->query = query; else ctx->query = 0; ctx->current_count = 0; ctx->qd = qd; - ctx->to = to; + ctx->operation_type = operation_type; return ctx; } -static void qd_compose_send(qd_management_context_t *ctx) + +/** + * Sets the error status on a new composed field. + */ +static void qd_set_response_status(const qd_amqp_error_t *error, qd_composed_field_t **field) { - qd_compose_end_list(ctx->field); - qd_compose_end_map(ctx->field); - qd_message_compose_2(ctx->msg, ctx->field); - qd_router_send(ctx->qd, ctx->to, ctx->msg); - - //We have come to the very end. Free the appropriate memory. - //ctx->field has already been freed in the call to qd_compose_end_list(ctx->field) - //ctx->query has also been already freed - qd_message_free(ctx->msg); - qd_field_iterator_free(ctx->to); - free_qd_management_context_t(ctx); + // + // Insert appropriate success or error + // + *field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, *field); + qd_compose_start_map(*field); + + qd_compose_insert_string(*field, status_description); + qd_compose_insert_string(*field, error->description); + + qd_compose_insert_string(*field, status_code); + qd_compose_insert_uint(*field, error->status); + + qd_compose_end_map(*field); } -static void manage_response_handler (void *context, const qd_amqp_error_t *status, bool more) +static void qd_set_properties(qd_message_t *msg, + qd_field_iterator_t **reply_to, + qd_composed_field_t **fld) { - qd_management_context_t *ctx = (qd_management_context_t*) context; + qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID); + // Grab the reply_to field from the incoming message. This is the address we will send the response to. + *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO); + *fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); + qd_compose_start_list(*fld); + qd_compose_insert_null(*fld); // message-id + qd_compose_insert_null(*fld); // user-id + qd_compose_insert_string_iterator(*fld, *reply_to); // to + qd_compose_insert_null(*fld); // subject + qd_compose_insert_null(*fld); + qd_compose_insert_typed_iterator(*fld, correlation_id); + qd_compose_end_list(*fld); + +} - //TODO - Check the status (qd_amqp_error_t) here first. If the status is anything other that 200, you need to send it back the message with the status. - if (!more || ctx->count == 0) { - // If Count is zero or there are no more rows to process or the status returned is something other than - // QD_AMQP_OK, we will close the list, send the message and - qd_compose_send(ctx); - } - else { - ctx->current_count++; // Increment how many you have at hand +static void qd_manage_response_handler (void *context, const qd_amqp_error_t *status, bool more) +{ + qd_management_context_t *ctx = (qd_management_context_t*) context; - if (ctx->count == ctx->current_count) //The count has matched, we are done, close the list and send out the message - qd_compose_send(ctx); - else - qdr_query_get_next(ctx->query); + if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) { + if (status == &QD_AMQP_OK) { // There is no error, proceed to conditionally call get_next + if (more) { + //If there are no more rows to process or the status returned is something other than + // QD_AMQP_OK, we will close the list, send the message and + ctx->current_count++; // Increment how many you have at hand + if (ctx->count != ctx->current_count) { + qdr_query_get_next(ctx->query); + return; + } + } + } + qd_compose_end_list(ctx->query->body); + qd_compose_end_map(ctx->query->body); } -} -static void core_agent_query_handler(qd_dispatch_t *qd, - qd_router_entity_type_t entity_type, - qd_message_t *msg, - int *count, - int *offset) -{ - qdr_core_t *core = qd_router_core(qd); - // Create a new message - qd_message_t *message = qd_message(); - qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, QD_FIELD_CORRELATION_ID); - // Grab the reply_to field from the incoming message. This is the address we will send the response to. - qd_field_iterator_t *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO); - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0); - qd_compose_start_list(field); - qd_compose_insert_null(field); // message-id - qd_compose_insert_null(field); // user-id - qd_compose_insert_string_iterator(field, reply_to); // to - qd_compose_insert_null(field); // subject - qd_compose_insert_null(field); - qd_compose_insert_typed_iterator(field, correlation_id); - qd_compose_end_list(field); + qd_field_iterator_t *reply_to = 0; + qd_composed_field_t *fld = 0; - // Get the attributeNames - qd_parsed_field_t *attribute_names_parsed_field = 0; + // Start composing the message. + // First set the properties on the message like reply_to, correlation-id etc. + qd_set_properties(ctx->source, &reply_to, &fld); - qd_parsed_field_t *body = qd_parse(qd_message_field_iterator(msg, QD_FIELD_BODY)); + // Second, set the status on the message, QD_AMQP_OK or QD_AMQP_BAD_REQUEST and so on. + qd_set_response_status(status, &fld); - if (body != 0 && qd_parse_is_map(body)) - attribute_names_parsed_field = qd_parse_value_by_key(body, attribute_names_key); + // Finally, compose and send the message. + qd_message_compose_3(ctx->msg, fld, ctx->field); + qd_router_send(ctx->qd, reply_to, ctx->msg); - // - // Insert application property map with statusDescription of OK and status code of 200 - // - field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field); - qd_compose_start_map(field); + // We have come to the very end. Free the appropriate memory. + // ctx->field has already been freed in the call to qd_compose_end_list(ctx->field) + // ctx->query has also been already freed + // Just go over this with Ted to see if I freed everything. - // Insert {'statusDescription': 'OK'} - qd_compose_insert_string(field, status_description); - qd_compose_insert_string(field, QD_AMQP_OK.description); + if (ctx->msg) + qd_message_free(ctx->msg); + free_qd_management_context_t(ctx); +} - // Insert {'statusCode': '200'} - qd_compose_insert_string(field, status_code); - qd_compose_insert_uint(field, QD_AMQP_OK.status); - qd_compose_end_map(field); +static void qd_core_agent_query_handler(qd_dispatch_t *qd, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + qd_message_t *msg, + int *count, + int *offset) +{ + qdr_core_t *core = qd_router_core(qd); // - // Add Body + // Add the Body // - field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field); + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); - // Start a map in the body + // Start a map in the body. Look for the end map in the callback function, qd_manage_response_handler. qd_compose_start_map(field); qd_compose_insert_string(field, attribute_names_key); //add a "attributeNames" key - // Set the callback function. - qdr_manage_handler(core, manage_response_handler); + // Call local function that creates and returns a qd_management_context_t containing the values passed in. + qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, qd, operation_type, (*count)); - // Local local function that creates and returns a qd_management_context_t - qd_management_context_t *ctx = qd_management_context(message, field, 0, reply_to, qd, (*count)); + // Grab the attribute names from the incoming message body. The attribute names will be used later on in the response. + qd_parsed_field_t *attribute_names_parsed_field = 0; + qd_parsed_field_t *body = qd_parse(qd_message_field_iterator(msg, QD_FIELD_BODY)); + if (body != 0 && qd_parse_is_map(body)) + attribute_names_parsed_field = qd_parse_value_by_key(body, attribute_names_key); + // Set the callback function. + qdr_manage_handler(core, qd_manage_response_handler); ctx->query = qdr_manage_query(core, ctx, entity_type, attribute_names_parsed_field, field); //Add the attribute names - qdr_query_add_attribute_names(ctx->query); - + qdr_query_add_attribute_names(ctx->query); //this adds adds a list of attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",] qd_compose_insert_string(field, results); //add a "results" key qd_compose_start_list(field); //start the list for results qdr_query_get_first(ctx->query, (*offset)); } -static void core_agent_create_handler() + +static void qd_core_agent_read_handler(qd_dispatch_t *qd, + qd_message_t *msg, + qd_router_entity_type_t entity_type, + qd_router_operation_type_t operation_type, + qd_field_iterator_t *identity_iter, + qd_field_iterator_t *name_iter) { + qdr_core_t *core = qd_router_core(qd); + + // + // Add the Body + // + qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 0); + + // Set the callback function. + qdr_manage_handler(core, qd_manage_response_handler); + // Call local function that creates and returns a qd_management_context_t containing the values passed in. + qd_management_context_t *ctx = qd_management_context(qd_message(), msg, field, 0, qd, operation_type, 0); + + //Call the read API function + qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, field); } -static void core_agent_read_handler() + +static void qd_core_agent_create_handler() { } -static void core_agent_update_handler() + +static void qd_core_agent_update_handler() { } -static void core_agent_delete_handler() + +static void qd_core_agent_delete_handler() { } + /** * Checks the content of the message to see if this can be handled by this agent. */ -static bool can_handle_request(qd_field_iterator_t *props, - qd_router_entity_type_t *entity_type, - qd_router_operation_type_t *operation_type, - int *count, - int *offset) +static bool qd_can_handle_request(qd_field_iterator_t *props, + qd_router_entity_type_t *entity_type, + qd_router_operation_type_t *operation_type, + qd_field_iterator_t **identity_iter, + qd_field_iterator_t **name_iter, + int *count, + int *offset) { qd_parsed_field_t *fld = qd_parse(props); @@ -255,10 +306,25 @@ static bool can_handle_request(qd_field_iterator_t *props, // 'entityType': 'org.apache.qpid.dispatch.router.link' // TODO - Add more entity types here. The above is not a complete list. - qd_parsed_field_t *parsed_field = qd_parse_value_by_key(fld, entity_type_key); + qd_parsed_field_t *parsed_field = qd_parse_value_by_key(fld, identity_key); + if (parsed_field!=0) { + *identity_iter = qd_parse_raw(parsed_field); + //qd_address_iterator_reset_view(*identity_iter, ITER_VIEW_ADDRESS_HASH); + } + parsed_field = qd_parse_value_by_key(fld, name_key); + if (parsed_field!=0) { + *name_iter = qd_parse_raw(parsed_field); + //qd_address_iterator_reset_view(*name_iter, ITER_VIEW_ADDRESS_HASH); + } - if (parsed_field == 0) - return false; + + parsed_field = qd_parse_value_by_key(fld, entity_type_key); + + if (parsed_field == 0) { // Sometimes there is no 'entityType' but 'type' might be available. + parsed_field = qd_parse_value_by_key(fld, type_key); + if (parsed_field == 0) + return false; + } if (qd_field_iterator_equal(qd_parse_raw(parsed_field), address_entity_type)) (*entity_type) = QD_ROUTER_ADDRESS; @@ -305,6 +371,7 @@ static bool can_handle_request(qd_field_iterator_t *props, return true; } + /** * * Handler for the management agent. @@ -313,30 +380,33 @@ static bool can_handle_request(qd_field_iterator_t *props, void management_agent_handler(void *context, qd_message_t *msg, int link_id) { qd_dispatch_t *qd = (qd_dispatch_t*) context; - qd_field_iterator_t *iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); + qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, QD_FIELD_APPLICATION_PROPERTIES); qd_router_entity_type_t entity_type = 0; qd_router_operation_type_t operation_type = 0; + qd_field_iterator_t *identity_iter = 0; + qd_field_iterator_t *name_iter = 0; + int32_t count = 0; int32_t offset = 0; - if (can_handle_request(iter, &entity_type, &operation_type, &count, &offset)) { + if (qd_can_handle_request(app_properties_iter, &entity_type, &operation_type, &identity_iter, &name_iter, &count, &offset)) { switch (operation_type) { case QD_ROUTER_OPERATION_QUERY: - core_agent_query_handler(qd, entity_type, msg, &count, &offset); + qd_core_agent_query_handler(qd, entity_type, operation_type, msg, &count, &offset); break; case QD_ROUTER_OPERATION_CREATE: - core_agent_create_handler(); + qd_core_agent_create_handler(); break; case QD_ROUTER_OPERATION_READ: - core_agent_read_handler(); + qd_core_agent_read_handler(qd, msg, entity_type, operation_type, identity_iter, name_iter); break; case QD_ROUTER_OPERATION_UPDATE: - core_agent_update_handler(); + qd_core_agent_update_handler(); break; case QD_ROUTER_OPERATION_DELETE: - core_agent_delete_handler(); + qd_core_agent_delete_handler(); break; } } @@ -344,7 +414,8 @@ void management_agent_handler(void *context, qd_message_t *msg, int link_id) qd_router_send2(qd, MANAGEMENT_INTERNAL, msg); //the C management agent is not going to handle this request. Forward it off to Python. // TODO - This is wrong. Need to find out how I can forward off the message to $management_internal so it can be handled by Python. - qd_field_iterator_free(iter); + qd_field_iterator_free(app_properties_iter); + qd_field_iterator_free(name_iter); + qd_field_iterator_free(identity_iter); } - http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index 5681fad..64253b1 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -37,7 +37,6 @@ typedef struct { qdr_field_t *qdr_field(const char *string); void qdr_field_free(qdr_field_t *field); - /** * qdr_action_t - This type represents one work item to be performed by the router-core thread. */ @@ -92,9 +91,12 @@ struct qdr_action_t { // Arguments for management-agent actions // struct { - qdr_query_t *query; - int offset; + qdr_query_t *query; + int offset; + qd_field_iterator_t *identity; + qd_field_iterator_t *name; } agent; + } args; }; @@ -308,4 +310,9 @@ qdr_action_t *qdr_action(qdr_action_handler_t action_handler); void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action); void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query); +qdr_query_t *qdr_query(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_parsed_field_t *attribute_names, + qd_composed_field_t *body); #endif --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
