Repository: qpid-dispatch
Updated Branches:
  refs/heads/tross-DISPATCH-179-1 58d9f3fc4 -> 38e550b7a


DISPATCH-179 - Added code to support the READ management operation for addresses


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/38e550b7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/38e550b7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/38e550b7

Branch: refs/heads/tross-DISPATCH-179-1
Commit: 38e550b7a872eee98b7e6ccfecf3a66ec455750e
Parents: 58d9f3f
Author: ganeshmurthy <[email protected]>
Authored: Tue Nov 17 10:48:03 2015 -0500
Committer: ganeshmurthy <[email protected]>
Committed: Tue Nov 17 10:48:03 2015 -0500

----------------------------------------------------------------------
 src/router_core/agent.c               | 106 ++++++++---
 src/router_core/agent_address.c       |  81 ++++++++-
 src/router_core/agent_address.h       |   9 +
 src/router_core/agent_link.c          |   4 +-
 src/router_core/agent_link.h          |   4 +-
 src/router_core/management_agent.c    | 283 ++++++++++++++++++-----------
 src/router_core/router_core_private.h |  13 +-
 7 files changed, 347 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent.c b/src/router_core/agent.c
index 80c6f9d..e8e5761 100644
--- a/src/router_core/agent.c
+++ b/src/router_core/agent.c
@@ -54,9 +54,11 @@ static const char *qdr_link_columns[] =
      "identity",
      0};
 
-#define QDR_ADDRESS_COLUMN_COUNT  13
+
 #define QDR_LINK_COLUMN_COUNT     10
 
+static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
+
 
 
//==================================================================================
 // Internal Functions
@@ -99,6 +101,27 @@ void qdr_agent_enqueue_response_CT(qdr_core_t *core, 
qdr_query_t *query)
         qd_timer_schedule(core->agent_timer, 0);
 }
 
+qdr_query_t *qdr_query(qdr_core_t              *core,
+                       void                    *context,
+                       qd_router_entity_type_t  type,
+                       qd_parsed_field_t       *attribute_names,
+                       qd_composed_field_t     *body)
+{
+    qdr_query_t *query = new_qdr_query_t();
+
+    DEQ_ITEM_INIT(query);
+    query->core        = core;
+    query->entity_type = type;
+    query->context     = context;
+    query->body        = body;
+    query->next_key    = 0;
+    query->next_offset = 0;
+    query->more        = false;
+    query->status      = 0;
+
+    return query;
+}
+
 static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_agent_emit_columns(qdr_query_t *query, const char 
*qdr_columns[], int column_count);
@@ -119,9 +142,20 @@ void qdr_manage_delete(qdr_core_t *core, void *context, 
qd_router_entity_type_t
 }
 
 
-void qdr_manage_read(qdr_core_t *core, void *context, qd_router_entity_type_t 
type,
-                     qd_field_iterator_t *name, qd_field_iterator_t *identity, 
qd_composed_field_t *body)
+void qdr_manage_read(qdr_core_t *core, void  *context,
+                     qd_router_entity_type_t  entity_type,
+                     qd_field_iterator_t     *name,
+                     qd_field_iterator_t     *identity,
+                     qd_composed_field_t     *body)
 {
+    qdr_action_t *action = qdr_action(qdr_manage_read_CT);
+
+    // Create a query object here
+    action->args.agent.query = qdr_query(core, context, entity_type, 0, body);
+    action->args.agent.identity  = identity;
+    action->args.agent.name = name;
+
+    qdr_action_enqueue(core, action);
 }
 
 
@@ -132,20 +166,14 @@ void qdr_manage_update(qdr_core_t *core, void *context, 
qd_router_entity_type_t
 }
 
 
-qdr_query_t *qdr_manage_query(qdr_core_t *core, void *context, 
qd_router_entity_type_t type,
-                              qd_parsed_field_t *attribute_names, 
qd_composed_field_t *body)
+qdr_query_t *qdr_manage_query(qdr_core_t              *core,
+                              void                    *context,
+                              qd_router_entity_type_t  type,
+                              qd_parsed_field_t       *attribute_names,
+                              qd_composed_field_t     *body)
 {
-    qdr_query_t *query = new_qdr_query_t();
 
-    DEQ_ITEM_INIT(query);
-    query->core        = core;
-    query->entity_type = type;
-    query->context     = context;
-    query->body        = body;
-    query->next_key    = 0;
-    query->next_offset = 0;
-    query->more        = false;
-    query->status      = 0;
+    qdr_query_t* query = qdr_query(core, context, type, attribute_names, body);
 
     switch (query->entity_type) {
     case QD_ROUTER_CONNECTION: break;
@@ -172,7 +200,6 @@ void qdr_query_add_attribute_names(qdr_query_t *query)
     }
 }
 
-
 void qdr_query_get_first(qdr_query_t *query, int offset)
 {
     qdr_action_t *action = qdr_action(qdrh_query_get_first_CT);
@@ -269,20 +296,38 @@ void qdr_agent_setup_CT(qdr_core_t *core)
 }
 
 
-static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard)
 {
-    qdr_query_t *query  = action->args.agent.query;
-    int          offset = action->args.agent.offset;
+    qd_field_iterator_t     *identity   = action->args.agent.identity;
+    qd_field_iterator_t     *name       = action->args.agent.name;
+    qdr_query_t             *query      = action->args.agent.query;
 
-    if (!discard)
-        switch (query->entity_type) {
+    switch (query->entity_type) {
         case QD_ROUTER_CONNECTION: break;
-        case QD_ROUTER_LINK:       qdrl_link_get_first_CT(core, query, 
offset); break;
-        case QD_ROUTER_ADDRESS:    qdra_address_get_first_CT(core, query, 
offset); break;
+        case QD_ROUTER_LINK:       break;
+        case QD_ROUTER_ADDRESS:    qdra_address_get(core, name, identity, 
query, qdr_address_columns); break;
         case QD_ROUTER_WAYPOINT:   break;
         case QD_ROUTER_EXCHANGE:   break;
         case QD_ROUTER_BINDING:    break;
+   }
+}
+
+
+static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+{
+    qdr_query_t *query  = action->args.agent.query;
+    int          offset = action->args.agent.offset;
+
+    if (!discard) {
+        switch (query->entity_type) {
+            case QD_ROUTER_CONNECTION: break;
+            case QD_ROUTER_LINK:       qdra_link_get_first_CT(core, query, 
offset); break;
+            case QD_ROUTER_ADDRESS:    qdra_address_get_first_CT(core, query, 
offset); break;
+            case QD_ROUTER_WAYPOINT:   break;
+            case QD_ROUTER_EXCHANGE:   break;
+            case QD_ROUTER_BINDING:    break;
         }
+    }
 }
 
 
@@ -290,15 +335,16 @@ static void qdrh_query_get_next_CT(qdr_core_t *core, 
qdr_action_t *action, bool
 {
     qdr_query_t *query  = action->args.agent.query;
 
-    if (!discard)
+    if (!discard) {
         switch (query->entity_type) {
-        case QD_ROUTER_CONNECTION: break;
-        case QD_ROUTER_LINK:       break;
-        case QD_ROUTER_ADDRESS:    qdra_address_get_next_CT(core, query); 
break;
-        case QD_ROUTER_WAYPOINT:   break;
-        case QD_ROUTER_EXCHANGE:   break;
-        case QD_ROUTER_BINDING:    break;
+            case QD_ROUTER_CONNECTION: break;
+            case QD_ROUTER_LINK:       qdra_link_get_next_CT(core, query); 
break;
+            case QD_ROUTER_ADDRESS:    qdra_address_get_next_CT(core, query); 
break;
+            case QD_ROUTER_WAYPOINT:   break;
+            case QD_ROUTER_EXCHANGE:   break;
+            case QD_ROUTER_BINDING:    break;
         }
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_address.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.c b/src/router_core/agent_address.c
index 08af140..fbbd7a0 100644
--- a/src/router_core/agent_address.c
+++ b/src/router_core/agent_address.c
@@ -18,6 +18,7 @@
  */
 
 #include "agent_address.h"
+#include "router_core_private.h"
 
 #define QDR_ADDRESS_NAME                      0
 #define QDR_ADDRESS_IDENTITY                  1
@@ -33,14 +34,11 @@
 #define QDR_ADDRESS_DELIVERIES_TO_CONTAINER   11
 #define QDR_ADDRESS_DELIVERIES_FROM_CONTAINER 12
 
-static void qdr_manage_write_address_CT(qdr_query_t *query, qdr_address_t 
*addr)
-{
-    qd_composed_field_t *body = query->body;
 
-    qd_compose_start_list(body);
-    int i = 0;
-    while (query->columns[i] >= 0) {
-        switch(query->columns[i]) {
+static void qdr_insert_address_columns_CT(qdr_address_t        *addr,
+                                          qd_composed_field_t  *body,
+                                          int column_index) {
+    switch(column_index) {
         case QDR_ADDRESS_NAME:
         case QDR_ADDRESS_IDENTITY:
         case QDR_ADDRESS_KEY:
@@ -93,9 +91,40 @@ static void qdr_manage_write_address_CT(qdr_query_t *query, 
qdr_address_t *addr)
         default:
             qd_compose_insert_null(body);
             break;
-        }
+    }
+
+}
+
+static void qdr_manage_write_address_map_CT(qdr_address_t       *addr,
+                                            qd_composed_field_t *body,
+                                            const char          
*qdr_address_columns[])
+{
+    qd_compose_start_map(body);
+
+    for(int i = 0; i < QDR_ADDRESS_COLUMN_COUNT; i++) {
+        qd_compose_insert_string(body, qdr_address_columns[i]);
+        qdr_insert_address_columns_CT(addr, body, i);
+    }
+
+    qd_compose_end_map(body);
+}
+
+
+static void qdr_manage_write_address_list_CT(qdr_query_t *query, qdr_address_t 
*addr)
+{
+    qd_composed_field_t *body = query->body;
+
+    qd_compose_start_list(body);
+
+    if (!addr)
+        return;
+
+    int i = 0;
+    while (query->columns[i] >= 0) {
+        qdr_insert_address_columns_CT(addr, body, query->columns[i]);
         i++;
     }
+
     qd_compose_end_list(body);
 }
 
@@ -111,6 +140,38 @@ static void qdr_manage_advance_address_CT(qdr_query_t 
*query, qdr_address_t *add
         query->more = false;
 }
 
+void qdra_address_get(qdr_core_t          *core,
+                      qd_field_iterator_t *name,
+                      qd_field_iterator_t *identity,
+                      qdr_query_t         *query,
+                      const char          *qdr_address_columns[])
+{
+    qdr_address_t *addr;
+
+    if (identity) //If there is identity, ignore the name
+        qd_hash_retrieve(core->addr_hash, identity, (void*) &addr);
+    else if (name)
+        qd_hash_retrieve(core->addr_hash, name, (void*) &addr);
+
+    if (addr == 0) {
+        // Send back a 404
+        query->status = &QD_AMQP_NOT_FOUND;
+    }
+    else {
+        //
+        // Write the columns of the address entity into the response body.
+        //
+        qdr_manage_write_address_map_CT(addr, query->body, 
qdr_address_columns);
+        query->status = &QD_AMQP_OK;
+    }
+
+    //
+    // Enqueue the response.
+    //
+    qdr_agent_enqueue_response_CT(core, query);
+
+}
+
 
 void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int 
offset)
 {
@@ -139,7 +200,7 @@ void qdra_address_get_first_CT(qdr_core_t *core, 
qdr_query_t *query, int offset)
     //
     // Write the columns of the address entity into the response body.
     //
-    qdr_manage_write_address_CT(query, addr);
+    qdr_manage_write_address_list_CT(query, addr);
 
     //
     // Advance to the next address
@@ -182,7 +243,7 @@ void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t 
*query)
         //
         // Write the columns of the address entity into the response body.
         //
-        qdr_manage_write_address_CT(query, addr);
+        qdr_manage_write_address_list_CT(query, addr);
 
         //
         // Advance to the next address

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_address.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_address.h b/src/router_core/agent_address.h
index 753c37d..5a1a4f7 100644
--- a/src/router_core/agent_address.h
+++ b/src/router_core/agent_address.h
@@ -24,4 +24,13 @@
 void qdra_address_get_first_CT(qdr_core_t *core, qdr_query_t *query, int 
offset);
 void qdra_address_get_next_CT(qdr_core_t *core, qdr_query_t *query);
 
+void qdra_address_get(qdr_core_t *core,
+                      qd_field_iterator_t *name,
+                      qd_field_iterator_t *identity,
+                      qdr_query_t          *query,
+                      const char *qdr_address_columns[]);
+
+
+#define QDR_ADDRESS_COLUMN_COUNT  13
+
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_link.c
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.c b/src/router_core/agent_link.c
index 0cced7b..61aaa48 100644
--- a/src/router_core/agent_link.c
+++ b/src/router_core/agent_link.c
@@ -116,7 +116,7 @@ static void qdr_manage_advance_link_CT(qdr_query_t *query, 
qdr_link_t *link)
 }
 
 
-void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
+void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset)
 {
     //
     // Queries that get this far will always succeed.
@@ -158,7 +158,7 @@ void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t 
*query, int offset)
 }
 
 
-void qdrl_link_get_next_CT(qdr_core_t *core, qdr_query_t *query)
+void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query)
 {
     qdr_link_t *link = 0;
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/agent_link.h
----------------------------------------------------------------------
diff --git a/src/router_core/agent_link.h b/src/router_core/agent_link.h
index aaafa06..cad84bf 100644
--- a/src/router_core/agent_link.h
+++ b/src/router_core/agent_link.h
@@ -21,7 +21,7 @@
 
 #include "router_core_private.h"
 
-void qdrl_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset);
-void qdrl_link_get_next_CT(qdr_core_t *core, qdr_query_t *query);
+void qdra_link_get_first_CT(qdr_core_t *core, qdr_query_t *query, int offset);
+void qdra_link_get_next_CT(qdr_core_t *core, qdr_query_t *query);
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/management_agent.c
----------------------------------------------------------------------
diff --git a/src/router_core/management_agent.c 
b/src/router_core/management_agent.c
index 2d93cb5..b45d7cd 100644
--- a/src/router_core/management_agent.c
+++ b/src/router_core/management_agent.c
@@ -25,12 +25,17 @@
 #include <qpid/dispatch/compose.h>
 #include <qpid/dispatch/dispatch.h>
 #include "management_agent_private.h"
+#include "router_core_private.h"
 #include "dispatch_private.h"
 #include "alloc.h"
 
 const char *entity_type_key = "entityType";
+const char *type_key = "type";
 const char *count_key = "count";
 const char *offset_key = "offset";
+const char *name_key = "name";
+const char *identity_key = "identity";
+
 
 const char *operation_type_key = "operation";
 const char *attribute_names_key = "attributeNames";
@@ -64,12 +69,13 @@ typedef enum {
 
 typedef struct qd_management_context_t {
     qd_message_t        *msg;
+    qd_message_t        *source;
     qd_composed_field_t *field;
     qdr_query_t         *query;
     qd_dispatch_t       *qd;
-    qd_field_iterator_t *to;
     int                 count;
     int                 current_count;
+    qd_router_operation_type_t operation_type;
 } qd_management_context_t ;
 
 ALLOC_DECLARE(qd_management_context_t);
@@ -78,169 +84,214 @@ ALLOC_DEFINE(qd_management_context_t);
 /**
  * Convenience function to create and initialize context 
(qd_management_context_t)
  */
-static qd_management_context_t* qd_management_context(qd_message_t         
*msg,
-                                                      qd_composed_field_t  
*field,
-                                                      qdr_query_t          
*query,
-                                                      qd_field_iterator_t  *to,
-                                                      qd_dispatch_t        *qd,
-                                                      int                  
count)
+static qd_management_context_t* qd_management_context(qd_message_t             
  *msg,
+                                                      qd_message_t             
  *source,
+                                                      qd_composed_field_t      
  *field,
+                                                      qdr_query_t              
  *query,
+                                                      qd_dispatch_t            
  *qd,
+                                                      
qd_router_operation_type_t operation_type,
+                                                      int                      
  count)
 {
     qd_management_context_t *ctx = new_qd_management_context_t();
-    ctx->count = count;
-    ctx->field = field;
-    ctx->msg   = msg;
+    ctx->count  = count;
+    ctx->field  = field;
+    ctx->msg    = msg;
+    ctx->source = source;
     if (query)
         ctx->query = query;
     else
         ctx->query = 0;
     ctx->current_count = 0;
     ctx->qd = qd;
-    ctx->to = to;
+    ctx->operation_type = operation_type;
 
     return ctx;
 }
 
-static void qd_compose_send(qd_management_context_t *ctx)
+
+/**
+ * Sets the error status on a new composed field.
+ */
+static void qd_set_response_status(const qd_amqp_error_t *error, 
qd_composed_field_t **field)
 {
-    qd_compose_end_list(ctx->field);
-    qd_compose_end_map(ctx->field);
-    qd_message_compose_2(ctx->msg, ctx->field);
-    qd_router_send(ctx->qd, ctx->to, ctx->msg);
-
-    //We have come to the very end. Free the appropriate memory.
-    //ctx->field has already been freed in the call to 
qd_compose_end_list(ctx->field)
-    //ctx->query has also been already freed
-    qd_message_free(ctx->msg);
-    qd_field_iterator_free(ctx->to);
-    free_qd_management_context_t(ctx);
+    //
+    // Insert appropriate success or error
+    //
+    *field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, *field);
+    qd_compose_start_map(*field);
+
+    qd_compose_insert_string(*field, status_description);
+    qd_compose_insert_string(*field, error->description);
+
+    qd_compose_insert_string(*field, status_code);
+    qd_compose_insert_uint(*field, error->status);
+
+    qd_compose_end_map(*field);
 }
 
 
-static void manage_response_handler (void *context, const qd_amqp_error_t 
*status, bool more)
+static void qd_set_properties(qd_message_t        *msg,
+                              qd_field_iterator_t **reply_to,
+                              qd_composed_field_t **fld)
 {
-    qd_management_context_t *ctx = (qd_management_context_t*) context;
+    qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, 
QD_FIELD_CORRELATION_ID);
+    // Grab the reply_to field from the incoming message. This is the address 
we will send the response to.
+    *reply_to = qd_message_field_iterator(msg, QD_FIELD_REPLY_TO);
+    *fld = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
+    qd_compose_start_list(*fld);
+    qd_compose_insert_null(*fld);                           // message-id
+    qd_compose_insert_null(*fld);                           // user-id
+    qd_compose_insert_string_iterator(*fld, *reply_to);     // to
+    qd_compose_insert_null(*fld);                           // subject
+    qd_compose_insert_null(*fld);
+    qd_compose_insert_typed_iterator(*fld, correlation_id);
+    qd_compose_end_list(*fld);
+
+}
 
-    //TODO - Check the status (qd_amqp_error_t) here first. If the status is 
anything other that 200, you need to send it back the message with the status.
 
-    if (!more || ctx->count == 0) {
-       // If Count is zero or there are no more rows to process or the status 
returned is something other than
-       // QD_AMQP_OK, we will close the list, send the message and
-       qd_compose_send(ctx);
-    }
-    else {
-        ctx->current_count++; // Increment how many you have at hand
+static void qd_manage_response_handler (void *context, const qd_amqp_error_t 
*status, bool more)
+{
+    qd_management_context_t *ctx = (qd_management_context_t*) context;
 
-        if (ctx->count == ctx->current_count) //The count has matched, we are 
done, close the list and send out the message
-            qd_compose_send(ctx);
-        else
-            qdr_query_get_next(ctx->query);
+    if (ctx->operation_type == QD_ROUTER_OPERATION_QUERY) {
+        if (status == &QD_AMQP_OK) { // There is no error, proceed to 
conditionally call get_next
+            if (more) {
+               //If there are no more rows to process or the status returned 
is something other than
+               // QD_AMQP_OK, we will close the list, send the message and
+               ctx->current_count++; // Increment how many you have at hand
+               if (ctx->count != ctx->current_count) {
+                   qdr_query_get_next(ctx->query);
+                   return;
+               }
+            }
+        }
+        qd_compose_end_list(ctx->query->body);
+        qd_compose_end_map(ctx->query->body);
     }
-}
 
-static void core_agent_query_handler(qd_dispatch_t           *qd,
-                                     qd_router_entity_type_t entity_type,
-                                     qd_message_t            *msg,
-                                     int                     *count,
-                                     int                     *offset)
-{
-    qdr_core_t *core = qd_router_core(qd);
 
-    // Create a new message
-    qd_message_t *message = qd_message();
-    qd_field_iterator_t *correlation_id = qd_message_field_iterator_typed(msg, 
QD_FIELD_CORRELATION_ID);
-    // Grab the reply_to field from the incoming message. This is the address 
we will send the response to.
-    qd_field_iterator_t *reply_to = qd_message_field_iterator(msg, 
QD_FIELD_REPLY_TO);
 
-    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_PROPERTIES, 0);
-    qd_compose_start_list(field);
-    qd_compose_insert_null(field);                           // message-id
-    qd_compose_insert_null(field);                           // user-id
-    qd_compose_insert_string_iterator(field, reply_to);       // to
-    qd_compose_insert_null(field);                           // subject
-    qd_compose_insert_null(field);
-    qd_compose_insert_typed_iterator(field, correlation_id);
-    qd_compose_end_list(field);
+    qd_field_iterator_t *reply_to = 0;
 
+    qd_composed_field_t *fld = 0;
 
-    // Get the attributeNames
-    qd_parsed_field_t *attribute_names_parsed_field = 0;
+    // Start composing the message.
+    // First set the properties on the message like reply_to, correlation-id 
etc.
+    qd_set_properties(ctx->source, &reply_to, &fld);
 
-    qd_parsed_field_t *body = qd_parse(qd_message_field_iterator(msg, 
QD_FIELD_BODY));
+    // Second, set the status on the message, QD_AMQP_OK or 
QD_AMQP_BAD_REQUEST and so on.
+    qd_set_response_status(status, &fld);
 
-    if (body != 0 && qd_parse_is_map(body))
-        attribute_names_parsed_field = qd_parse_value_by_key(body, 
attribute_names_key);
+    // Finally, compose and send the message.
+    qd_message_compose_3(ctx->msg, fld, ctx->field);
+    qd_router_send(ctx->qd, reply_to, ctx->msg);
 
-    //
-    // Insert application property map with statusDescription of OK and status 
code of 200
-    //
-    field = qd_compose(QD_PERFORMATIVE_APPLICATION_PROPERTIES, field);
-    qd_compose_start_map(field);
+    // We have come to the very end. Free the appropriate memory.
+    // ctx->field has already been freed in the call to 
qd_compose_end_list(ctx->field)
+    // ctx->query has also been already freed
+    // Just go over this with Ted to see if I freed everything.
 
-    // Insert {'statusDescription': 'OK'}
-    qd_compose_insert_string(field, status_description);
-    qd_compose_insert_string(field, QD_AMQP_OK.description);
+    if (ctx->msg)
+        qd_message_free(ctx->msg);
+    free_qd_management_context_t(ctx);
+}
 
-    // Insert {'statusCode': '200'}
-    qd_compose_insert_string(field, status_code);
-    qd_compose_insert_uint(field, QD_AMQP_OK.status);
 
-    qd_compose_end_map(field);
+static void qd_core_agent_query_handler(qd_dispatch_t              *qd,
+                                        qd_router_entity_type_t    entity_type,
+                                        qd_router_operation_type_t 
operation_type,
+                                        qd_message_t               *msg,
+                                        int                        *count,
+                                        int                        *offset)
+{
+    qdr_core_t *core = qd_router_core(qd);
 
     //
-    // Add Body
+    // Add the Body
     //
-    field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, field);
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 
0);
 
-    // Start a map in the body
+    // Start a map in the body. Look for the end map in the callback function, 
qd_manage_response_handler.
     qd_compose_start_map(field);
 
     qd_compose_insert_string(field, attribute_names_key); //add a 
"attributeNames" key
 
-    // Set the callback function.
-    qdr_manage_handler(core, manage_response_handler);
+    // Call local function that creates and returns a qd_management_context_t 
containing the values passed in.
+    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
field, 0, qd, operation_type, (*count));
 
-    // Local local function that creates and returns a qd_management_context_t
-    qd_management_context_t *ctx = qd_management_context(message, field, 0, 
reply_to, qd, (*count));
+    // Grab the attribute names from the incoming message body. The attribute 
names will be used later on in the response.
+    qd_parsed_field_t *attribute_names_parsed_field = 0;
+    qd_parsed_field_t *body = qd_parse(qd_message_field_iterator(msg, 
QD_FIELD_BODY));
+    if (body != 0 && qd_parse_is_map(body))
+        attribute_names_parsed_field = qd_parse_value_by_key(body, 
attribute_names_key);
 
+    // Set the callback function.
+    qdr_manage_handler(core, qd_manage_response_handler);
     ctx->query = qdr_manage_query(core, ctx, entity_type, 
attribute_names_parsed_field, field);
 
     //Add the attribute names
-    qdr_query_add_attribute_names(ctx->query);
-
+    qdr_query_add_attribute_names(ctx->query); //this adds adds a list of 
attribute names like ["attribute1", "attribute2", "attribute3", "attribute4",]
     qd_compose_insert_string(field, results); //add a "results" key
     qd_compose_start_list(field); //start the list for results
 
     qdr_query_get_first(ctx->query, (*offset));
 }
 
-static void core_agent_create_handler()
+
+static void qd_core_agent_read_handler(qd_dispatch_t              *qd,
+                                       qd_message_t               *msg,
+                                       qd_router_entity_type_t     entity_type,
+                                       qd_router_operation_type_t  
operation_type,
+                                       qd_field_iterator_t        
*identity_iter,
+                                       qd_field_iterator_t        *name_iter)
 {
+    qdr_core_t *core = qd_router_core(qd);
+
+    //
+    // Add the Body
+    //
+    qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_AMQP_VALUE, 
0);
+
+    // Set the callback function.
+    qdr_manage_handler(core, qd_manage_response_handler);
 
+    // Call local function that creates and returns a qd_management_context_t 
containing the values passed in.
+    qd_management_context_t *ctx = qd_management_context(qd_message(), msg, 
field, 0, qd, operation_type, 0);
+
+    //Call the read API function
+    qdr_manage_read(core, ctx, entity_type, name_iter, identity_iter, field);
 }
 
-static void core_agent_read_handler()
+
+static void qd_core_agent_create_handler()
 {
 
 }
 
-static void core_agent_update_handler()
+
+static void qd_core_agent_update_handler()
 {
 
 }
 
-static void core_agent_delete_handler()
+
+static void qd_core_agent_delete_handler()
 {
 
 }
 
+
 /**
  * Checks the content of the message to see if this can be handled by this 
agent.
  */
-static bool can_handle_request(qd_field_iterator_t *props,
-                               qd_router_entity_type_t *entity_type,
-                               qd_router_operation_type_t *operation_type,
-                               int *count,
-                               int *offset)
+static bool qd_can_handle_request(qd_field_iterator_t        *props,
+                                  qd_router_entity_type_t    *entity_type,
+                                  qd_router_operation_type_t *operation_type,
+                                  qd_field_iterator_t        **identity_iter,
+                                  qd_field_iterator_t        **name_iter,
+                                  int                        *count,
+                                  int                        *offset)
 {
     qd_parsed_field_t *fld = qd_parse(props);
 
@@ -255,10 +306,25 @@ static bool can_handle_request(qd_field_iterator_t *props,
     // 'entityType': 'org.apache.qpid.dispatch.router.link'
     // TODO - Add more entity types here. The above is not a complete list.
 
-    qd_parsed_field_t *parsed_field = qd_parse_value_by_key(fld, 
entity_type_key);
+    qd_parsed_field_t *parsed_field = qd_parse_value_by_key(fld, identity_key);
+    if (parsed_field!=0) {
+        *identity_iter = qd_parse_raw(parsed_field);
+        //qd_address_iterator_reset_view(*identity_iter, 
ITER_VIEW_ADDRESS_HASH);
+    }
+    parsed_field = qd_parse_value_by_key(fld, name_key);
+    if (parsed_field!=0) {
+        *name_iter = qd_parse_raw(parsed_field);
+        //qd_address_iterator_reset_view(*name_iter, ITER_VIEW_ADDRESS_HASH);
+    }
 
-    if (parsed_field == 0)
-        return false;
+
+    parsed_field = qd_parse_value_by_key(fld, entity_type_key);
+
+    if (parsed_field == 0) { // Sometimes there is no 'entityType' but 'type' 
might be available.
+        parsed_field = qd_parse_value_by_key(fld, type_key);
+        if (parsed_field == 0)
+            return false;
+    }
 
     if (qd_field_iterator_equal(qd_parse_raw(parsed_field), 
address_entity_type))
         (*entity_type) = QD_ROUTER_ADDRESS;
@@ -305,6 +371,7 @@ static bool can_handle_request(qd_field_iterator_t *props,
     return true;
 }
 
+
 /**
  *
  * Handler for the management agent.
@@ -313,30 +380,33 @@ static bool can_handle_request(qd_field_iterator_t *props,
 void management_agent_handler(void *context, qd_message_t *msg, int link_id)
 {
     qd_dispatch_t *qd = (qd_dispatch_t*) context;
-    qd_field_iterator_t *iter = qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES);
+    qd_field_iterator_t *app_properties_iter = qd_message_field_iterator(msg, 
QD_FIELD_APPLICATION_PROPERTIES);
 
     qd_router_entity_type_t    entity_type = 0;
     qd_router_operation_type_t operation_type = 0;
 
+    qd_field_iterator_t *identity_iter = 0;
+    qd_field_iterator_t *name_iter = 0;
+
     int32_t count = 0;
     int32_t offset = 0;
 
-    if (can_handle_request(iter, &entity_type, &operation_type, &count, 
&offset)) {
+    if (qd_can_handle_request(app_properties_iter, &entity_type, 
&operation_type, &identity_iter, &name_iter, &count, &offset)) {
         switch (operation_type) {
             case QD_ROUTER_OPERATION_QUERY:
-                core_agent_query_handler(qd, entity_type, msg, &count, 
&offset);
+                qd_core_agent_query_handler(qd, entity_type, operation_type, 
msg, &count, &offset);
                 break;
             case QD_ROUTER_OPERATION_CREATE:
-                core_agent_create_handler();
+                qd_core_agent_create_handler();
                 break;
             case QD_ROUTER_OPERATION_READ:
-                core_agent_read_handler();
+                qd_core_agent_read_handler(qd, msg, entity_type, 
operation_type, identity_iter, name_iter);
                 break;
             case QD_ROUTER_OPERATION_UPDATE:
-                core_agent_update_handler();
+                qd_core_agent_update_handler();
                 break;
             case QD_ROUTER_OPERATION_DELETE:
-                core_agent_delete_handler();
+                qd_core_agent_delete_handler();
                 break;
        }
     }
@@ -344,7 +414,8 @@ void management_agent_handler(void *context, qd_message_t 
*msg, int link_id)
         qd_router_send2(qd, MANAGEMENT_INTERNAL, msg); //the C management 
agent is not going to handle this request. Forward it off to Python.
     // TODO - This is wrong. Need to find out how I can forward off the 
message to $management_internal so it can be handled by Python.
 
-    qd_field_iterator_free(iter);
+    qd_field_iterator_free(app_properties_iter);
+    qd_field_iterator_free(name_iter);
+    qd_field_iterator_free(identity_iter);
 
 }
-

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/38e550b7/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index 5681fad..64253b1 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -37,7 +37,6 @@ typedef struct {
 qdr_field_t *qdr_field(const char *string);
 void qdr_field_free(qdr_field_t *field);
 
-
 /**
  * qdr_action_t - This type represents one work item to be performed by the 
router-core thread.
  */
@@ -92,9 +91,12 @@ struct qdr_action_t {
         // Arguments for management-agent actions
         //
         struct {
-            qdr_query_t *query;
-            int          offset;
+            qdr_query_t             *query;
+            int                      offset;
+            qd_field_iterator_t     *identity;
+            qd_field_iterator_t     *name;
         } agent;
+
     } args;
 };
 
@@ -308,4 +310,9 @@ qdr_action_t *qdr_action(qdr_action_handler_t 
action_handler);
 void qdr_action_enqueue(qdr_core_t *core, qdr_action_t *action);
 void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query);
 
+qdr_query_t *qdr_query(qdr_core_t              *core,
+                       void                    *context,
+                       qd_router_entity_type_t  type,
+                       qd_parsed_field_t       *attribute_names,
+                       qd_composed_field_t     *body);
 #endif


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to