This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit b3af7de4ce33660aff4db1c9b7dc419d8b733738 Author: Ted Ross <tr...@apache.org> AuthorDate: Thu Jun 18 14:25:44 2020 -0400 Dataplane: Added no_route and initial_delivery on link-first-attach. --- include/qpid/dispatch/protocol_adaptor.h | 4 ++ src/adaptors/reference_adaptor.c | 8 ++++ src/router_core/connections.c | 64 ++++++++++++++++++++++++++++++-- src/router_core/router_core.c | 14 +++++++ src/router_core/router_core_private.h | 2 + src/router_core/transfer.c | 51 ++++++++++++++----------- src/router_node.c | 4 ++ 7 files changed, 122 insertions(+), 25 deletions(-) diff --git a/include/qpid/dispatch/protocol_adaptor.h b/include/qpid/dispatch/protocol_adaptor.h index 9920aba..bbf2f27 100644 --- a/include/qpid/dispatch/protocol_adaptor.h +++ b/include/qpid/dispatch/protocol_adaptor.h @@ -775,6 +775,8 @@ const char *qdr_link_name(const qdr_link_t *link); * @param target Target terminus of the attach * @param name - name of the link * @param terminus_addr - terminus address if any + * @param no_route If true, new deliveries are not to be routed to this link + * @param initial_delivery (optional) Move this delivery from its existing link to the head of this link's buffer * @param link_id - set to the management id of the new link * @return A pointer to a new qdr_link_t object to track the link */ @@ -784,6 +786,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qdr_terminus_t *target, const char *name, const char *terminus_addr, + bool no_route, + qdr_delivery_t *initial_delivery, uint64_t *link_id); /** diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 7a65c17..59975a9 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -113,6 +113,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, target, //qdr_terminus_t *target, "ref.1", //const char *name, 0, //const char *terminus_addr, + false, //bool no_route + 0, //qdr_delivery_t *initial_delivery &link_id); target = qdr_terminus(0); @@ -123,6 +125,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, target, //qdr_terminus_t *target, "ref.2", //const char *name, 0, //const char *terminus_addr, + false, //bool no_route + 0, //qdr_delivery_t *initial_delivery &link_id); source = qdr_terminus(0); @@ -133,6 +137,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, qdr_terminus(0), //qdr_terminus_t *target, "ref.3", //const char *name, 0, //const char *terminus_addr, + false, //bool no_route + 0, //qdr_delivery_t *initial_delivery &link_id); } } @@ -366,6 +372,8 @@ static void on_startup(void *context) qdr_terminus(0), //qdr_terminus_t *target, "ref.0", //const char *name, 0, //const char *terminus_addr, + false, //bool no_route + 0, //qdr_delivery_t *initial_delivery &link_id); } diff --git a/src/router_core/connections.c b/src/router_core/connections.c index 88c4737..d76f01c 100644 --- a/src/router_core/connections.c +++ b/src/router_core/connections.c @@ -543,6 +543,8 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, qdr_terminus_t *target, const char *name, const char *terminus_addr, + bool no_route, + qdr_delivery_t *initial_delivery, uint64_t *link_id) { qdr_action_t *action = qdr_action(qdr_link_inbound_first_attach_CT, "link_first_attach"); @@ -573,6 +575,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, link->core_ticks = conn->core->uptime_ticks; link->zero_credit_time = conn->core->uptime_ticks; link->terminus_survives_disconnect = qdr_terminus_survives_disconnect(local_terminus); + link->no_route = no_route; link->strip_annotations_in = conn->strip_annotations_in; link->strip_annotations_out = conn->strip_annotations_out; @@ -595,6 +598,7 @@ qdr_link_t *qdr_link_first_attach(qdr_connection_t *conn, action->args.connection.dir = dir; action->args.connection.source = source; action->args.connection.target = target; + action->args.connection.initial_delivery = initial_delivery; qdr_action_enqueue(conn->core, action); return link; @@ -1563,6 +1567,56 @@ static void qdr_attach_link_downlink_CT(qdr_core_t *core, qdr_connection_t *conn } +static void qdr_link_process_initial_delivery_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t *dlv) +{ + qdr_link_t *old_link = safe_deref_qdr_link_t(dlv->link_sp); + int ref_delta = 0; + + // + // Remove the delivery from its current link if needed + // + if (!!old_link) { + switch (dlv->where) { + case QDR_DELIVERY_NOWHERE: + break; + + case QDR_DELIVERY_IN_UNDELIVERED: + DEQ_REMOVE(old_link->undelivered, dlv); + dlv->where = QDR_DELIVERY_NOWHERE; + ref_delta--; + break; + + case QDR_DELIVERY_IN_UNSETTLED: + DEQ_REMOVE(old_link->unsettled, dlv); + dlv->where = QDR_DELIVERY_NOWHERE; + ref_delta--; + break; + + case QDR_DELIVERY_IN_SETTLED: + DEQ_REMOVE(old_link->settled, dlv); + dlv->where = QDR_DELIVERY_NOWHERE; + ref_delta--; + break; + } + } + + // + // Enqueue the delivery onto the new link's undelivered list + // + set_safe_ptr_qdr_link_t(link, &dlv->link_sp); + qdr_forward_deliver_CT(core, link, dlv); + + // + // Adjust the delivery's reference count + // + assert(ref_delta <= 0); + while (ref_delta < 0) { + qdr_delivery_decref(core, dlv, "qdr_link_process_initial_delivery_CT"); + ref_delta++; + } +} + + static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *action, bool discard) { qdr_connection_t *conn = safe_deref_qdr_connection_t(action->args.connection.conn); @@ -1570,9 +1624,10 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act if (discard || !conn || !link) return; - qd_direction_t dir = action->args.connection.dir; - qdr_terminus_t *source = action->args.connection.source; - qdr_terminus_t *target = action->args.connection.target; + qd_direction_t dir = action->args.connection.dir; + qdr_terminus_t *source = action->args.connection.source; + qdr_terminus_t *target = action->args.connection.target; + qdr_delivery_t *initial_dlv = action->args.connection.initial_delivery; // // Start the attach count. @@ -1675,6 +1730,9 @@ static void qdr_link_inbound_first_attach_CT(qdr_core_t *core, qdr_action_t *act // // Handle outgoing link cases // + if (initial_dlv) + qdr_link_process_initial_delivery_CT(core, link, initial_dlv); + switch (link->link_type) { case QD_LINK_ENDPOINT: { if (core->addr_lookup_handler) diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c index 25c124a..f61ebfd 100644 --- a/src/router_core/router_core.c +++ b/src/router_core/router_core.c @@ -567,6 +567,13 @@ void qdr_core_bind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_li if (key && (*key == QD_ITER_HASH_PREFIX_MOBILE)) link->phase = (int) (key[1] - '0'); + // + // If this link is configured as no-route, don't create any functional linkage between the + // link and the address beyond the owning_addr. + // + if (link->no_route) + return; + if (link->link_direction == QD_OUTGOING) { qdr_add_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); if (DEQ_SIZE(addr->rlinks) == 1) { @@ -599,6 +606,13 @@ void qdr_core_unbind_address_link_CT(qdr_core_t *core, qdr_address_t *addr, qdr_ { link->owning_addr = 0; + // + // If the link is configured as no_route, there will be no further link/address + // linkage to disconnect. + // + if (link->no_route) + return; + if (link->link_direction == QD_OUTGOING) { qdr_del_link_ref(&addr->rlinks, link, QDR_LINK_LIST_CLASS_ADDRESS); if (DEQ_SIZE(addr->rlinks) == 0) { diff --git a/src/router_core/router_core_private.h b/src/router_core/router_core_private.h index ac123e5..1709442 100644 --- a/src/router_core/router_core_private.h +++ b/src/router_core/router_core_private.h @@ -136,6 +136,7 @@ struct qdr_action_t { qd_detach_type_t dt; int credit; bool drain; + qdr_delivery_t *initial_delivery; } connection; // @@ -464,6 +465,7 @@ struct qdr_link_t { bool streaming; ///< True if this link can be reused for streaming msgs bool in_streaming_pool; ///< True if this link is in the connections standby pool STREAMING_POOL bool terminus_survives_disconnect; + bool no_route; ///< True if this link is to not receive routed deliveries char *strip_prefix; char *insert_prefix; diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c index add176a..6ca2b2b 100644 --- a/src/router_core/transfer.c +++ b/src/router_core/transfer.c @@ -151,7 +151,8 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) dlv = DEQ_HEAD(link->undelivered); if (dlv) { qdr_delivery_incref(dlv, "qdr_link_process_deliveries - holding the undelivered delivery locally"); - uint64_t new_disp = 0; + uint64_t new_disp = 0; + bool to_new_link = false; ///< Delivery got moved to a new link by the handler // DISPATCH-1302 race hack fix: There is a race between the CORE thread // and the outbound (this) thread over settlement. It occurs when the CORE @@ -165,9 +166,13 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) sys_mutex_unlock(conn->work_lock); new_disp = conn->protocol_adaptor->deliver_handler(conn->protocol_adaptor->user_context, link, dlv, settled); sys_mutex_lock(conn->work_lock); - } while (settled != dlv->settled); // oops missed the settlement + if (safe_deref_qdr_link_t(dlv->link_sp) != link) { + to_new_link = true; + break; + } + } while (settled != dlv->settled && !to_new_link); // oops missed the settlement send_complete = qdr_delivery_send_complete(dlv); - if (send_complete) { + if (send_complete || to_new_link) { // // The entire message has been sent. It is now the appropriate time to have the delivery removed // from the head of the undelivered list and move it to the unsettled list if it is not settled. @@ -178,26 +183,28 @@ int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit) link->credit_to_core--; link->total_deliveries++; - // DISPATCH-1153: - // If the undelivered list is cleared the link may have detached. Stop processing. - offer = DEQ_SIZE(link->undelivered); - if (offer == 0) { - qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - closed link"); - sys_mutex_unlock(conn->work_lock); - return num_deliveries_completed; - } + if (!to_new_link) { + // DISPATCH-1153: + // If the undelivered list is cleared the link may have detached. Stop processing. + offer = DEQ_SIZE(link->undelivered); + if (offer == 0) { + qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - release local reference - closed link"); + sys_mutex_unlock(conn->work_lock); + return num_deliveries_completed; + } - assert(dlv == DEQ_HEAD(link->undelivered)); - DEQ_REMOVE_HEAD(link->undelivered); - dlv->link_work = 0; - - if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) { - dlv->where = QDR_DELIVERY_NOWHERE; - qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - remove from undelivered list"); - } else { - DEQ_INSERT_TAIL(link->unsettled, dlv); - dlv->where = QDR_DELIVERY_IN_UNSETTLED; - qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer: dlv:%lx qdr_link_process_deliveries: undelivered-list -> unsettled-list", (long) dlv); + assert(dlv == DEQ_HEAD(link->undelivered)); + DEQ_REMOVE_HEAD(link->undelivered); + dlv->link_work = 0; + + if (settled || qdr_delivery_oversize(dlv) || qdr_delivery_is_aborted(dlv)) { + dlv->where = QDR_DELIVERY_NOWHERE; + qdr_delivery_decref(core, dlv, "qdr_link_process_deliveries - remove from undelivered list"); + } else { + DEQ_INSERT_TAIL(link->unsettled, dlv); + dlv->where = QDR_DELIVERY_IN_UNSETTLED; + qd_log(core->log, QD_LOG_DEBUG, "Delivery transfer: dlv:%lx qdr_link_process_deliveries: undelivered-list -> unsettled-list", (long) dlv); + } } } else { diff --git a/src/router_node.c b/src/router_node.c index bd60542..993a235 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -827,6 +827,8 @@ static int AMQP_incoming_link_handler(void* context, qd_link_t *link) qdr_terminus(qd_link_remote_target(link)), pn_link_name(qd_link_pn(link)), terminus_addr, + false, + 0, &link_id); qd_link_set_link_id(link, link_id); qdr_link_set_context(qdr_link, link); @@ -856,6 +858,8 @@ static int AMQP_outgoing_link_handler(void* context, qd_link_t *link) qdr_terminus(qd_link_remote_target(link)), pn_link_name(qd_link_pn(link)), terminus_addr, + false, + 0, &link_id); qd_link_set_link_id(link, link_id); qdr_link_set_context(qdr_link, link); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org