Repository: qpid-dispatch Updated Branches: refs/heads/tross-DISPATCH-179-1 ca24d67f0 -> d218eeeaa
DISPATCH-179 - Tied in link-attach and link-detach for router and endpoint links. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/d218eeea Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/d218eeea Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/d218eeea Branch: refs/heads/tross-DISPATCH-179-1 Commit: d218eeeaacaf7a01c28ace18967c02d35d2d0e63 Parents: ca24d67 Author: Ted Ross <[email protected]> Authored: Fri Dec 18 16:27:31 2015 -0500 Committer: Ted Ross <[email protected]> Committed: Fri Dec 18 16:27:31 2015 -0500 ---------------------------------------------------------------------- CMakeLists.txt | 2 +- include/qpid/dispatch/container.h | 10 +- include/qpid/dispatch/router_core.h | 34 ++- src/container.c | 64 +++--- src/router_core/connections.c | 323 +++++++++++++++++++++-------- src/router_core/router_core_private.h | 10 +- src/router_core/terminus.c | 10 + src/router_node.c | 70 ++++++- src/server.c | 4 +- 9 files changed, 387 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index 0d55205..3ab5f81 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,7 +46,7 @@ if (NOT PYTHONLIBS_FOUND) message(FATAL_ERROR "Python Development Libraries are needed.") endif (NOT PYTHONLIBS_FOUND) -set (SO_VERSION_MAJOR 1) +set (SO_VERSION_MAJOR 2) set (SO_VERSION_MINOR 0) set (SO_VERSION "${SO_VERSION_MAJOR}.${SO_VERSION_MINOR}") http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/include/qpid/dispatch/container.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/container.h b/include/qpid/dispatch/container.h index c88f796..39fe968 100644 --- a/include/qpid/dispatch/container.h +++ b/include/qpid/dispatch/container.h @@ -74,7 +74,7 @@ typedef void (*qd_container_delivery_handler_t) (void *node_context, qd_link_ typedef int (*qd_container_link_handler_t) (void *node_context, qd_link_t *link); typedef int (*qd_container_link_detach_handler_t) (void *node_context, qd_link_t *link, qd_detach_type_t dt); typedef void (*qd_container_node_handler_t) (void *type_context, qd_node_t *node); -typedef void (*qd_container_conn_handler_t) (void *type_context, qd_connection_t *conn, void *context); +typedef int (*qd_container_conn_handler_t) (void *type_context, qd_connection_t *conn, void *context); /** * A set of Node handlers for deliveries, links and container events. @@ -100,12 +100,8 @@ typedef struct { /** Invoked when an attach for a new outgoing link is received. */ qd_container_link_handler_t outgoing_handler; - /** - * Invoked when an outgoing link is available for sending either deliveries - * or disposition changes. The handler must check the link's credit to - * determine whether (and how many) message deliveries may be sent. - */ - qd_container_link_handler_t writable_handler; + /** Invoked when an activated connection is available for writing. */ + qd_container_conn_handler_t writable_handler; /** Invoked when a link is detached. */ qd_container_link_detach_handler_t link_detach_handler; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/include/qpid/dispatch/router_core.h ---------------------------------------------------------------------- diff --git a/include/qpid/dispatch/router_core.h b/include/qpid/dispatch/router_core.h index e42d818..9616b6b 100644 --- a/include/qpid/dispatch/router_core.h +++ b/include/qpid/dispatch/router_core.h @@ -150,12 +150,13 @@ void *qdr_connection_get_context(const qdr_connection_t *conn); * qdr_connection_process * * Allow the core to process work associated with this connection. - * This function MUST be called only on a thread that exclusively owns + * This function MUST be called on a thread that exclusively owns * this connection. * * @param conn The pointer returned by qdr_connection_opened + * @return The number of actions processed. */ -void qdr_connection_process(qdr_connection_t *conn); +int qdr_connection_process(qdr_connection_t *conn); /** * qdr_connection_activate_t callback @@ -254,6 +255,16 @@ bool qdr_terminus_is_anonymous(qdr_terminus_t *term); bool qdr_terminus_is_dynamic(qdr_terminus_t *term); /** + * qdr_terminus_set_address + * + * Set the terminus address + * + * @param term A qdr_terminus pointer returned by qdr_terminus() + * @param addr An AMQP address (null-terminated string) + */ +void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr); + +/** * qdr_terminus_get_address * * Return the address of the terminus in the form of an iterator. @@ -328,6 +339,16 @@ qd_link_type_t qdr_link_type(const qdr_link_t *link); qd_direction_t qdr_link_direction(const qdr_link_t *link); /** + * qdr_link_name + * + * Retrieve the name of the link. + * + * @param link Link object + * @return The link's name + */ +const char *qdr_link_name(const qdr_link_t *link); + +/** * qdr_link_first_attach * * This function is invoked when a first-attach (not a response to an earlier attach) @@ -339,7 +360,11 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link); * @param target Target terminus of the attach * @return A pointer to a new qdr_link_t object to track the link */ -qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, qdr_terminus_t *source, qdr_terminus_t *target); +qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, + qd_direction_t dir, + qdr_terminus_t *source, + qdr_terminus_t *target, + const char *name); /** * qdr_link_second_attach @@ -359,9 +384,10 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin * This function is invoked when a link detach arrives. * * @param link The link pointer returned by qdr_link_first_attach or in a FIRST_ATTACH event. + * @param dt The type of detach that occurred. * @param error The link error from the detach frame or 0 if none. */ -void qdr_link_detach(qdr_link_t *link, qdr_error_t *error); +void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error); qdr_delivery_t *qdr_link_deliver(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg); qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, pn_delivery_t *delivery, qd_message_t *msg, qd_field_iterator_t *addr); http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index ace94e0..482f33b 100644 --- a/src/container.c +++ b/src/container.c @@ -202,20 +202,6 @@ static void handle_link_open(qd_container_t *container, pn_link_t *pn_link) } -static int do_writable(pn_link_t *pn_link) -{ - qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); - if (!link) - return 0; - - qd_node_t *node = link->node; - if (!node) - return 0; - - return node->ntype->writable_handler(node->context, link); -} - - static void do_receive(pn_delivery_t *pnd) { pn_link_t *pn_link = pn_delivery_link(pnd); @@ -342,25 +328,30 @@ static int close_handler(qd_container_t *container, void* conn_context, pn_conne } -static int writable_handler(void* unused, pn_connection_t *conn, qd_connection_t* qd_conn) +static int writable_handler(qd_container_t *container, pn_connection_t *conn, qd_connection_t* qd_conn) { - pn_link_t *pn_link; - int event_count = 0; + const qd_node_type_t *nt; + int event_count = 0; // - // Call the attached node's writable handler for all active links - // on the connection. Note that in Dispatch, links are considered - // bidirectional. Incoming and outgoing only pertains to deliveries and - // deliveries are a subset of the traffic that flows both directions on links. + // Note the locking structure in this function. Generally this would be unsafe, but since + // this particular list is only ever appended to and never has items inserted or deleted, + // this usage is safe in this case. // - if (pn_connection_state(conn) == (PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE)) { - pn_link = pn_link_head(conn, 0); - while (pn_link) { - assert(pn_session_connection(pn_link_session(pn_link)) == conn); - event_count += do_writable(pn_link); - pn_link = pn_link_next(pn_link, 0); - } + sys_mutex_lock(container->lock); + qdc_node_type_t *nt_item = DEQ_HEAD(container->node_type_list); + sys_mutex_unlock(container->lock); + + while (nt_item) { + nt = nt_item->ntype; + if (nt->writable_handler) + event_count += nt->writable_handler(nt->type_context, qd_conn, 0); + + sys_mutex_lock(container->lock); + nt_item = DEQ_NEXT(nt_item); + sys_mutex_unlock(container->lock); } + return event_count; } @@ -520,10 +511,19 @@ static int handler(void *handler_context, void *conn_context, qd_conn_event_t ev pn_connection_t *conn = qd_connection_pn(qd_conn); switch (event) { - case QD_CONN_EVENT_LISTENER_OPEN: open_handler(container, qd_conn, QD_INCOMING, conn_context); break; - case QD_CONN_EVENT_CONNECTOR_OPEN: open_handler(container, qd_conn, QD_OUTGOING, conn_context); break; - case QD_CONN_EVENT_CLOSE: return close_handler(container, conn_context, conn, qd_conn); - case QD_CONN_EVENT_WRITABLE: return writable_handler(conn_context, conn, qd_conn); + case QD_CONN_EVENT_LISTENER_OPEN: + open_handler(container, qd_conn, QD_INCOMING, conn_context); + return 1; + + case QD_CONN_EVENT_CONNECTOR_OPEN: + open_handler(container, qd_conn, QD_OUTGOING, conn_context); + return 1; + + case QD_CONN_EVENT_CLOSE: + return close_handler(container, conn_context, conn, qd_conn); + + case QD_CONN_EVENT_WRITABLE: + return writable_handler(container, conn, qd_conn); } return 0; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_core/connections.c ---------------------------------------------------------------------- diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 26064de..db469f6 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -23,9 +23,9 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, bool discard); static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); -static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); +static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard); ALLOC_DEFINE(qdr_connection_t); ALLOC_DEFINE(qdr_connection_work_t); @@ -107,7 +107,7 @@ void *qdr_connection_get_context(const qdr_connection_t *conn) } -void qdr_connection_process(qdr_connection_t *conn) +int qdr_connection_process(qdr_connection_t *conn) { qdr_connection_work_list_t work_list; qdr_core_t *core = conn->core; @@ -116,6 +116,7 @@ void qdr_connection_process(qdr_connection_t *conn) DEQ_MOVE(conn->work_list, work_list); sys_mutex_unlock(conn->work_lock); + int event_count = DEQ_SIZE(work_list); qdr_connection_work_t *work = DEQ_HEAD(work_list); while (work) { DEQ_REMOVE_HEAD(work_list); @@ -134,9 +135,14 @@ void qdr_connection_process(qdr_connection_t *conn) break; } + qdr_terminus_free(work->source); + qdr_terminus_free(work->target); free_qdr_connection_work_t(work); + work = DEQ_HEAD(work_list); } + + return event_count; } @@ -165,21 +171,32 @@ qd_direction_t qdr_link_direction(const qdr_link_t *link) } -qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, qdr_terminus_t *source, qdr_terminus_t *target) +const char *qdr_link_name(const qdr_link_t *link) { - qdr_action_t *action = qdr_action(qdr_link_first_attach_CT, "link_first_attach"); - qdr_link_t *link = new_qdr_link_t(); + return link->name; +} + + +qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, + qd_direction_t dir, + qdr_terminus_t *source, + qdr_terminus_t *target, + const char *name) +{ + qdr_action_t *action = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach"); + qdr_link_t *link = new_qdr_link_t(); + qdr_terminus_t *local_terminus = dir == QD_OUTGOING ? source : target; ZERO(link); link->core = conn->core; link->conn = conn; + link->name = (char*) malloc(strlen(name)); + strcpy(link->name, name); - if (dir == QD_OUTGOING) { - if (qdr_terminus_has_capability(target, QD_CAPABILITY_ROUTER_CONTROL)) - link->link_type = QD_LINK_CONTROL; - else if (qdr_terminus_has_capability(target, QD_CAPABILITY_ROUTER_DATA)) - link->link_type = QD_LINK_ROUTER; - } + if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_CONTROL)) + link->link_type = QD_LINK_CONTROL; + else if (qdr_terminus_has_capability(local_terminus, QD_CAPABILITY_ROUTER_DATA)) + link->link_type = QD_LINK_ROUTER; action->args.connection.conn = conn; action->args.connection.link = link; @@ -194,7 +211,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qd_direction_t dir, qd void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { - qdr_action_t *action = qdr_action(qdr_link_second_attach_CT, "link_second_attach"); + qdr_action_t *action = qdr_action(qdr_link_inbound_second_attach_CT, "link_second_attach"); action->args.connection.link = link; action->args.connection.source = source; @@ -203,12 +220,13 @@ void qdr_link_second_attach(qdr_link_t *link, qdr_terminus_t *source, qdr_termin } -void qdr_link_detach(qdr_link_t *link, qdr_error_t *error) +void qdr_link_detach(qdr_link_t *link, qd_detach_type_t dt, qdr_error_t *error) { - qdr_action_t *action = qdr_action(qdr_link_detach_CT, "link_detach"); + qdr_action_t *action = qdr_action(qdr_link_inbound_detach_CT, "link_detach"); - action->args.connection.link = link; - action->args.connection.error = error; + action->args.connection.link = link; + action->args.connection.error = error; + action->args.connection.dt = dt; qdr_action_enqueue(link->core, action); } @@ -246,6 +264,48 @@ static void qdr_connection_enqueue_work_CT(qdr_core_t *core, } +#define QDR_DISCRIMINATOR_SIZE 16 +static void qdr_generate_discriminator(char *string) +{ + static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; + long int rnd1 = random(); + long int rnd2 = random(); + long int rnd3 = random(); + int idx; + int cursor = 0; + + for (idx = 0; idx < 5; idx++) { + string[cursor++] = table[(rnd1 >> (idx * 6)) & 63]; + string[cursor++] = table[(rnd2 >> (idx * 6)) & 63]; + string[cursor++] = table[(rnd3 >> (idx * 6)) & 63]; + } + string[cursor] = '\0'; +} + + +/** + * Generate a temporary routable address for a destination connected to this + * router node. + */ +static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length) +{ + char discriminator[QDR_DISCRIMINATOR_SIZE]; + qdr_generate_discriminator(discriminator); + snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator); +} + + +/** + * Generate a link name + */ +static void qdr_generate_link_name(const char *label, char *buffer, size_t length) +{ + char discriminator[QDR_DISCRIMINATOR_SIZE]; + qdr_generate_discriminator(discriminator); + snprintf(buffer, length, "%s.%s", label, discriminator); +} + + static qdr_link_t *qdr_create_link_CT(qdr_core_t *core, qdr_connection_t *conn, qd_link_type_t link_type, @@ -264,6 +324,8 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t *core, link->conn = conn; link->link_type = link_type; link->link_direction = dir; + link->name = (char*) malloc(QDR_DISCRIMINATOR_SIZE + 8); + qdr_generate_link_name("qdlink", link->name, QDR_DISCRIMINATOR_SIZE + 8); qdr_connection_work_t *work = new_qdr_connection_work_t(); ZERO(work); @@ -277,43 +339,27 @@ static qdr_link_t *qdr_create_link_CT(qdr_core_t *core, } -static void qdr_link_reject_CT(qdr_core_t *core, qdr_link_t *link, qdr_condition_t condition) +static void qdr_link_outbound_detach_CT(qdr_core_t *core, qdr_link_t *link, qdr_condition_t condition) { } -static void qdr_link_accept_CT(qdr_core_t *core, qdr_link_t *link) +static void qdr_link_outbound_second_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { -} - + qdr_connection_work_t *work = new_qdr_connection_work_t(); + ZERO(work); + work->work_type = QDR_CONNECTION_WORK_SECOND_ATTACH; + work->link = link; + work->source = source; + work->target = target; -static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr) -{ + qdr_connection_enqueue_work_CT(core, link->conn, work); } -/** - * Generate a temporary routable address for a destination connected to this - * router node. - */ -static void qdr_generate_temp_addr(qdr_core_t *core, char *buffer, size_t length) +static void qdr_forward_first_attach_CT(qdr_core_t *core, qdr_link_t *link, qdr_address_t *addr, + qdr_terminus_t *source, qdr_terminus_t *target) { - static const char *table = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+_"; - char discriminator[16]; - long int rnd1 = random(); - long int rnd2 = random(); - long int rnd3 = random(); - int idx; - int cursor = 0; - - for (idx = 0; idx < 5; idx++) { - discriminator[cursor++] = table[(rnd1 >> (idx * 6)) & 63]; - discriminator[cursor++] = table[(rnd2 >> (idx * 6)) & 63]; - discriminator[cursor++] = table[(rnd3 >> (idx * 6)) & 63]; - } - discriminator[cursor] = '\0'; - - snprintf(buffer, length, "amqp:/_topo/%s/%s/temp.%s", core->router_area, core->router_id, discriminator); } @@ -341,7 +387,7 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie * Depending on its policy, the address may be eligible for being closed out * (i.e. Logging its terminal statistics and freeing its resources). */ -/*static*/ void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local) +static void qdr_check_addr_CT(qdr_core_t *core, qdr_address_t *addr, bool was_local) { if (addr == 0) return; @@ -361,8 +407,8 @@ static qd_address_semantics_t qdr_semantics_for_address(qdr_core_t *core, qd_fie // If the address has no in-process consumer or destinations, it should be // deleted. // - if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->rnodes) == 0 && - !addr->waypoint && !addr->block_deletion) { + if (addr->on_message == 0 && DEQ_SIZE(addr->rlinks) == 0 && DEQ_SIZE(addr->inlinks) == 0 && + DEQ_SIZE(addr->rnodes) == 0 && !addr->waypoint && !addr->block_deletion) { qd_hash_remove_by_handle(core->addr_hash, addr->hash_handle); DEQ_REMOVE(core->addrs, addr); qd_hash_handle_free(addr->hash_handle); @@ -437,6 +483,7 @@ static qdr_address_t *qdr_lookup_terminus_address_CT(qdr_core_t *core, addr = qdr_address(qdr_dynamic_semantics); qd_hash_insert(core->addr_hash, temp_iter, addr, &addr->hash_handle); DEQ_INSERT_TAIL(core->addrs, addr); + qdr_terminus_set_address(terminus, temp_addr); generating = false; } qd_field_iterator_free(temp_iter); @@ -512,10 +559,10 @@ static void qdr_connection_opened_CT(qdr_core_t *core, qdr_action_t *action, boo // The connector-side of inter-router connections is responsible for setting up the // inter-router links: Two (in and out) for control, two for routed-message transfer. // - (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), 0); - (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, 0, qdr_terminus_router_control()); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), 0); - (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, 0, qdr_terminus_router_data()); + (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_INCOMING, qdr_terminus_router_control(), qdr_terminus_router_control()); + (void) qdr_create_link_CT(core, conn, QD_LINK_CONTROL, QD_OUTGOING, qdr_terminus_router_control(), qdr_terminus_router_control()); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_INCOMING, qdr_terminus_router_data(), qdr_terminus_router_data()); + (void) qdr_create_link_CT(core, conn, QD_LINK_ROUTER, QD_OUTGOING, qdr_terminus_router_data(), qdr_terminus_router_data()); } } @@ -554,7 +601,7 @@ static void qdr_connection_closed_CT(qdr_core_t *core, qdr_action_t *action, boo } -static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) return; @@ -567,15 +614,14 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo // // Reject any attaches of inter-router links that arrive on connections that are not inter-router. - // - if ((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER) - qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN); - - // // Reject any waypoint links. Waypoint links are always initiated by a router, not the remote container. // - if (link->link_type == QD_LINK_WAYPOINT) - qdr_link_reject_CT(core, link, QDR_CONDITION_FORBIDDEN); + if ((link->link_type == QD_LINK_WAYPOINT) || + ((link->link_type == QD_LINK_CONTROL || link->link_type == QD_LINK_ROUTER) && conn->role != QDR_ROLE_INTER_ROUTER)) { + qdr_link_outbound_detach_CT(core, link, QDR_CONDITION_FORBIDDEN); + qdr_terminus_free(source); + qdr_terminus_free(target); + } if (dir == QD_INCOMING) { // @@ -585,24 +631,28 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo case QD_LINK_ENDPOINT: { if (qdr_terminus_is_anonymous(target)) { link->owning_addr = 0; - qdr_link_accept_CT(core, link); + qdr_link_outbound_second_attach_CT(core, link, source, target); + } else { // // This link has a target address // bool link_route; qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, target, false, false, &link_route); - if (!addr) + if (!addr) { // // No route to this destination, reject the link // - qdr_link_reject_CT(core, link, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); + qdr_link_outbound_detach_CT(core, link, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); + qdr_terminus_free(source); + qdr_terminus_free(target); + } else if (link_route) // // This is a link-routed destination, forward the attach to the next hop // - qdr_forward_first_attach_CT(core, link, addr); + qdr_forward_first_attach_CT(core, link, addr, source, target); else { // @@ -611,7 +661,7 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo // link->owning_addr = addr; qdr_add_link_ref(&addr->inlinks, link); - qdr_link_accept_CT(core, link); + qdr_link_outbound_second_attach_CT(core, link, source, target); } } break; @@ -622,9 +672,11 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo break; case QD_LINK_CONTROL: + qdr_link_outbound_second_attach_CT(core, link, source, target); break; case QD_LINK_ROUTER: + qdr_link_outbound_second_attach_CT(core, link, source, target); break; } } else { @@ -635,17 +687,20 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo case QD_LINK_ENDPOINT: { bool link_route; qdr_address_t *addr = qdr_lookup_terminus_address_CT(core, dir, source, true, true, &link_route); - if (!addr) + if (!addr) { // // No route to this destination, reject the link // - qdr_link_reject_CT(core, link, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); + qdr_link_outbound_detach_CT(core, link, QDR_CONDITION_NO_ROUTE_TO_DESTINATION); + qdr_terminus_free(source); + qdr_terminus_free(target); + } else if (link_route) // // This is a link-routed destination, forward the attach to the next hop // - qdr_forward_first_attach_CT(core, link, addr); + qdr_forward_first_attach_CT(core, link, addr, source, target); else { // @@ -659,7 +714,7 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo if (key && *key == 'M') qdr_post_mobile_added_CT(core, key); } - qdr_link_accept_CT(core, link); + qdr_link_outbound_second_attach_CT(core, link, source, target); } break; } @@ -672,10 +727,12 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo link->owning_addr = core->hello_addr; qdr_add_link_ref(&core->hello_addr->rlinks, link); core->control_links_by_mask_bit[conn->mask_bit] = link; + qdr_link_outbound_second_attach_CT(core, link, source, target); break; case QD_LINK_ROUTER: core->data_links_by_mask_bit[conn->mask_bit] = link; + qdr_link_outbound_second_attach_CT(core, link, source, target); break; } } @@ -699,14 +756,59 @@ static void qdr_link_first_attach_CT(qdr_core_t *core, qdr_action_t *action, boo } -static void qdr_link_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_inbound_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) return; - //qdr_link_t *link = action->args.connection.link; - //qdr_terminus_t *source = action->args.connection.source; - //qdr_terminus_t *target = action->args.connection.target; + qdr_connection_t *conn = action->args.connection.conn; + qdr_link_t *link = action->args.connection.link; + qd_direction_t dir = action->args.connection.dir; + qdr_terminus_t *source = action->args.connection.source; + qdr_terminus_t *target = action->args.connection.target; + + if (dir == QD_INCOMING) { + // + // Handle incoming link cases + // + switch (link->link_type) { + case QD_LINK_ENDPOINT: + break; + + case QD_LINK_WAYPOINT: + break; + + case QD_LINK_CONTROL: + break; + + case QD_LINK_ROUTER: + break; + } + } else { + // + // Handle outgoing link cases + // + switch (link->link_type) { + case QD_LINK_ENDPOINT: + break; + + case QD_LINK_WAYPOINT: + break; + + case QD_LINK_CONTROL: + link->owning_addr = core->hello_addr; + qdr_add_link_ref(&core->hello_addr->rlinks, link); + core->control_links_by_mask_bit[conn->mask_bit] = link; + break; + + case QD_LINK_ROUTER: + core->data_links_by_mask_bit[conn->mask_bit] = link; + break; + } + } + + qdr_terminus_free(source); + qdr_terminus_free(target); // // Cases to be handled: @@ -723,28 +825,79 @@ static void qdr_link_second_attach_CT(qdr_core_t *core, qdr_action_t *action, bo } -static void qdr_link_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) +static void qdr_link_inbound_detach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { if (discard) return; - qdr_link_t *link = action->args.connection.link; - //qdr_error_t *error = action->args.connection.error; + qdr_connection_t *conn = action->args.connection.conn; + qdr_link_t *link = action->args.connection.link; + //qdr_error_t *error = action->args.connection.error; + qd_detach_type_t dt = action->args.connection.dt; + qdr_address_t *addr = link->owning_addr; + bool was_local = false; - switch (link->link_type) { - case QD_LINK_ENDPOINT: - break; + link->owning_addr = 0; - case QD_LINK_WAYPOINT: - break; + if (link->link_direction == QD_INCOMING) { + // + // Handle incoming link cases + // + switch (link->link_type) { + case QD_LINK_ENDPOINT: + if (addr) + qdr_del_link_ref(&addr->inlinks, link); + break; - case QD_LINK_CONTROL: - break; + case QD_LINK_WAYPOINT: + break; + + case QD_LINK_CONTROL: + break; - case QD_LINK_ROUTER: - break; + case QD_LINK_ROUTER: + break; + } + } else { + // + // Handle outgoing link cases + // + switch (link->link_type) { + case QD_LINK_ENDPOINT: + if (addr) { + qdr_del_link_ref(&addr->rlinks, link); + was_local = true; + } + break; + + case QD_LINK_WAYPOINT: + break; + + case QD_LINK_CONTROL: + qdr_del_link_ref(&core->hello_addr->rlinks, link); + core->control_links_by_mask_bit[conn->mask_bit] = 0; + qdr_post_link_lost_CT(core, conn->mask_bit); + break; + + case QD_LINK_ROUTER: + core->data_links_by_mask_bit[conn->mask_bit] = 0; + break; + } } + // + // If the detach occurred via protocol, send a detach back. + // TODO - Note that this is not appropriate for routed links. + // + if (dt != QD_LOST) + qdr_link_outbound_detach_CT(core, link, 0); // TODO - Fix error arg + + // + // If there was an address associated with this link, check to see if any address-related + // cleanup has to be done. + // + if (addr) + qdr_check_addr_CT(core, addr, was_local); // // Cases to be handled: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_core/router_core_private.h ---------------------------------------------------------------------- diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index db84c55..0ee1604 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -72,6 +72,7 @@ struct qdr_action_t { qdr_terminus_t *source; qdr_terminus_t *target; qdr_error_t *error; + qd_detach_type_t dt; } connection; // @@ -161,6 +162,7 @@ struct qdr_link_t { qdr_connection_t *conn; ///< [ref] Connection that owns this link qd_link_type_t link_type; qd_direction_t link_direction; + char *name; qdr_address_t *owning_addr; ///< [ref] Address record that owns this link //qd_waypoint_t *waypoint; ///< [ref] Waypoint that owns this link qdr_link_t *connected_link; ///< [ref] If this is a link-route, reference the connected link @@ -245,7 +247,9 @@ void qdr_del_node_ref(qdr_router_ref_list_t *ref_list, qdr_node_t *rnode); // General Work // // The following types are used to post work to the IO threads for -// non-connection-specific action. +// non-connection-specific action. These actions are serialized through +// a zero-delay timer and are processed by one thread at a time. General +// actions occur in-order and are not run concurrently. // typedef struct qdr_general_work_t qdr_general_work_t; typedef void (*qdr_general_work_handler_t) (qdr_core_t *core, qdr_general_work_t *work); @@ -266,7 +270,9 @@ qdr_general_work_t *qdr_general_work(qdr_general_work_handler_t handler); // Connection Work // // The following types are used to post work to the IO threads for -// connection-specific action. +// connection-specific action. The actions for a particular connection +// are run in-order and are not concurrent. Actions for different connections +// will run concurrently. // typedef enum { QDR_CONNECTION_WORK_FIRST_ATTACH, http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_core/terminus.c ---------------------------------------------------------------------- diff --git a/src/router_core/terminus.c b/src/router_core/terminus.c index 175acf4..39ad368 100644 --- a/src/router_core/terminus.c +++ b/src/router_core/terminus.c @@ -83,6 +83,9 @@ void qdr_terminus_free(qdr_terminus_t *term) void qdr_terminus_copy(qdr_terminus_t *from, pn_terminus_t *to) { + if (!from) + return; + if (from->address) { unsigned char *addr = qd_field_iterator_copy(from->address->iterator); pn_terminus_set_address(to, (char*) addr); @@ -135,6 +138,13 @@ bool qdr_terminus_is_dynamic(qdr_terminus_t *term) } +void qdr_terminus_set_address(qdr_terminus_t *term, const char *addr) +{ + qdr_field_free(term->address); + term->address = qdr_field(addr); +} + + qd_field_iterator_t *qdr_terminus_get_address(qdr_terminus_t *term) { if (qdr_terminus_is_anonymous(term)) http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index e9e8012..97b3fc1 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -199,10 +199,21 @@ void qd_router_link_free_LH(qd_router_link_t *rlink) } +static int router_writable_conn_handler(void *type_context, qd_connection_t *conn, void *context) +{ + qdr_connection_t *qconn = (qdr_connection_t*) qd_connection_get_context(conn); + + if (qconn) + return qdr_connection_process(qconn); + return 0; +} + + /** * Outgoing Link Writable Handler + * DEPRECATE */ -static int router_writable_link_handler(void* context, qd_link_t *link) +/*static*/ int router_writable_link_handler(void* context, qd_link_t *link) { qd_router_t *router = (qd_router_t*) context; qd_router_delivery_t *delivery; @@ -821,7 +832,8 @@ static int router_incoming_link_handler(void* context, qd_link_t *link) qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_INCOMING, qdr_terminus(qd_link_remote_source(link)), - qdr_terminus(qd_link_remote_target(link))); + qdr_terminus(qd_link_remote_target(link)), + pn_link_name(qd_link_pn(link))); qdr_link_set_context(qdr_link, link); qd_link_set_context(link, qdr_link); @@ -838,7 +850,8 @@ static int router_outgoing_link_handler(void* context, qd_link_t *link) qdr_connection_t *qdr_conn = (qdr_connection_t*) qd_connection_get_context(conn); qdr_link_t *qdr_link = qdr_link_first_attach(qdr_conn, QD_OUTGOING, qdr_terminus(qd_link_remote_source(link)), - qdr_terminus(qd_link_remote_target(link))); + qdr_terminus(qd_link_remote_target(link)), + pn_link_name(qd_link_pn(link))); qdr_link_set_context(qdr_link, link); qd_link_set_context(link, qdr_link); @@ -915,14 +928,14 @@ static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_ qdr_error_t *error = qdr_error_from_pn(cond); if (!error && dt == QD_LOST) error = qdr_error("qd:routed-link-lost", "Connectivity to the peer container was lost"); - qdr_link_detach(rlink, error); + qdr_link_detach(rlink, dt, error); } return 0; } -static void router_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) +static int router_inbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) { qd_router_t *router = (qd_router_t*) type_context; qdr_connection_role_t role = qd_router_connection_role(conn); @@ -930,10 +943,12 @@ static void router_inbound_opened_handler(void *type_context, qd_connection_t *c qd_connection_set_context(conn, qdrc); qdr_connection_set_context(qdrc, conn); + + return 0; } -static void router_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) +static int router_outbound_opened_handler(void *type_context, qd_connection_t *conn, void *context) { qd_router_t *router = (qd_router_t*) type_context; qdr_connection_role_t role = qd_router_connection_role(conn); @@ -941,14 +956,18 @@ static void router_outbound_opened_handler(void *type_context, qd_connection_t * qd_connection_set_context(conn, qdrc); qdr_connection_set_context(qdrc, conn); + + return 0; } -static void router_closed_handler(void *type_context, qd_connection_t *conn, void *context) +static int router_closed_handler(void *type_context, qd_connection_t *conn, void *context) { qdr_connection_t *qdrc = (qdr_connection_t*) qd_connection_get_context(conn); qdr_connection_closed(qdrc); qd_connection_set_context(conn, 0); + + return 0; } @@ -969,7 +988,7 @@ static qd_node_type_t router_node = {"router", 0, 0, router_disposition_handler, router_incoming_link_handler, router_outgoing_link_handler, - router_writable_link_handler, + router_writable_conn_handler, router_link_detach_handler, router_link_attach_handler, router_link_flow_handler, @@ -1080,11 +1099,46 @@ static void qd_router_link_first_attach(void *context, qdr_terminus_t *source, qdr_terminus_t *target) { + qd_router_t *router = (qd_router_t*) context; + qd_connection_t *qconn = (qd_connection_t*) qdr_connection_get_context(conn); + + // + // Create a new link to be attached + // + qd_link_t *qlink = qd_link(router->node, qconn, qdr_link_direction(link), qdr_link_name(link)); + + // + // Copy the source and target termini to the link + // + qdr_terminus_copy(source, qd_link_source(qlink)); + qdr_terminus_copy(target, qd_link_target(qlink)); + + // + // Associate the qd_link and the qdr_link to each other + // + qdr_link_set_context(link, qlink); + qd_link_set_context(qlink, link); + + // + // Open (attach) the link + // + pn_link_open(qd_link_pn(qlink)); } static void qd_router_link_second_attach(void *context, qdr_link_t *link, qdr_terminus_t *source, qdr_terminus_t *target) { + qd_link_t *qlink = (qd_link_t*) qdr_link_get_context(link); + if (!qlink) + return; + + qdr_terminus_copy(source, qd_link_source(qlink)); + qdr_terminus_copy(target, qd_link_target(qlink)); + + // + // Open (attach) the link + // + pn_link_open(qd_link_pn(qlink)); } http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/d218eeea/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index b7303b2..441f575 100644 --- a/src/server.c +++ b/src/server.c @@ -1181,8 +1181,10 @@ void qd_server_activate(qd_connection_t *ctx) if (!ctor) return; - if (!qdpn_connector_closed(ctor)) + if (!qdpn_connector_closed(ctor)) { qdpn_connector_activate(ctor, QDPN_CONNECTOR_WRITABLE); + qdpn_driver_wakeup(ctx->server->driver); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
