Repository: qpid-dispatch
Updated Branches:
  refs/heads/master 0f092ac43 -> 936c46b19
DISPATCH-1069 - (mostly from gmurthy) - Re-established the deferred deletion of proton links and sessions to prevent double-frees and memory growth. This closes #344 Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/936c46b1 Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/936c46b1 Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/936c46b1 Branch: refs/heads/master Commit: 936c46b193c07a7fcfc89e71c490ad092e7cffa5 Parents: 0f092ac Author: Ted Ross <[email protected]> Authored: Thu Jul 19 15:33:30 2018 -0400 Committer: Ted Ross <[email protected]> Committed: Thu Jul 19 15:33:30 2018 -0400 ---------------------------------------------------------------------- src/container.c | 98 +++++++++++++++++++++++++++++++++++++++-------- src/router_node.c | 4 ++ src/server.c | 41 +++++++++++++++----- src/server_private.h | 58 ++++++++++++++-------------- 4 files changed, 149 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/container.c ---------------------------------------------------------------------- diff --git a/src/container.c b/src/container.c index 52231b4..d4fbe51 100644 --- a/src/container.c +++ b/src/container.c @@ -88,6 +88,8 @@ struct qd_container_t { qd_link_list_t links; }; +ALLOC_DEFINE(qd_pn_free_link_session_t); + static void setup_outgoing_link(qd_container_t *container, pn_link_t *pn_link) { qd_node_t *node = container->default_node; @@ -332,18 +334,86 @@ static void writable_handler(qd_container_t *container, pn_connection_t *conn, q } -void qd_container_handle_event(qd_container_t *container, pn_event_t *event) +/** + * Returns true if the free_link already exists in free_link_list, false otherwise + */ +static bool link_exists(qd_pn_free_link_session_list_t *free_list, pn_link_t *free_link) { - pn_connection_t *conn = 
pn_event_connection(event); + qd_pn_free_link_session_t *free_item = DEQ_HEAD(*free_list); + while(free_item) { + if (free_item->pn_link == free_link) + return true; + free_item = DEQ_NEXT(free_item); + } + return false; +} - if (!conn) - return; +/** + * Returns true if the free_session already exists in free_session_list, false otherwise +*/ +static bool session_exists(qd_pn_free_link_session_list_t *free_list, pn_session_t *free_session) +{ + qd_pn_free_link_session_t *free_item = DEQ_HEAD(*free_list); + while(free_item) { + if (free_item->pn_session == free_session) + return true; + free_item = DEQ_NEXT(free_item); + } + return false; +} + +static void add_session_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_session_t *ssn) +{ + if (!session_exists(free_link_session_list, ssn)) { + qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t(); + DEQ_ITEM_INIT(to_free); + to_free->pn_session = ssn; + to_free->pn_link = 0; + DEQ_INSERT_TAIL(*free_link_session_list, to_free); + } +} - qd_connection_t *qd_conn = pn_connection_get_context(conn); - pn_session_t *ssn = NULL; - pn_link_t *pn_link = NULL; - qd_link_t *qd_link = NULL; - pn_delivery_t *delivery = NULL; +static void add_link_to_free_list(qd_pn_free_link_session_list_t *free_link_session_list, pn_link_t *pn_link) +{ + if (!link_exists(free_link_session_list, pn_link)) { + qd_pn_free_link_session_t *to_free = new_qd_pn_free_link_session_t(); + DEQ_ITEM_INIT(to_free); + to_free->pn_link = pn_link; + to_free->pn_session = 0; + DEQ_INSERT_TAIL(*free_link_session_list, to_free); + } + +} + + +/* + * The need for these lists may indicate a router bug, where the router is + * using links/sessions after they are freed. Investigate and simplify if + * possible. 
+*/ +void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t *qd_conn) +{ + qd_pn_free_link_session_t *to_free = DEQ_HEAD(qd_conn->free_link_session_list); + + while(to_free) { + if (to_free->pn_link) + pn_link_free(to_free->pn_link); + if (to_free->pn_session) + pn_session_free(to_free->pn_session); + DEQ_REMOVE_HEAD(qd_conn->free_link_session_list); + free_qd_pn_free_link_session_t(to_free); + to_free = DEQ_HEAD(qd_conn->free_link_session_list); + } +} + + +void qd_container_handle_event(qd_container_t *container, pn_event_t *event, + pn_connection_t *conn, qd_connection_t *qd_conn) +{ + pn_session_t *ssn = NULL; + pn_link_t *pn_link = NULL; + qd_link_t *qd_link = NULL; + pn_delivery_t *delivery = NULL; switch (pn_event_type(event)) { @@ -406,7 +476,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event) } if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - pn_session_free(ssn); + add_session_to_free_list(&qd_conn->free_link_session_list, ssn); } break; @@ -458,7 +528,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event) pn_session_close(ssn); } else if (pn_session_state(ssn) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - pn_session_free(ssn); + add_session_to_free_list(&qd_conn->free_link_session_list, ssn); } } break; @@ -517,8 +587,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event) } if (pn_link_state(pn_link) & PN_LOCAL_CLOSED) { - pn_link_set_context(pn_link, NULL); - pn_link_free(pn_link); + add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); } if (node) { node->ntype->link_detach_handler(node->context, qd_link, dt); @@ -531,8 +600,7 @@ void qd_container_handle_event(qd_container_t *container, pn_event_t *event) case PN_LINK_LOCAL_CLOSE: pn_link = pn_event_link(event); if (pn_link_state(pn_link) == (PN_LOCAL_CLOSED | PN_REMOTE_CLOSED)) { - pn_link_set_context(pn_link, NULL); - pn_link_free(pn_link); + 
add_link_to_free_list(&qd_conn->free_link_session_list, pn_link); } break; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/router_node.c ---------------------------------------------------------------------- diff --git a/src/router_node.c b/src/router_node.c index 71a545d..585dd6f 100644 --- a/src/router_node.c +++ b/src/router_node.c @@ -295,6 +295,10 @@ static void AMQP_rx_handler(void* context, qd_link_t *link) qd_router_t *router = (qd_router_t*) context; pn_link_t *pn_link = qd_link_pn(link); assert(pn_link); + + if (!pn_link) + return; + pn_delivery_t *pnd = pn_link_current(pn_link); if (!pnd) return; http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/server.c ---------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index b698fab..8fc9511 100644 --- a/src/server.c +++ b/src/server.c @@ -522,6 +522,7 @@ qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t *c pn_connection_set_context(ctx->pn_conn, ctx); DEQ_ITEM_INIT(ctx); DEQ_INIT(ctx->deferred_calls); + DEQ_INIT(ctx->free_link_session_list); sys_mutex_lock(server->lock); ctx->connection_id = server->next_connection_id++; DEQ_INSERT_TAIL(server->conn_list, ctx); @@ -692,7 +693,8 @@ static void invoke_deferred_calls(qd_connection_t *conn, bool discard) sys_mutex_unlock(conn->deferred_call_lock); } -void qd_container_handle_event(qd_container_t *container, pn_event_t *event); +void qd_container_handle_event(qd_container_t *container, pn_event_t *event, pn_connection_t *pn_conn, qd_connection_t *qd_conn); +void qd_conn_event_batch_complete(qd_container_t *container, qd_connection_t *qd_conn); static void handle_listener(pn_event_t *e, qd_server_t *qd_server) { qd_log_source_t *log = qd_server->log_source; @@ -856,13 +858,12 @@ static void qd_increment_conn_index(qd_connection_t *ctx) /* Events involving a connection or listener are serialized by the proactor so * only one event per 
connection / listener will be processed at a time. */ -static bool handle(qd_server_t *qd_server, pn_event_t *e) { - pn_connection_t *pn_conn = pn_event_connection(e); +static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_conn, qd_connection_t *ctx) +{ if (pn_conn && qdr_is_authentication_service_connection(pn_conn)) { qdr_handle_authentication_service_connection_event(e); return true; } - qd_connection_t *ctx = pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : NULL; switch (pn_event_type(e)) { @@ -943,8 +944,8 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e) { break; } // Switch event type - /* TODO aconway 2017-04-18: fold the container handler into the server */ - qd_container_handle_event(qd_server->container, e); + if (ctx) + qd_container_handle_event(qd_server->container, e, pn_conn, ctx); /* Free the connection after all other processing is complete */ if (ctx && pn_event_type(e) == PN_TRANSPORT_CLOSED) { @@ -961,9 +962,29 @@ static void *thread_run(void *arg) while (running) { pn_event_batch_t *events = pn_proactor_wait(qd_server->proactor); pn_event_t * e; + qd_connection_t *qd_conn = 0; + pn_connection_t *pn_conn = 0; + while (running && (e = pn_event_batch_next(events))) { - running = handle(qd_server, e); + pn_connection_t *conn = pn_event_connection(e); + + if (!pn_conn) + pn_conn = conn; + assert(pn_conn == conn); + + if (!qd_conn) + qd_conn = !!pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : 0; + + running = handle(qd_server, e, conn, qd_conn); } + + // + // Notify the container that the batch is complete so it can do after-batch + // processing. 
+ // + if (qd_conn) + qd_conn_event_batch_complete(qd_server->container, qd_conn); + pn_proactor_done(qd_server->proactor, events); } return NULL; @@ -1200,7 +1221,7 @@ void qd_server_free(qd_server_t *qd_server) void qd_server_set_container(qd_dispatch_t *qd, qd_container_t *container) { - qd->server->container = container; + qd->server->container = container; } @@ -1473,7 +1494,9 @@ const char* qd_connection_remote_ip(const qd_connection_t *c) { /* Expose event handling for HTTP connections */ void qd_connection_handle(qd_connection_t *c, pn_event_t *e) { - handle(c->server, e); + pn_connection_t *pn_conn = pn_event_connection(e); + qd_connection_t *qd_conn = !!pn_conn ? (qd_connection_t*) pn_connection_get_context(pn_conn) : 0; + handle(c->server, e, pn_conn, qd_conn); } bool qd_connection_strip_annotations_in(const qd_connection_t *c) { http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/936c46b1/src/server_private.h ---------------------------------------------------------------------- diff --git a/src/server_private.h b/src/server_private.h index d80a1fa..25cc901 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -133,34 +133,35 @@ DEQ_DECLARE(qd_connector_t, qd_connector_list_t); */ struct qd_connection_t { DEQ_LINKS(qd_connection_t); - char *name; - qd_server_t *server; - bool opened; // An open callback was invoked for this connection - bool closed; - int enqueued; - qd_timer_t *timer; // Timer for initial-setup - pn_connection_t *pn_conn; - pn_session_t *pn_sess; - pn_ssl_t *ssl; - qd_listener_t *listener; - qd_connector_t *connector; - void *context; // context from listener or connector - void *user_context; - void *link_context; // Context shared by this connection's links - uint64_t connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky. - const char *user_id; // A unique identifier for the user on the connection. 
This is currently populated from the client ssl cert. See ssl_uid_format in server.h for more info - bool free_user_id; - qd_policy_settings_t *policy_settings; - int n_sessions; - int n_senders; - int n_receivers; - void *open_container; - qd_deferred_call_list_t deferred_calls; - sys_mutex_t *deferred_call_lock; - bool policy_counted; - char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc. - bool strip_annotations_in; - bool strip_annotations_out; + char *name; + qd_server_t *server; + bool opened; // An open callback was invoked for this connection + bool closed; + int enqueued; + qd_timer_t *timer; // Timer for initial-setup + pn_connection_t *pn_conn; + pn_session_t *pn_sess; + pn_ssl_t *ssl; + qd_listener_t *listener; + qd_connector_t *connector; + void *context; // context from listener or connector + void *user_context; + void *link_context; // Context shared by this connection's links + uint64_t connection_id; // A unique identifier for the qd_connection_t. The underlying pn_connection already has one but it is long and clunky. + const char *user_id; // A unique identifier for the user on the connection. This is currently populated from the client ssl cert. See ssl_uid_format in server.h for more info + bool free_user_id; + qd_policy_settings_t *policy_settings; + int n_sessions; + int n_senders; + int n_receivers; + void *open_container; + qd_deferred_call_list_t deferred_calls; + sys_mutex_t *deferred_call_lock; + bool policy_counted; + char *role; //The specified role of the connection, e.g. "normal", "inter-router", "route-container" etc. + qd_pn_free_link_session_list_t free_link_session_list; + bool strip_annotations_in; + bool strip_annotations_out; void (*wake)(qd_connection_t*); /* Wake method, different for HTTP vs. 
proactor */ char rhost[NI_MAXHOST]; /* Remote host numeric IP for incoming connections */ char rhost_port[NI_MAXHOST+NI_MAXSERV]; /* Remote host:port for incoming connections */ @@ -172,5 +173,6 @@ ALLOC_DECLARE(qd_listener_t); ALLOC_DECLARE(qd_deferred_call_t); ALLOC_DECLARE(qd_connector_t); ALLOC_DECLARE(qd_connection_t); +ALLOC_DECLARE(qd_pn_free_link_session_t); #endif --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
