Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 7b1c3d3c8 -> 6406f040b
DISPATCH-174 - Fix lifecycle management of pn_link_t and pn_session_t. Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/6406f040 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/6406f040 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/6406f040 Branch: refs/heads/master Commit: 6406f040b5f121714eb93a3d6fbf90f2ff3d8a07 Parents: 7b1c3d3 Author: Ted Ross <[email protected]> Authored: Tue Sep 29 11:12:03 2015 -0400 Committer: Ted Ross <[email protected]> Committed: Tue Sep 29 11:12:03 2015 -0400 ---------------------------------------------------------------------- src/container.c | 54 +++++++++++++++++++++++++------------------------- src/router_node.c | 9 +++++---- 2 files changed, 32 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6406f040/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index 640c186..2be163d 100644 --- a/src/container.c +++ b/src/container.c @@ -106,7 +106,6 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) pn_condition_set_name(cond, "amqp:not-found"); pn_condition_set_description(cond, "Source node does not exist"); pn_link_close(pn_link); - pn_link_free(pn_link); return; } } @@ -117,7 +116,6 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) pn_condition_set_name(cond, "amqp:internal-error"); pn_condition_set_description(cond, "Insufficient memory"); pn_link_close(pn_link); - pn_link_free(pn_link); return; } @@ -129,6 +127,12 @@ static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) link->drain_mode = pn_link_get_drain(pn_link); link->close_sess_with_link = false; + // + // Keep the borrowed references + // + pn_incref(pn_link); + 
pn_incref(link->pn_sess); + pn_link_set_context(pn_link, link); node->ntype->outgoing_handler(node->context, link); } @@ -156,7 +160,6 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) pn_condition_set_name(cond, "amqp:not-found"); pn_condition_set_description(cond, "Target node does not exist"); pn_link_close(pn_link); - pn_link_free(pn_link); return; } } @@ -167,7 +170,6 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) pn_condition_set_name(cond, "amqp:internal-error"); pn_condition_set_description(cond, "Insufficient memory"); pn_link_close(pn_link); - pn_link_free(pn_link); return; } @@ -179,6 +181,12 @@ static void setup_incoming_link(qd_container_t *container, pn_link_t *pn_link) link->drain_mode = pn_link_get_drain(pn_link); link->close_sess_with_link = false; + // + // Keep the borrowed references + // + pn_incref(pn_link); + pn_incref(link->pn_sess); + pn_link_set_context(pn_link, link); node->ntype->incoming_handler(node->context, link); } @@ -253,7 +261,6 @@ static int close_handler(void* unused, pn_connection_t *conn, qd_connection_t* q // being properly 'detached'. They are being orphaned. 
// pn_link_t *pn_link = pn_link_head(conn, PN_LOCAL_ACTIVE); - pn_link_t *link_to_free; while (pn_link) { qd_link_t *link = (qd_link_t*) pn_link_get_context(pn_link); if (link) { @@ -263,19 +270,14 @@ static int close_handler(void* unused, pn_connection_t *conn, qd_connection_t* q } } pn_link_close(pn_link); - link_to_free = pn_link; pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE); - pn_link_free(link_to_free); } // teardown all sessions pn_session_t *ssn = pn_session_head(conn, 0); - pn_session_t *ssn_to_free; while (ssn) { pn_session_close(ssn); - ssn_to_free = ssn; ssn = pn_session_next(ssn, 0); - pn_session_free(ssn_to_free); } // teardown the connection @@ -359,7 +361,6 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even pn_link = pn_link_next(pn_link, PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE); } pn_session_close(ssn); - pn_session_free(ssn); } break; @@ -388,18 +389,11 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even // If the qd_link does not reference the pn_link, we have already freed the pn_link. // If we attempt to free it again, proton will crash. 
// - if (qd_link->pn_link == pn_link) { + if (qd_link->pn_link == pn_link) pn_link_close(pn_link); - pn_link_free(pn_link); - } } break; - case PN_LINK_FINAL : - pn_link = pn_event_link(event); - qd_link = (qd_link_t*) pn_link_get_context(pn_link); - break; - case PN_LINK_FLOW : pn_link = pn_event_link(event); qd_link = (qd_link_t*) pn_link_get_context(pn_link); @@ -437,6 +431,7 @@ int pn_event_handler(void *handler_context, void *conn_context, pn_event_t *even case PN_LINK_LOCAL_OPEN : case PN_LINK_LOCAL_CLOSE : case PN_LINK_LOCAL_DETACH : + case PN_LINK_FINAL : case PN_TRANSPORT : case PN_TRANSPORT_ERROR : case PN_TRANSPORT_HEAD_CLOSED : @@ -694,6 +689,12 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c link->drain_mode = pn_link_get_drain(link->pn_link); link->close_sess_with_link = true; + // + // Keep the borrowed references + // + pn_incref(link->pn_link); + pn_incref(link->pn_sess); + pn_link_set_context(link->pn_link, link); pn_session_open(link->pn_sess); @@ -705,6 +706,10 @@ qd_link_t *qd_link(qd_node_t *node, qd_connection_t *conn, qd_direction_t dir, c void qd_link_free_LH(qd_link_t *link) { if (!link) return; + if (link->pn_link) pn_decref(link->pn_link); + if (link->pn_sess) pn_decref(link->pn_sess); + link->pn_link = 0; + link->pn_sess = 0; free_qd_link_t(link); } @@ -786,7 +791,7 @@ qd_connection_t *qd_link_connection(qd_link_t *link) return 0; qd_connection_t *ctx = pn_connection_get_context(conn); - if (!ctx) + if (!ctx || !ctx->opened || ctx->closed) return 0; return ctx; @@ -840,15 +845,10 @@ void qd_link_activate(qd_link_t *link) void qd_link_close(qd_link_t *link) { - if (link->pn_link) { + if (link->pn_link) pn_link_close(link->pn_link); - pn_link_free(link->pn_link); - link->pn_link = 0; - } - if (link->close_sess_with_link && link->pn_sess) { + if (link->close_sess_with_link && link->pn_sess) pn_session_close(link->pn_sess); - pn_session_free(link->pn_sess); - } } 
http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/6406f040/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index f51d779..361cd44 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -441,12 +441,13 @@ static int router_writable_link_handler(void* context, qd_link_t *link) qd_routed_event_t *re; size_t offer; int event_count = 0; - bool drain_mode; - bool drain_changed = qd_link_drain_changed(link, &drain_mode); if (!rlink) return 0; + bool drain_mode; + bool drain_changed = qd_link_drain_changed(link, &drain_mode); + DEQ_INIT(to_send); DEQ_INIT(events); @@ -1567,8 +1568,8 @@ static int router_link_detach_handler(void* context, qd_link_t *link, qd_detach_ memset(ld, 0, sizeof(link_detach_t)); ld->router = router; ld->rlink = rlink->connected_link; - pn_condition_t *cond = pn_link_remote_condition(qd_link_pn(link)); - if (pn_condition_is_set(cond)) { + pn_condition_t *cond = qd_link_pn(link) ? pn_link_remote_condition(qd_link_pn(link)) : 0; + if (cond && pn_condition_is_set(cond)) { if (pn_condition_get_name(cond)) { strncpy(ld->condition_name, pn_condition_get_name(cond), COND_NAME_LEN); ld->condition_name[COND_NAME_LEN] = '\0'; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
