This is an automated email from the ASF dual-hosted git repository.
gmurthy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push:
new 644935d DISPATCH-1276 - Added a cleanup function
qdrc_address_endpoint_cleanup which will clean out the link's edge context in
addition to cleaning out other state. This will prevent the crash outlined in
the issue
644935d is described below
commit 644935decfeb90af1fd0315437fd2256a0ba576c
Author: Ganesh Murthy <[email protected]>
AuthorDate: Thu Mar 7 09:30:32 2019 -0500
DISPATCH-1276 - Added a cleanup function qdrc_address_endpoint_cleanup
which will clean out the link's edge context in addition to cleaning out other
state. This will prevent the crash outlined in the issue
---
.../edge_addr_tracking/edge_addr_tracking.c | 89 +++++++++++++++-------
src/router_core/modules/edge_router/addr_proxy.c | 2 +-
2 files changed, 61 insertions(+), 30 deletions(-)
diff --git a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
index e161963..b568652 100644
--- a/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
+++ b/src/router_core/modules/edge_addr_tracking/edge_addr_tracking.c
@@ -31,7 +31,8 @@ struct qdr_addr_endpoint_state_t {
qdrc_endpoint_t *endpoint;
qdr_connection_t *conn; // The connection associated
with the endpoint.
qdr_addr_tracking_module_context_t *mc;
- qdr_link_t *link;
+ int ref_count;
+ bool closed; // Is the endpoint that this
state belong to closed?
};
DEQ_DECLARE(qdr_addr_endpoint_state_t, qdr_addr_endpoint_state_list_t);
@@ -77,7 +78,7 @@ static qd_message_t *qdcm_edge_create_address_dlv(qdr_core_t
*core, qdr_address_
return msg;
}
-static qdr_addr_endpoint_state_t
*qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t
endpoint_state_list, qdr_connection_t *conn, qdr_link_t *link)
+static qdr_addr_endpoint_state_t
*qdrc_get_endpoint_state_for_connection(qdr_addr_endpoint_state_list_t
endpoint_state_list, qdr_connection_t *conn)
{
qdr_addr_endpoint_state_t *endpoint_state = DEQ_HEAD(endpoint_state_list);
while(endpoint_state) {
@@ -101,12 +102,12 @@ static void qdrc_address_endpoint_first_attach(void
*bind_context,
qdr_addr_endpoint_state_t *endpoint_state =
new_qdr_addr_endpoint_state_t();
ZERO(endpoint_state);
- endpoint_state->endpoint = endpoint;
- endpoint_state->mc = bc;
- endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint);
+ endpoint_state->endpoint = endpoint;
+ endpoint_state->mc = bc;
+ endpoint_state->conn = qdrc_endpoint_get_connection_CT(endpoint);
- DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
+ DEQ_INSERT_TAIL(bc->endpoint_state_list, endpoint_state);
//
// The link to hard coded address QD_TERMINUS_EDGE_ADDRESS_TRACKING should
be created only if this is a receiver link
@@ -133,18 +134,33 @@ static void qdrc_address_endpoint_on_first_detach(void
*link_context,
{
qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)link_context;
qdrc_endpoint_detach_CT(endpoint_state->mc->core,
endpoint_state->endpoint, 0);
- qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
- DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
- endpoint_state->conn = 0;
- endpoint_state->endpoint = 0;
- if (endpoint_state->link) {
- endpoint_state->link->edge_context = 0;
- endpoint_state->link = 0;
- }
- free_qdr_addr_endpoint_state_t(endpoint_state);
qdr_error_free(error);
}
+static void qdrc_address_endpoint_cleanup(void *link_context)
+{
+ qdr_addr_endpoint_state_t *endpoint_state = (qdr_addr_endpoint_state_t
*)link_context;
+ if (endpoint_state) {
+ qdr_addr_tracking_module_context_t *mc = endpoint_state->mc;
+ assert (endpoint_state->conn);
+ endpoint_state->closed = true;
+ if (endpoint_state->ref_count == 0) {
+
+ //
+ // The endpoint has been closed and no other links are referencing
this endpoint. Time to free it.
+ // Clean out all the states held by the link_context
(endpoint_state)
+ //
+ if (mc) {
+ DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+ }
+
+ endpoint_state->conn = 0;
+ endpoint_state->endpoint = 0;
+ free_qdr_addr_endpoint_state_t(endpoint_state);
+ }
+ }
+}
+
static bool qdrc_can_send_address(qdr_address_t *addr, qdr_connection_t *conn)
{
@@ -201,7 +217,7 @@ static void on_addr_event(void *context, qdrc_event_t
event, qdr_address_t *addr
while (inlink) {
if(inlink->link->edge_context != 0) {
qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
- if (qdrc_can_send_address(addr, endpoint_state->conn)
) {
+ if (!endpoint_state->closed &&
qdrc_can_send_address(addr, endpoint_state->conn) ) {
qdrc_endpoint_t *endpoint =
endpoint_state->endpoint;
qdrc_send_message(addr_tracking->core, addr,
endpoint, true);
}
@@ -222,7 +238,7 @@ static void on_addr_event(void *context, qdrc_event_t
event, qdr_address_t *addr
while (inlink) {
if(inlink->link->edge_context != 0) {
qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
- if (qdrc_can_send_address(addr, endpoint_state->conn) ) {
+ if (!endpoint_state->closed && qdrc_can_send_address(addr,
endpoint_state->conn) ) {
qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
if (endpoint)
qdrc_send_message(addr_tracking->core, addr,
endpoint, true);
@@ -244,9 +260,11 @@ static void on_addr_event(void *context, qdrc_event_t
event, qdr_address_t *addr
while (inlink) {
if(inlink->link->edge_context != 0) {
qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
- qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
- if (endpoint)
- qdrc_send_message(addr_tracking->core, addr,
endpoint, false);
+ if(!endpoint_state->closed) {
+ qdrc_endpoint_t *endpoint =
endpoint_state->endpoint;
+ if (endpoint)
+ qdrc_send_message(addr_tracking->core, addr,
endpoint, false);
+ }
}
inlink = DEQ_NEXT(inlink);
}
@@ -268,10 +286,10 @@ static void on_addr_event(void *context, qdrc_event_t
event, qdr_address_t *addr
qdr_link_ref_t *inlink = DEQ_HEAD(addr->inlinks);
while (inlink) {
- if(inlink->link->edge_context != 0) {
+ if (inlink->link->edge_context != 0) {
qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
- if (endpoint_state->conn == link->conn) {
+ if (endpoint_state->conn == link->conn &&
!endpoint_state->closed) {
qdrc_send_message(addr_tracking->core, addr, endpoint,
false);
break;
}
@@ -292,7 +310,7 @@ static void on_addr_event(void *context, qdrc_event_t
event, qdr_address_t *addr
if(inlink->link->edge_context != 0) {
qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)inlink->link->edge_context;
qdrc_endpoint_t *endpoint = endpoint_state->endpoint;
- if (link->conn == endpoint_state->conn) {
+ if (link->conn == endpoint_state->conn &&
!endpoint_state->closed) {
qdrc_send_message(addr_tracking->core, addr, endpoint,
true);
break;
}
@@ -314,11 +332,12 @@ static void on_link_event(void *context, qdrc_event_t
event, qdr_link_t *link)
{
qdr_addr_tracking_module_context_t *mc =
(qdr_addr_tracking_module_context_t *) context;
qdr_address_t *addr = link->owning_addr;
- if (addr && qdr_address_is_mobile_CT(addr)) {
- qdr_addr_endpoint_state_t *endpoint_state =
qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list, link->conn,
link);
+ if (addr && qdr_address_is_mobile_CT(addr) &&
DEQ_SIZE(addr->subscriptions) == 0 && link->link_direction == QD_INCOMING) {
+ qdr_addr_endpoint_state_t *endpoint_state =
qdrc_get_endpoint_state_for_connection(mc->endpoint_state_list, link->conn);
+ assert(endpoint_state);
+ assert(link->edge_context == 0);
link->edge_context = endpoint_state;
- endpoint_state->link = link;
-
+ endpoint_state->ref_count++;
if (qdrc_can_send_address(addr, link->conn) && endpoint_state)
{
qdrc_send_message(mc->core, addr,
endpoint_state->endpoint, true);
}
@@ -329,10 +348,21 @@ static void on_link_event(void *context, qdrc_event_t
event, qdr_link_t *link)
{
if (link->edge_context) {
qdr_addr_endpoint_state_t *endpoint_state =
(qdr_addr_endpoint_state_t *)link->edge_context;
- endpoint_state->link = 0;
+ endpoint_state->ref_count--;
link->edge_context = 0;
+ //
+ // The endpoint has been closed and no other links are
referencing this endpoint. Time to free it.
+ //
+ if (endpoint_state->ref_count == 0 && endpoint_state->closed) {
+ qdr_addr_tracking_module_context_t *mc =
endpoint_state->mc;
+ if (mc) {
+ DEQ_REMOVE(mc->endpoint_state_list, endpoint_state);
+ }
+ endpoint_state->conn = 0;
+ endpoint_state->endpoint = 0;
+ free_qdr_addr_endpoint_state_t(endpoint_state);
+ }
}
-
break;
}
@@ -361,6 +391,7 @@ static void qdrc_edge_address_tracking_init_CT(qdr_core_t
*core, void **module_c
context->addr_tracking_endpoint.label =
"qdrc_edge_address_tracking_module_init_CT";
context->addr_tracking_endpoint.on_first_attach =
qdrc_address_endpoint_first_attach;
context->addr_tracking_endpoint.on_first_detach =
qdrc_address_endpoint_on_first_detach;
+ context->addr_tracking_endpoint.on_cleanup =
qdrc_address_endpoint_cleanup;
qdrc_endpoint_bind_mobile_address_CT(core,
QD_TERMINUS_EDGE_ADDRESS_TRACKING, '0', &context->addr_tracking_endpoint,
context);
//
diff --git a/src/router_core/modules/edge_router/addr_proxy.c
b/src/router_core/modules/edge_router/addr_proxy.c
index c73dfc0..ed593fd 100644
--- a/src/router_core/modules/edge_router/addr_proxy.c
+++ b/src/router_core/modules/edge_router/addr_proxy.c
@@ -160,7 +160,7 @@ static void del_inlink(qcm_edge_addr_proxy_t *ap,
qdr_address_t *addr)
static void add_outlink(qcm_edge_addr_proxy_t *ap, const char *key,
qdr_address_t *addr)
{
- if (addr->edge_outlink == 0) {
+ if (addr->edge_outlink == 0 && DEQ_SIZE(addr->subscriptions) == 0) {
//
// Note that this link must not be bound to the address at this time.
That will
// happen later when the interior tells us that there are upstream
destinations
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]