This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
The following commit(s) were added to refs/heads/master by this push: new f126d2d DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure. Added reproducer. f126d2d is described below commit f126d2da2df972353a2c53594aeb6fc598697796 Author: Ted Ross <tr...@redhat.com> AuthorDate: Mon Dec 17 16:19:52 2018 -0500 DISPATCH-1214 - Fixed race between asynchronous address lookups and connection/link closure. Added reproducer. --- include/qpid/dispatch/alloc_malloc.h | 3 ++ include/qpid/dispatch/alloc_pool.h | 1 + src/alloc_pool.c | 13 +++++++ .../modules/address_lookup_client/lookup_client.c | 17 +++++++++ tests/system_tests_link_route_credit.py | 41 ++++++++++++++++++++++ 5 files changed, 75 insertions(+) diff --git a/include/qpid/dispatch/alloc_malloc.h b/include/qpid/dispatch/alloc_malloc.h index 72c390c..3ba6cf3 100644 --- a/include/qpid/dispatch/alloc_malloc.h +++ b/include/qpid/dispatch/alloc_malloc.h @@ -19,6 +19,8 @@ * under the License. */ +#include <stdint.h> + /** *@file * @@ -37,6 +39,7 @@ #define ALLOC_DEFINE(T) ALLOC_DEFINE_CONFIG(T, sizeof(T), 0, 0) +static inline uint32_t qd_alloc_sequence(void *p) { return 0; } static inline void qd_alloc_initialize(void) {} static inline void qd_alloc_debug_dump(const char *file) {} static inline void qd_alloc_finalize(void) {} diff --git a/include/qpid/dispatch/alloc_pool.h b/include/qpid/dispatch/alloc_pool.h index 1d5decc..6b66e88 100644 --- a/include/qpid/dispatch/alloc_pool.h +++ b/include/qpid/dispatch/alloc_pool.h @@ -73,6 +73,7 @@ typedef struct { void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool); /** De-allocate from a thread pool. Use via ALLOC_DECLARE */ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p); +uint32_t qd_alloc_sequence(void *p); /** * Declare functions new_T and alloc_T diff --git a/src/alloc_pool.c b/src/alloc_pool.c index d4613b6..170fd57 100644 --- a/src/alloc_pool.c +++ b/src/alloc_pool.c @@ -49,6 +49,7 @@ DEQ_DECLARE(qd_alloc_type_t, qd_alloc_type_list_t); struct qd_alloc_item_t { DEQ_LINKS(qd_alloc_item_t); + uint32_t sequence; #ifdef QD_MEMORY_DEBUG qd_alloc_type_desc_t *desc; uint32_t header; @@ -185,6 +186,7 @@ void *qd_alloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool) break; DEQ_ITEM_INIT(item); DEQ_INSERT_TAIL(pool->free_list, item); + item->sequence = 0; #if QD_MEMORY_STATS desc->stats->held_by_threads++; desc->stats->total_alloc_from_heap++; @@ -239,6 +241,7 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p) qd_alloc_pool_t *pool = *tpool; + item->sequence++; DEQ_INSERT_TAIL(pool->free_list, item); if (DEQ_SIZE(pool->free_list) <= desc->config->local_free_list_max) @@ -278,6 +281,16 @@ void qd_dealloc(qd_alloc_type_desc_t *desc, qd_alloc_pool_t **tpool, char *p) } +uint32_t qd_alloc_sequence(void *p) +{ + if (!p) + return 0; + + qd_alloc_item_t *item = ((qd_alloc_item_t*) p) - 1; + return item->sequence; +} + + void qd_alloc_initialize(void) { init_lock = sys_mutex(); diff --git a/src/router_core/modules/address_lookup_client/lookup_client.c b/src/router_core/modules/address_lookup_client/lookup_client.c index 039387b..048a399 100644 --- a/src/router_core/modules/address_lookup_client/lookup_client.c +++ b/src/router_core/modules/address_lookup_client/lookup_client.c @@ -45,6 +45,8 @@ typedef struct qcm_addr_lookup_request_t { DEQ_LINKS(struct qcm_addr_lookup_request_t); qdr_connection_t *conn; qdr_link_t *link; + uint32_t conn_sequence; + uint32_t link_sequence; qd_direction_t dir; qdr_terminus_t *source; qdr_terminus_t *target; @@ -520,6 +522,9 @@ static void qcm_addr_lookup_CT(void *context, request->source = source; request->target = target; + request->conn_sequence = qd_alloc_sequence(conn); + request->link_sequence = qd_alloc_sequence(link); + DEQ_INSERT_TAIL(client->pending_requests, request); qcm_addr_lookup_process_pending_requests_CT(client); return; @@ -601,6 +606,18 @@ static uint64_t on_reply(qdr_core_t *core, bool is_link_route; bool has_destinations; + // + // If the pointer sequences mismatch for either the connection or link, + // exit without processing because either the connection or link has + // been freed while the request was in-flight. + // + if (request->conn_sequence != qd_alloc_sequence(request->conn) || + request->link_sequence != qd_alloc_sequence(request->link)) { + qdr_terminus_free(request->source); + qdr_terminus_free(request->target); + return 0; + } + status = qcm_link_route_lookup_decode(app_properties, body, &is_link_route, &has_destinations); if (status == QCM_ADDR_LOOKUP_OK) { // diff --git a/tests/system_tests_link_route_credit.py b/tests/system_tests_link_route_credit.py index 8674519..04c4131 100644 --- a/tests/system_tests_link_route_credit.py +++ b/tests/system_tests_link_route_credit.py @@ -319,6 +319,11 @@ class RouterTest(TestCase): test.run() self.assertEqual(None, test.error) + def test_29_fast_teardown_test(self): + test = LRFastTeardownTest(self.routers[2].addresses[0], "normal.29") + test.run() + self.assertEqual(None, test.error) + class Entity(object): def __init__(self, status_code, status_description, attrs): @@ -579,5 +584,41 @@ class LRDestReceiverFlowTest(MessagingHandler): container.run() +class LRFastTeardownTest(MessagingHandler): + def __init__(self, host, address): + super(LRFastTeardownTest, self).__init__(prefetch=0) + self.host = host + self.address = address + + self.conn = None + self.sender = None + self.error = None + self.last_action = "Test initialization" + + def fail(self, text): + self.error = text + self.conn.close() + self.timer.cancel() + + def timeout(self): + self.error = "Timeout Expired - last_action: %s" % (self.last_action) + self.conn.close() + + def on_start(self, event): + self.reactor = event.reactor + self.timer = event.reactor.schedule(7.0, Timeout(self)) + self.conn = event.container.connect(self.host) + self.last_action = "on_start" + + def on_connection_opened(self, event): + self.sender = event.container.create_sender(self.conn, self.address) + self.conn.close() + self.timer.cancel() + + def run(self): + container = Container(self) + container.run() + + if __name__== '__main__': unittest.main(main_module()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org