DISPATCH-437 - Added API around the agent
Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2bcb2f76 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2bcb2f76 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2bcb2f76 Branch: refs/heads/DISPATCH-437-1 Commit: 2bcb2f766a5667767b6aa8ee7b1073f0b5f542fd Parents: e2ad8da Author: Ganesh Murthy <[email protected]> Authored: Tue Aug 2 09:37:23 2016 -0400 Committer: Ganesh Murthy <[email protected]> Committed: Tue Aug 2 09:37:23 2016 -0400 ---------------------------------------------------------------------- src/CMakeLists.txt | 6 +- src/agent.c | 375 +++++++++++++++++++++++++++++++ src/agent.h | 62 ++++++ src/agent_private.h | 53 +++++ src/router_core/agent.c | 457 -------------------------------------- src/router_core/agent_core.c | 457 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 950 insertions(+), 460 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3a78801..7367e96 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -65,7 +65,7 @@ set(qpid_dispatch_SOURCES python_embedded.c router_agent.c router_config.c - router_core/agent.c + router_core/agent_core.c router_core/agent_address.c router_core/agent_config_address.c router_core/agent_config_auto_link.c @@ -83,7 +83,7 @@ set(qpid_dispatch_SOURCES router_core/transfer.c router_node.c router_pynode.c - agent_adapter.c + agent.c schema_enum.c server.c timer.c @@ -95,7 +95,7 @@ if(USE_MEMORY_POOL) endif() set_property( - SOURCE python_embedded.c router_pynode.c agent_adapter.c + SOURCE python_embedded.c router_pynode.c agent.c PROPERTY COMPILE_FLAGS -Wno-strict-aliasing ) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/agent.c ---------------------------------------------------------------------- diff --git a/src/agent.c b/src/agent.c new file mode 100644 index 0000000..d4788f6 --- /dev/null +++ b/src/agent.c @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/python_embedded.h> +#include <stdio.h> +#include <string.h> +#include <stdbool.h> +#include <stdlib.h> + +#include "agent.h" +#include "agent_private.h" +#include "schema_enum.h" + +#define MANAGEMENT_MODULE "qpid_dispatch_internal.management" + +typedef struct { + PyObject_HEAD + qd_agent_t *agent; + +} AgentRequestAdapter; + +/** + * Declare all the methods in the AgentRequestAdapter. + * post_management_request is the name of the method that the python side would call and qd_post_management_request is the C implementation + * of the function. + */ +static PyMethodDef AgentRequestAdapter_functions[] = { + {"post_management_request", qd_post_management_request, METH_VARARGS, "Posts a management request to a work queue"}, + {0, 0, 0, 0} // <-- Not sure why we need this +}; + +static PyTypeObject AgentRequestAdapterType = { + PyObject_HEAD_INIT(0) + 0, /* ob_size*/ + MANAGEMENT_MODULE ".AgentRequestAdapter", /* tp_name*/ + sizeof(AgentRequestAdapter), /* tp_basicsize*/ + 0, /* tp_itemsize*/ + 0, /* tp_dealloc*/ + 0, /* tp_print*/ + 0, /* tp_getattr*/ + 0, /* tp_setattr*/ + 0, /* tp_compare*/ + 0, /* tp_repr*/ + 0, /* tp_as_number*/ + 0, /* tp_as_sequence*/ + 0, /* tp_as_mapping*/ + 0, /* tp_hash */ + 0, /* tp_call*/ + 0, /* tp_str*/ + 0, /* tp_getattro*/ + 0, /* tp_setattro*/ + 0, /* tp_as_buffer*/ + Py_TPFLAGS_DEFAULT, /* tp_flags*/ + "Agent request Adapter", /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + AgentRequestAdapter_functions, /* tp_methods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + 0, /* tp_new */ + 0, /* tp_free */ + 0, /* tp_is_gc */ + 0, /* tp_bases */ + 0, /* tp_mro */ + 0, /* tp_cache */ + 0, /* tp_subclasses */ + 0, /* tp_weaklist */ + 0, /* tp_del */ + 0 /* tp_version_tag */ +}; + +PyObject* qd_agent_init(char *agentClass, char *address, PyObject *pythonManagementModule, const char *config_path) +{ + // Create a new instance of AgentRequestAdapterType + AgentRequestAdapterType.tp_new = PyType_GenericNew; + PyType_Ready(&AgentRequestAdapterType); + + // Load the qpid_dispatch_internal.management Python module + + if (!pythonManagementModule) { + qd_error_py(); + qd_log(log_source, QD_LOG_CRITICAL, "Cannot load dispatch extension module '%s'", MANAGEMENT_MODULE); + abort(); + } + + PyTypeObject *agentRequestAdapterType = &AgentRequestAdapterType; + Py_INCREF(agentRequestAdapterType); + + //Use the "AgentRequestAdapter" name to add the AgentRequestAdapterType to the management + PyModule_AddObject(pythonManagementModule, "AgentRequestAdapter", (PyObject*) &AgentRequestAdapterType); + // Now we have added AgentRequestAdapter to the qpid_dispatch_internal.management python module + + PyObject *adapterType = PyObject_GetAttrString(pythonManagementModule, "AgentRequestAdapter"); + PyObject * adapterInstance = PyObject_CallObject(adapterType, 0); + adapter = ((AgentRequestAdapter*) adapterInstance); + ((AgentRequestAdapter*) adapterInstance)->log_source = qd_log_source("AGENT"); + qd_management_work_list_t work_queue = 0; + DEQ_INIT(work_queue); + ((AgentRequestAdapter*) adapterInstance)->work_queue = work_queue; + ((AgentRequestAdapter*) adapterInstance)->lock = sys_mutex(); + initialize_handlers(((AgentRequestAdapter*) adapterInstance)->handlers); + + //Instantiate the ManagementAgent class found in qpid_dispatch_internal/management/agent.py + PyObject* pClass = PyObject_GetAttrString(pythonManagementModule, agentClass); + + // + // Constructor Arguments for ManagementAgent + // + PyObject* pArgs = PyTuple_New(3); + + // arg 0: management address $management + PyObject *address = PyString_FromString(address); + PyTuple_SetItem(pArgs, 0, address); + + // arg 1: adapter instance + PyTuple_SetItem(pArgs, 1, adapterInstance); + + // arg 2: config file location + PyObject *config_file = PyString_FromString(config_path); + PyTuple_SetItem(pArgs, 2, config_file); + + // + // Instantiate the ManagementAgent class + // + PyObject* pyManagementInstance = PyInstance_New(pClass, pArgs, 0); + if (pyManagementInstance) {} + Py_DECREF(pArgs); + Py_DECREF(adapterType); + Py_DECREF(pythonManagementModule); + + //TODO - should I return an adapter or an instance of the entire management agent object? + return adapterInstance; +} + +/*** + * Adds a management + */ +static PyObject *qd_post_management_request(PyObject *self, //TODO - Do we need so many arguments or can I just pass a list with everything in it? + PyObject *arg1, // Operation(CRUDQ) to be performed. + PyObject *arg2, // Entity type + PyObject *arg3, // count + PyObject *arg4, // offset + PyObject *arg5, // Correlation-id + PyObject *arg6, // Reply to + PyObject *arg7, // Name + PyObject *arg8, // identity + PyObject *arg9) // Request body +{ + int operation; //Is this a CREATE, READ, UPDATE, DELETE or QUERY + int entity_type; // Is this a listener or connector or address.... etc. + int count = 0; // used for queries only + int offset = 0; //used for queries only + PyObject *cid = 0; + PyObject *reply_to = 0; + PyObject *name = 0; + PyObject *identity = 0; + PyObject *body = 0; + + if (!PyArg_ParseTuple(arg1, "i", &operation)) + return 0; + if (!PyArg_ParseTuple(arg2, "i", &entity_type)) + return 0; + if (!PyArg_ParseTuple(arg3, "i", &count)) + return 0; + if (!PyArg_ParseTuple(arg4, "i", &offset)) + return 0; + if (!PyArg_ParseTuple(arg3, "o", &cid)) + return 0; + if (!PyArg_ParseTuple(arg4, "o", &reply_to)) + return 0; + if (!PyArg_ParseTuple(arg5, "o", &name)) + return 0; + if (!PyArg_ParseTuple(arg6, "o", &identity)) + return 0; + if (!PyArg_ParseTuple(arg7, "o", &body)) + return 0; + + // + // correlation id + // + qd_composed_field_t *cid_field = qd_compose_subfield(0); + qd_py_to_composed(body, cid_field); + qd_buffer_list_t cid_buffers = qd_compose_buffers(cid_field); + // TODO - this is not correct. what if the buffer length is more than 512? + qd_buffer_t buffer = DEQ_HEAD(cid_buffers); + qd_field_iterator_t cid_iter = qd_address_iterator_buffer(buffer, 0, qd_buffer_list_length(cid_buffers), ITER_VIEW_ALL); + + + qd_composed_field_t *reply_to_field = qd_compose_subfield(0); + qd_py_to_composed(body, reply_to_field); + qd_buffer_list_t reply_to_buffers = qd_compose_buffers(reply_to_field); + // TODO - this is not correct. what if the buffer length is more than 512? + qd_field_iterator_t reply_to_iter = qd_address_iterator_buffer(DEQ_HEAD(reply_to_buffers), 0, qd_buffer_list_length(reply_to_buffers), ITER_VIEW_ALL); + + qd_composed_field_t *identity_field = qd_compose_subfield(0); + qd_py_to_composed(body, identity_field); + qd_buffer_list_t identity_buffers = qd_compose_buffers(identity_field); + // TODO - this is not correct. what if the buffer length is more than 512? + qd_field_iterator_t identity_iter = qd_address_iterator_buffer(DEQ_HEAD(identity_buffers), 0, qd_buffer_list_length(identity_buffers), ITER_VIEW_ALL); + + qd_composed_field_t *name_field = qd_compose_subfield(0); + qd_py_to_composed(body, name_field); + qd_buffer_list_t name_buffers = qd_compose_buffers(name_field); + // TODO - this is not correct. what if the buffer length is more than 512? + qd_field_iterator_t name_iter = qd_address_iterator_buffer(DEQ_HEAD(name_buffers), 0, qd_buffer_list_length(name_buffers), ITER_VIEW_ALL); + + + qd_composed_field_t *body_field = qd_compose_subfield(0); + qd_py_to_composed(body, body_field); + qd_buffer_list_t body_buffers = qd_compose_buffers(body_field); + // TODO - this is not correct. what if the buffer length is more than 512? + qd_field_iterator_t body_iter = qd_address_iterator_buffer(DEQ_HEAD(body_buffers), 0, qd_buffer_list_length(body_buffers), ITER_VIEW_ALL); + + + qd_entity_type_handler_t handler = qd_agent_handler_for_type(entity_type); + + // + // Create a work item (qd_management_work_item_t) + // + qd_management_work_item_t work_item = NEW(qd_management_work_item_t); + work_item->count = count; + work_item->offset = offset; + work_item->operation = operation; + work_item->entity_type = entity_type; + work_item->ctx = handler->ctx; + work_item->reply_to = reply_to_iter; + work_item->correlation_id = cid_iter; + work_item->identity_iter = identity_iter; + work_item->name_iter = name_iter; + work_item->in_body = body_iter; + + // + // Add work item to the work item list after locking the work item list + // + sys_mutex_lock(adapter->lock); + DEQ_INSERT_TAIL(adapter->work_queue, work_item); + sys_mutex_unlock(adapter->lock); + + // + // TODO - Kick off processing of the work queue + // + return Py_None; +} + + +void qd_register_handlers(void *ctx, + PyObject *pyAdapter, + qd_schema_entity_type_t entity_type, + qd_agent_handler_t create_handler, + qd_agent_handler_t read_handler, + qd_agent_handler_t update_handler, + qd_agent_handler_t delete_handler, + qd_agent_handler_t query_handler) +{ + AgentRequestAdapter* adapter = ((AgentRequestAdapter*) pyAdapter); + qd_entity_type_handler_t entity_handler = NEW(qd_entity_type_handler_t); + entity_handler->delete_handler = delete_handler; + entity_handler->update_handler = update_handler; + entity_handler->query_handler = query_handler; + entity_handler->create_handler = create_handler; + entity_handler->read_handler = read_handler; + + //Store the entity_handler in the appropriate cell of the handler array index by the enum qd_schema_entity_type_t + adapter->handlers[entity_type] = entity_handler; + +} + +static qd_entity_type_handler_t *qd_agent_handler_for_type(qd_schema_entity_type_t entity_type, AgentRequestAdapter* adapter) +{ + return adapter->handlers[entity_type]; +} + +static void initialize_handlers(AgentRequestAdapter* adapter) +{ + for (int i=0; i < QD_SCHEMA_ENTITY_TYPE_ENUM_COUNT; i++) + { + adapter->handlers[i] = 0; + } +} + + +static process_work_queue(qd_management_work_list_t work_queue, AgentRequestAdapter* adapter) +{ + qd_management_work_item_t work_item = DEQ_HEAD(work_queue); + + qd_entity_type_handler_t handler = qd_agent_handler_for_type(work_item->entity_type, adapter); + + //TODO - The following works well with core but no corresponding functions for non-core + while(work_item) { + switch (work_item->operation) { + case QD_SCHEMA_ENTITY_OPERATION_READ: + handler->read_handler(work_item->ctx, + work_item->reply_to, + work_item->correlation_id, + work_item->entity_type, + work_item->operation, + work_item->identity_iter, + work_item->name_iter); + break; + case QD_SCHEMA_ENTITY_OPERATION_DELETE: + handler->delete_handler(work_item->ctx, + work_item->reply_to, + work_item->correlation_id, + work_item->entity_type, + work_item->operation, + work_item->identity_iter, + work_item->name_iter); + break; + case QD_SCHEMA_ENTITY_OPERATION_CREATE: + handler->create_handler(work_item->ctx, + work_item->reply_to, + work_item->correlation_id, + work_item->entity_type, + work_item->operation, + work_item->name_iter, + work_item->in_body); + break; + case QD_SCHEMA_ENTITY_OPERATION_UPDATE: + handler->update_handler(work_item->ctx, + work_item->reply_to, + work_item->correlation_id, + work_item->entity_type, + work_item->operation, + work_item->identity_iter, + work_item->name_iter, + work_item->in_body); + break; + case QD_SCHEMA_ENTITY_OPERATION_QUERY: + handler->query_handler(work_item->ctx, + work_item->reply_to, + work_item->correlation_id, + work_item->entity_type, + work_item->operation, + work_item->count, + work_item->offset, + work_item->in_body); + break; + } + + work_item = DEQ_NEXT(work_item); + } +} + + +PyObject* qd_agent_adapter_finalize(PyObject *adapter) +{ + +} http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/agent.h ---------------------------------------------------------------------- diff --git a/src/agent.h b/src/agent.h new file mode 100644 index 0000000..83a82d3 --- /dev/null +++ b/src/agent.h @@ -0,0 +1,62 @@ +#ifndef __agent_adapter_h__ +#define __agent_adapter_h__ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "schema_enum.h" +#include "agent_private.h" + +typedef struct qd_agent_t qd_agent_t; +typedef struct qd_agent_request_t qd_agent_request_t; + +/** + * Creates a new agent with the passed in address and whose configuration is located at config path. + * @see qd_agent_start to start the agent + */ +qd_agent_t* qd_agent(char *address, const char *config_path); + +/** + * Free the agent and its components + */ +void qd_agent_free(qd_agent_t *agent); + + +typedef void (*qd_agent_handler_t) (void *context, + qd_agent_request_t *request); + +/** + * Register CRUDQ handlers for a particular entity type + */ +void qd_agent_register_handlers(qd_agent_t *agent, + void *ctx, + qd_schema_entity_type_t entity_type, + qd_agent_handler_t create_handler, + qd_agent_handler_t read_handler, + qd_agent_handler_t update_handler, + qd_agent_handler_t delete_handler, + qd_agent_handler_t query_handler); + +/** + * Start the agent. + * Loads the contents of the config file located in config_path + * Agent starts listening on the provided address + */ +void qd_agent_start(qd_agent_t *agent); + +#endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/agent_private.h ---------------------------------------------------------------------- diff --git a/src/agent_private.h b/src/agent_private.h new file mode 100644 index 0000000..96f0e26 --- /dev/null +++ b/src/agent_private.h @@ -0,0 +1,53 @@ +#ifndef __agent_private_h__ +#define __agent_private_h__ + +#include "schema_enum.h" +#include "ctools.h" + +typedef struct qd_management_work_item_t { + DEQ_LINKS(struct qd_management_work_item_t); + int operation; //Is this a CREATE, READ, UPDATE, DELETE or QUERY + int entity_type; // Is this a listener or connector or address.... etc. + int count; + int offset; + void *ctx; + qd_field_iterator_t *reply_to; + qd_field_iterator_t *correlation_id; + qd_field_iterator_t *identity_iter; + qd_field_iterator_t *name_iter; + qd_parsed_field_t *in_body; +} qd_management_work_item_t; + +DEQ_DECLARE(qd_management_work_item_t, qd_management_work_list_t); + +typedef struct qd_entity_type_handler_t { + qd_schema_entity_type_t entity_type; + void *ctx; + qd_agent_handler_t create_handler; + qd_agent_handler_t read_handler; + qd_agent_handler_t update_handler; + qd_agent_handler_t delete_handler; + qd_agent_handler_t query_handler; +} qd_entity_type_handler_t; + +struct qd_agent_t { + qd_management_work_list_t work_queue; + sys_mutex_t *lock; + qd_log_source_t *log_source; + qd_entity_type_handler_t *handlers[QD_SCHEMA_ENTITY_TYPE_ENUM_COUNT]; +}; + +struct qd_agent_request_t { + qd_buffer_list_t *list; // A buffer chain holding all the relevant information for the CRUDQ operations. + void *ctx; + int count; + int offset; + qd_field_iterator_t *identity_iter; + qd_field_iterator_t *name_iter; + qd_field_iterator_t *reply_to; + qd_field_iterator_t *correlation_id; + qd_router_entity_type_t entity_type; + qd_parsed_field_t *in_body; +}; + +#endif http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/router_core/agent.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent.c b/src/router_core/agent.c deleted file mode 100644 index 272b75c..0000000 --- a/src/router_core/agent.c +++ /dev/null @@ -1,457 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <qpid/dispatch/amqp.h> -#include "agent_config_address.h" -#include "agent_config_link_route.h" -#include "agent_config_auto_link.h" -#include "agent_address.h" -#include "agent_link.h" -#include "router_core_private.h" -#include <stdio.h> - -static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard); - -ALLOC_DECLARE(qdr_query_t); -ALLOC_DEFINE(qdr_query_t); - -//================================================================================== -// Internal Functions -//================================================================================== - -static void qdr_agent_response_handler(void *context) -{ - qdr_core_t *core = (qdr_core_t*) context; - qdr_query_t *query; - bool done = false; - - while (!done) { - sys_mutex_lock(core->query_lock); - query = DEQ_HEAD(core->outgoing_query_list); - if (query) - DEQ_REMOVE_HEAD(core->outgoing_query_list); - done = DEQ_SIZE(core->outgoing_query_list) == 0; - sys_mutex_unlock(core->query_lock); - - if (query) { - bool more = query->more; - core->agent_response_handler(query->context, &query->status, more); - if (!more) - qdr_query_free(query); - } - } -} - - -void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query) -{ - sys_mutex_lock(core->query_lock); - DEQ_INSERT_TAIL(core->outgoing_query_list, query); - bool notify = DEQ_SIZE(core->outgoing_query_list) == 1; - sys_mutex_unlock(core->query_lock); - - if (notify) - qd_timer_schedule(core->agent_timer, 0); -} - - -qdr_query_t *qdr_query(qdr_core_t *core, - void *context, - qd_router_entity_type_t type, - qd_composed_field_t *body) -{ - qdr_query_t *query = new_qdr_query_t(); - - DEQ_ITEM_INIT(query); - ZERO(query); - query->core = core; - query->entity_type = type; - query->context = context; - query->body = body; - query->more = false; - - return query; -} - -static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count); -static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count); - -//================================================================================== -// Interface Functions -//================================================================================== - -void qdr_manage_create(qdr_core_t *core, - void *context, - qd_router_entity_type_t type, - qd_field_iterator_t *name, - qd_parsed_field_t *in_body, - qd_composed_field_t *out_body) -{ - qdr_action_t *action = qdr_action(qdr_manage_create_CT, "manage_create"); - - // Create a query object here - action->args.agent.query = qdr_query(core, context, type, out_body); - action->args.agent.name = name; - action->args.agent.in_body = in_body; - - qdr_action_enqueue(core, action); -} - - -void qdr_manage_delete(qdr_core_t *core, - void *context, - qd_router_entity_type_t type, - qd_field_iterator_t *name, - qd_field_iterator_t *identity) -{ - qdr_action_t *action = qdr_action(qdr_manage_delete_CT, "manage_delete"); - - // Create a query object here - action->args.agent.query = qdr_query(core, context, type, 0); - action->args.agent.name = name; - action->args.agent.identity = identity; - - qdr_action_enqueue(core, action); -} - - -void qdr_manage_read(qdr_core_t *core, - void *context, - qd_router_entity_type_t entity_type, - qd_field_iterator_t *name, - qd_field_iterator_t *identity, - qd_composed_field_t *body) -{ - qdr_action_t *action = qdr_action(qdr_manage_read_CT, "manage_read"); - - // Create a query object here - action->args.agent.query = qdr_query(core, context, entity_type, body); - action->args.agent.identity = identity; - action->args.agent.name = name; - - qdr_action_enqueue(core, action); -} - - -void qdr_manage_update(qdr_core_t *core, - void *context, - qd_router_entity_type_t type, - qd_field_iterator_t *name, - qd_field_iterator_t *identity, - qd_parsed_field_t *in_body, - qd_composed_field_t *out_body) -{ - qdr_action_t *action = qdr_action(qdr_manage_update_CT, "manage_update"); - action->args.agent.query = qdr_query(core, context, type, out_body); - action->args.agent.name = name; - action->args.agent.identity = identity; - action->args.agent.in_body = in_body; - - qdr_action_enqueue(core, action); -} - - -qdr_query_t *qdr_manage_query(qdr_core_t *core, - void *context, - qd_router_entity_type_t type, - qd_parsed_field_t *attribute_names, - qd_composed_field_t *body) -{ - - qdr_query_t* query = qdr_query(core, context, type, body); - - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdr_agent_set_columns(query, attribute_names, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_set_columns(query, attribute_names, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdr_agent_set_columns(query, attribute_names, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: qdr_agent_set_columns(query, attribute_names, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break; - case QD_ROUTER_ADDRESS: qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break; - case QD_ROUTER_FORBIDDEN: break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } - - return query; -} - - -void qdr_query_add_attribute_names(qdr_query_t *query) -{ - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdr_agent_emit_columns(query, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_emit_columns(query, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdr_agent_emit_columns(query, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break; - case QD_ROUTER_ADDRESS: qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break; - case QD_ROUTER_FORBIDDEN: qd_compose_empty_list(query->body); break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } -} - -void qdr_query_get_first(qdr_query_t *query, int offset) -{ - qdr_action_t *action = qdr_action(qdrh_query_get_first_CT, "query_get_first"); - action->args.agent.query = query; - action->args.agent.offset = offset; - qdr_action_enqueue(query->core, action); -} - - -void qdr_query_get_next(qdr_query_t *query) -{ - qdr_action_t *action = qdr_action(qdrh_query_get_next_CT, "query_get_next"); - action->args.agent.query = query; - qdr_action_enqueue(query->core, action); -} - - -void qdr_query_free(qdr_query_t *query) -{ - if (!query) - return; - - if (query->next_key) - qdr_field_free(query->next_key); - - free_qdr_query_t(query); -} - -static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count) -{ - qd_compose_start_list(query->body); - int i = 0; - while (query->columns[i] >= 0) { - assert(query->columns[i] < column_count); - qd_compose_insert_string(query->body, qdr_columns[query->columns[i]]); - i++; - } - qd_compose_end_list(query->body); -} - -static void qdr_agent_set_columns(qdr_query_t *query, - qd_parsed_field_t *attribute_names, - const char *qdr_columns[], - int column_count) -{ - if (!attribute_names || - (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 && - qd_parse_tag(attribute_names) != QD_AMQP_LIST32) || - qd_parse_sub_count(attribute_names) == 0 || - qd_parse_sub_count(attribute_names) >= QDR_AGENT_MAX_COLUMNS) { - // - // Either the attribute_names field is absent, it's not a list, or it's an empty list. - // In this case, we will include all available attributes. - // - int i; - for (i = 0; i < column_count; i++) - query->columns[i] = i; - query->columns[i] = -1; - assert(i < QDR_AGENT_MAX_COLUMNS); - return; - } - - // - // We have a valid, non-empty attribute list. Set the columns appropriately. - // - uint32_t count = qd_parse_sub_count(attribute_names); - uint32_t idx; - - for (idx = 0; idx < count; idx++) { - qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx); - if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8)) - query->columns[idx] = QDR_AGENT_COLUMN_NULL; - else { - int j = 0; - while (qdr_columns[j]) { - qd_field_iterator_t *iter = qd_parse_raw(name); - if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_columns[j])) { - query->columns[idx] = j; - break; - } - j+=1; - } - } - } - query->columns[idx+1] = -1; -} - - - -void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler) -{ - core->agent_response_handler = response_handler; -} - - -//================================================================================== -// In-Thread Functions -//================================================================================== - -void qdr_agent_setup_CT(qdr_core_t *core) -{ - DEQ_INIT(core->outgoing_query_list); - core->query_lock = sys_mutex(); - core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core); -} - - -static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query) -{ - query->status = QD_AMQP_FORBIDDEN; - if (query->body && !op_query) - qd_compose_insert_null(query->body); - qdr_agent_enqueue_response_CT(core, query); -} - - -static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - qd_field_iterator_t *identity = action->args.agent.identity; - qd_field_iterator_t *name = action->args.agent.name; - qdr_query_t *query = action->args.agent.query; - - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_get_CT(core, name, identity, query, qdr_config_address_columns); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_CT(core, name, identity, query, qdr_config_link_route_columns); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_get_CT(core, name, identity, query, qdr_config_auto_link_columns); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; - case QD_ROUTER_ADDRESS: qdra_address_get_CT(core, name, identity, query, qdr_address_columns); break; - case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } -} - - -static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - qd_field_iterator_t *name = action->args.agent.name; - qdr_query_t *query = action->args.agent.query; - qd_parsed_field_t *in_body = action->args.agent.in_body; - - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_create_CT(core, name, query, in_body); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_create_CT(core, name, query, in_body); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_create_CT(core, name, query, in_body); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; - case QD_ROUTER_ADDRESS: break; - case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - - } - - qd_parse_free(in_body); -} - - -static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - qd_field_iterator_t *name = action->args.agent.name; - qd_field_iterator_t *identity = action->args.agent.identity; - qdr_query_t *query = action->args.agent.query; - - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_delete_CT(core, query, name, identity); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_delete_CT(core, query, name, identity); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_delete_CT(core, query, name, identity); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: break; - case QD_ROUTER_ADDRESS: break; - case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } -} - -static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - qd_field_iterator_t *identity = action->args.agent.identity; - qd_field_iterator_t *name = action->args.agent.name; - qdr_query_t *query = action->args.agent.query; - qd_parsed_field_t *in_body = action->args.agent.in_body; - - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: break; - case QD_ROUTER_CONFIG_LINK_ROUTE: break; - case QD_ROUTER_CONFIG_AUTO_LINK: break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: qdra_link_update_CT(core, name, identity, query, in_body); break; - case QD_ROUTER_ADDRESS: break; - case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } - - qd_parse_free(in_body); -} - - - - -static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - qdr_query_t *query = action->args.agent.query; - int offset = action->args.agent.offset; - - if (!discard) { - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_get_first_CT(core, query, offset); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_first_CT(core, query, offset); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_get_first_CT(core, query, offset); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: qdra_link_get_first_CT(core, query, offset); break; - case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break; - case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, true); break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } - } -} - - -static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard) -{ - qdr_query_t *query = action->args.agent.query; - - if (!discard) { - switch (query->entity_type) { - case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_get_next_CT(core, query); break; - case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_next_CT(core, query); break; - case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_get_next_CT(core, query); break; - case QD_ROUTER_CONNECTION: break; - case QD_ROUTER_LINK: qdra_link_get_next_CT(core, query); break; - case QD_ROUTER_ADDRESS: qdra_address_get_next_CT(core, query); break; - case QD_ROUTER_FORBIDDEN: break; - case QD_ROUTER_EXCHANGE: break; - case QD_ROUTER_BINDING: break; - } - } -} - http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2bcb2f76/src/router_core/agent_core.c ---------------------------------------------------------------------- diff --git a/src/router_core/agent_core.c b/src/router_core/agent_core.c new file mode 100644 index 0000000..272b75c --- /dev/null +++ b/src/router_core/agent_core.c @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <qpid/dispatch/amqp.h> +#include "agent_config_address.h" +#include "agent_config_link_route.h" +#include "agent_config_auto_link.h" +#include "agent_address.h" +#include "agent_link.h" +#include "router_core_private.h" +#include <stdio.h> + +static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard); + +ALLOC_DECLARE(qdr_query_t); +ALLOC_DEFINE(qdr_query_t); + +//================================================================================== +// Internal Functions +//================================================================================== + +static void qdr_agent_response_handler(void *context) +{ + qdr_core_t *core = (qdr_core_t*) context; + qdr_query_t *query; + bool done = false; + + while (!done) { + sys_mutex_lock(core->query_lock); + query = DEQ_HEAD(core->outgoing_query_list); + if (query) + DEQ_REMOVE_HEAD(core->outgoing_query_list); + done = DEQ_SIZE(core->outgoing_query_list) == 0; + sys_mutex_unlock(core->query_lock); + + if (query) { + bool more = query->more; + core->agent_response_handler(query->context, &query->status, more); + if (!more) + qdr_query_free(query); + } + } +} + + +void qdr_agent_enqueue_response_CT(qdr_core_t *core, qdr_query_t *query) +{ + sys_mutex_lock(core->query_lock); + DEQ_INSERT_TAIL(core->outgoing_query_list, query); + bool notify = DEQ_SIZE(core->outgoing_query_list) == 1; + sys_mutex_unlock(core->query_lock); + + if (notify) + qd_timer_schedule(core->agent_timer, 0); +} + + +qdr_query_t *qdr_query(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_composed_field_t *body) +{ + qdr_query_t *query = new_qdr_query_t(); + + DEQ_ITEM_INIT(query); + ZERO(query); + query->core = core; + query->entity_type = type; + query->context = context; + query->body = body; + query->more = false; + + return query; +} + +static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count); +static void qdr_agent_set_columns(qdr_query_t *query, qd_parsed_field_t *attribute_names, const char *qdr_columns[], int column_count); + +//================================================================================== +// Interface Functions +//================================================================================== + +void qdr_manage_create(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_field_iterator_t *name, + qd_parsed_field_t *in_body, + qd_composed_field_t *out_body) +{ + qdr_action_t *action = qdr_action(qdr_manage_create_CT, "manage_create"); + + // Create a query object here + action->args.agent.query = qdr_query(core, context, type, out_body); + action->args.agent.name = name; + action->args.agent.in_body = in_body; + + qdr_action_enqueue(core, action); +} + + +void qdr_manage_delete(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_field_iterator_t *name, + qd_field_iterator_t *identity) +{ + qdr_action_t *action = qdr_action(qdr_manage_delete_CT, "manage_delete"); + + // Create a query object here + action->args.agent.query = qdr_query(core, context, type, 0); + action->args.agent.name = name; + action->args.agent.identity = identity; + + qdr_action_enqueue(core, action); +} + + +void qdr_manage_read(qdr_core_t *core, + void *context, + qd_router_entity_type_t entity_type, + qd_field_iterator_t *name, + qd_field_iterator_t *identity, + qd_composed_field_t *body) +{ + qdr_action_t *action = qdr_action(qdr_manage_read_CT, "manage_read"); + + // Create a query object here + action->args.agent.query = qdr_query(core, context, entity_type, body); + action->args.agent.identity = identity; + action->args.agent.name = name; + + qdr_action_enqueue(core, action); +} + + +void qdr_manage_update(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_field_iterator_t *name, + qd_field_iterator_t *identity, + qd_parsed_field_t *in_body, + qd_composed_field_t *out_body) +{ + qdr_action_t *action = qdr_action(qdr_manage_update_CT, "manage_update"); + action->args.agent.query = qdr_query(core, context, type, out_body); + action->args.agent.name = name; + action->args.agent.identity = identity; + action->args.agent.in_body = in_body; + + qdr_action_enqueue(core, action); +} + + +qdr_query_t *qdr_manage_query(qdr_core_t *core, + void *context, + qd_router_entity_type_t type, + qd_parsed_field_t *attribute_names, + qd_composed_field_t *body) +{ + + qdr_query_t* query = qdr_query(core, context, type, body); + + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdr_agent_set_columns(query, attribute_names, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_set_columns(query, attribute_names, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdr_agent_set_columns(query, attribute_names, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdr_agent_set_columns(query, attribute_names, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break; + case QD_ROUTER_ADDRESS: qdr_agent_set_columns(query, attribute_names, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break; + case QD_ROUTER_FORBIDDEN: break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } + + return query; +} + + +void qdr_query_add_attribute_names(qdr_query_t *query) +{ + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdr_agent_emit_columns(query, qdr_config_address_columns, QDR_CONFIG_ADDRESS_COLUMN_COUNT); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdr_agent_emit_columns(query, qdr_config_link_route_columns, QDR_CONFIG_LINK_ROUTE_COLUMN_COUNT); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdr_agent_emit_columns(query, qdr_config_auto_link_columns, QDR_CONFIG_AUTO_LINK_COLUMN_COUNT); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdr_agent_emit_columns(query, qdr_link_columns, QDR_LINK_COLUMN_COUNT); break; + case QD_ROUTER_ADDRESS: qdr_agent_emit_columns(query, qdr_address_columns, QDR_ADDRESS_COLUMN_COUNT); break; + case QD_ROUTER_FORBIDDEN: qd_compose_empty_list(query->body); break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } +} + +void qdr_query_get_first(qdr_query_t *query, int offset) +{ + qdr_action_t *action = qdr_action(qdrh_query_get_first_CT, "query_get_first"); + action->args.agent.query = query; + action->args.agent.offset = offset; + qdr_action_enqueue(query->core, action); +} + + +void qdr_query_get_next(qdr_query_t *query) +{ + qdr_action_t *action = qdr_action(qdrh_query_get_next_CT, "query_get_next"); + action->args.agent.query = query; + qdr_action_enqueue(query->core, action); +} + + +void qdr_query_free(qdr_query_t *query) +{ + if (!query) + return; + + if (query->next_key) + qdr_field_free(query->next_key); + + free_qdr_query_t(query); +} + +static void qdr_agent_emit_columns(qdr_query_t *query, const char *qdr_columns[], int column_count) +{ + qd_compose_start_list(query->body); + int i = 0; + while (query->columns[i] >= 0) { + assert(query->columns[i] < column_count); + qd_compose_insert_string(query->body, qdr_columns[query->columns[i]]); + i++; + } + qd_compose_end_list(query->body); +} + +static void qdr_agent_set_columns(qdr_query_t *query, + qd_parsed_field_t *attribute_names, + const char *qdr_columns[], + int column_count) +{ + if (!attribute_names || + (qd_parse_tag(attribute_names) != QD_AMQP_LIST8 && + qd_parse_tag(attribute_names) != QD_AMQP_LIST32) || + qd_parse_sub_count(attribute_names) == 0 || + qd_parse_sub_count(attribute_names) >= QDR_AGENT_MAX_COLUMNS) { + // + // Either the attribute_names field is absent, it's not a list, or it's an empty list. + // In this case, we will include all available attributes. + // + int i; + for (i = 0; i < column_count; i++) + query->columns[i] = i; + query->columns[i] = -1; + assert(i < QDR_AGENT_MAX_COLUMNS); + return; + } + + // + // We have a valid, non-empty attribute list. Set the columns appropriately. + // + uint32_t count = qd_parse_sub_count(attribute_names); + uint32_t idx; + + for (idx = 0; idx < count; idx++) { + qd_parsed_field_t *name = qd_parse_sub_value(attribute_names, idx); + if (!name || (qd_parse_tag(name) != QD_AMQP_STR8_UTF8 && qd_parse_tag(name) != QD_AMQP_STR32_UTF8)) + query->columns[idx] = QDR_AGENT_COLUMN_NULL; + else { + int j = 0; + while (qdr_columns[j]) { + qd_field_iterator_t *iter = qd_parse_raw(name); + if (qd_field_iterator_equal(iter, (const unsigned char*) qdr_columns[j])) { + query->columns[idx] = j; + break; + } + j+=1; + } + } + } + query->columns[idx+1] = -1; +} + + + +void qdr_manage_handler(qdr_core_t *core, qdr_manage_response_t response_handler) +{ + core->agent_response_handler = response_handler; +} + + +//================================================================================== +// In-Thread Functions +//================================================================================== + +void qdr_agent_setup_CT(qdr_core_t *core) +{ + DEQ_INIT(core->outgoing_query_list); + core->query_lock = sys_mutex(); + core->agent_timer = qd_timer(core->qd, qdr_agent_response_handler, core); +} + + +static void qdr_agent_forbidden(qdr_core_t *core, qdr_query_t *query, bool op_query) +{ + query->status = QD_AMQP_FORBIDDEN; + if (query->body && !op_query) + qd_compose_insert_null(query->body); + qdr_agent_enqueue_response_CT(core, query); +} + + +static void qdr_manage_read_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qd_field_iterator_t *identity = action->args.agent.identity; + qd_field_iterator_t *name = action->args.agent.name; + qdr_query_t *query = action->args.agent.query; + + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_get_CT(core, name, identity, query, qdr_config_address_columns); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_CT(core, name, identity, query, qdr_config_link_route_columns); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_get_CT(core, name, identity, query, qdr_config_auto_link_columns); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: break; + case QD_ROUTER_ADDRESS: qdra_address_get_CT(core, name, identity, query, qdr_address_columns); break; + case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } +} + + +static void qdr_manage_create_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qd_field_iterator_t *name = action->args.agent.name; + qdr_query_t *query = action->args.agent.query; + qd_parsed_field_t *in_body = action->args.agent.in_body; + + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_create_CT(core, name, query, in_body); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_create_CT(core, name, query, in_body); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_create_CT(core, name, query, in_body); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: break; + case QD_ROUTER_ADDRESS: break; + case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + + } + + qd_parse_free(in_body); +} + + +static void qdr_manage_delete_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qd_field_iterator_t *name = action->args.agent.name; + qd_field_iterator_t *identity = action->args.agent.identity; + qdr_query_t *query = action->args.agent.query; + + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_delete_CT(core, query, name, identity); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_delete_CT(core, query, name, identity); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_delete_CT(core, query, name, identity); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: break; + case QD_ROUTER_ADDRESS: break; + case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } +} + +static void qdr_manage_update_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qd_field_iterator_t *identity = action->args.agent.identity; + qd_field_iterator_t *name = action->args.agent.name; + qdr_query_t *query = action->args.agent.query; + qd_parsed_field_t *in_body = action->args.agent.in_body; + + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: break; + case QD_ROUTER_CONFIG_LINK_ROUTE: break; + case QD_ROUTER_CONFIG_AUTO_LINK: break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdra_link_update_CT(core, name, identity, query, in_body); break; + case QD_ROUTER_ADDRESS: break; + case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, false); break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } + + qd_parse_free(in_body); +} + + + + +static void qdrh_query_get_first_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qdr_query_t *query = action->args.agent.query; + int offset = action->args.agent.offset; + + if (!discard) { + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_get_first_CT(core, query, offset); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_first_CT(core, query, offset); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_get_first_CT(core, query, offset); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdra_link_get_first_CT(core, query, offset); break; + case QD_ROUTER_ADDRESS: qdra_address_get_first_CT(core, query, offset); break; + case QD_ROUTER_FORBIDDEN: qdr_agent_forbidden(core, query, true); break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } + } +} + + +static void qdrh_query_get_next_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +{ + qdr_query_t *query = action->args.agent.query; + + if (!discard) { + switch (query->entity_type) { + case QD_ROUTER_CONFIG_ADDRESS: qdra_config_address_get_next_CT(core, query); break; + case QD_ROUTER_CONFIG_LINK_ROUTE: qdra_config_link_route_get_next_CT(core, query); break; + case QD_ROUTER_CONFIG_AUTO_LINK: qdra_config_auto_link_get_next_CT(core, query); break; + case QD_ROUTER_CONNECTION: break; + case QD_ROUTER_LINK: qdra_link_get_next_CT(core, query); break; + case QD_ROUTER_ADDRESS: qdra_address_get_next_CT(core, query); break; + case QD_ROUTER_FORBIDDEN: break; + case QD_ROUTER_EXCHANGE: break; + case QD_ROUTER_BINDING: break; + } + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
