pool deliveries at the connection level; improve delivery refcounting semantics
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d549ec38 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d549ec38 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d549ec38 Branch: refs/heads/master Commit: d549ec388dfff3abc262c2ca226a36f42b37ee16 Parents: 3b8de5b Author: Rafael Schloming <[email protected]> Authored: Sat Dec 6 06:20:15 2014 -0500 Committer: Rafael Schloming <[email protected]> Committed: Sat Dec 6 06:20:15 2014 -0500 ---------------------------------------------------------------------- proton-c/src/engine/engine-internal.h | 7 +- proton-c/src/engine/engine.c | 142 +++++++++++++++++++---------- proton-c/src/tests/refcount.c | 110 ++++++++++++++++++++++ proton-c/src/transport/transport.c | 10 +- 4 files changed, 214 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 16ce6a4..204ef01 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -201,6 +201,7 @@ struct pn_connection_t { pn_data_t *properties; pn_collector_t *collector; pn_record_t *context; + pn_list_t *delivery_pool; }; struct pn_session_t { @@ -242,8 +243,6 @@ struct pn_link_t { pn_delivery_t *unsettled_head; pn_delivery_t *unsettled_tail; pn_delivery_t *current; - pn_delivery_t *settled_head; - pn_delivery_t *settled_tail; pn_record_t *context; size_t unsettled_count; pn_sequence_t available; @@ -278,8 +277,6 @@ struct pn_delivery_t { pn_buffer_t *tag; pn_delivery_t *unsettled_next; pn_delivery_t *unsettled_prev; - pn_delivery_t *settled_next; - pn_delivery_t *settled_prev; pn_delivery_t *work_next; pn_delivery_t 
*work_prev; pn_delivery_t *tpwork_next; @@ -292,6 +289,7 @@ struct pn_delivery_t { bool work; bool tpwork; bool done; + bool referenced; }; #define PN_SET_LOCAL(OLD, NEW) \ @@ -312,6 +310,7 @@ void pn_real_settle(pn_delivery_t *delivery); // will free delivery if link is void pn_clear_tpwork(pn_delivery_t *delivery); void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery); void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint); +void pn_connection_bound(pn_connection_t *conn); void pn_connection_unbound(pn_connection_t *conn); int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...); void pn_session_unbound(pn_session_t* ssn); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index 96c601f..1f2768a 100644 --- a/proton-c/src/engine/engine.c +++ b/proton-c/src/engine/engine.c @@ -134,6 +134,11 @@ void pn_connection_free(pn_connection_t *connection) pn_decref(connection); } +void pn_connection_bound(pn_connection_t *connection) +{ + pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); +} + // invoked when transport has been removed: void pn_connection_unbound(pn_connection_t *connection) { @@ -146,7 +151,6 @@ void pn_connection_unbound(pn_connection_t *connection) pn_clear_modified(connection, connection->transport_head); } while (connection->tpwork_head) { - pn_real_settle(connection->tpwork_head); pn_clear_tpwork(connection->tpwork_head); } } @@ -306,11 +310,6 @@ void pn_link_free(pn_link_t *link) pn_delivery_settle(delivery); delivery = next; } - while (link->settled_head) { - delivery = link->settled_head; - LL_POP(link, settled, pn_delivery_t); - pn_decref(delivery); - } link->endpoint.freed = true; pn_decref(link); } @@ -402,6 +401,7 @@ static void pn_connection_finalize(void 
*object) pn_free(conn->desired_capabilities); pn_free(conn->properties); pn_endpoint_tini(endpoint); + pn_free(conn->delivery_pool); } #define pn_connection_initialize NULL @@ -433,6 +433,7 @@ pn_connection_t *pn_connection(void) conn->properties = pn_data(0); conn->collector = NULL; conn->context = pn_record(); + conn->delivery_pool = pn_list(PN_OBJECT, 0); return conn; } @@ -600,7 +601,6 @@ void pn_add_tpwork(pn_delivery_t *delivery) { LL_ADD(connection, tpwork, delivery); delivery->tpwork = true; - pn_incref(delivery); } pn_modified(connection, &connection->endpoint, true); } @@ -612,7 +612,6 @@ void pn_clear_tpwork(pn_delivery_t *delivery) { LL_REMOVE(connection, tpwork, delivery); delivery->tpwork = false; - pn_decref(delivery); // may free delivery! } } @@ -634,7 +633,6 @@ void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit if (!endpoint->modified) { LL_ADD(connection, transport, endpoint); endpoint->modified = true; - pn_incref(endpoint); } if (emit && connection->transport) { @@ -650,7 +648,6 @@ void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) endpoint->transport_next = NULL; endpoint->transport_prev = NULL; endpoint->modified = false; - pn_decref(endpoint); // may free endpoint! 
} } @@ -739,12 +736,14 @@ static void pn_session_incref(void *object) static bool pni_preserve_child(pn_endpoint_t *endpoint, pn_endpoint_t *parent) { - if (!endpoint->freed && endpoint->referenced) { + pn_connection_t *conn = pn_ep_get_connection(endpoint); + if ((!endpoint->freed || (conn->transport && endpoint->modified)) && endpoint->referenced) { pn_object_incref(endpoint); endpoint->referenced = false; pn_decref(parent); return true; } else { + LL_REMOVE(conn, transport, endpoint); return false; } } @@ -912,10 +911,6 @@ static void pn_link_finalize(void *object) pn_link_t *link = (pn_link_t *) object; pn_endpoint_t *endpoint = &link->endpoint; - // assumptions: all deliveries freed - assert(link->settled_head == NULL); - assert(link->unsettled_head == NULL); - if (pni_post_final(endpoint, PN_LINK_FINAL)) { return; } @@ -925,6 +920,11 @@ static void pn_link_finalize(void *object) return; } + while (link->unsettled_head) { + assert(!link->unsettled_head->referenced); + pn_free(link->unsettled_head); + } + pn_free(link->context); pn_terminus_free(&link->source); pn_terminus_free(&link->target); @@ -963,7 +963,6 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) pn_terminus_init(&link->target, PN_TARGET); pn_terminus_init(&link->remote_source, PN_UNSPECIFIED); pn_terminus_init(&link->remote_target, PN_UNSPECIFIED); - link->settled_head = link->settled_tail = NULL; link->unsettled_head = link->unsettled_tail = link->current = NULL; link->unsettled_count = 0; link->available = 0; @@ -1197,17 +1196,76 @@ static void pn_disposition_finalize(pn_disposition_t *ds) pn_condition_tini(&ds->condition); } +static bool pni_link_live(pn_link_t *link) +{ + return pni_session_live(link->session) || pn_refcount(link) > 1; +} + +static void pn_delivery_incref(void *object) +{ + pn_delivery_t *delivery = (pn_delivery_t *) object; + if (delivery->link && !delivery->referenced) { + delivery->referenced = true; + pn_incref(delivery->link); + } else { + 
pn_object_incref(object); + } +} + +static bool pni_preserve_delivery(pn_delivery_t *delivery) +{ + pn_connection_t *conn = delivery->link->session->connection; + return !delivery->local.settled || (conn->transport && delivery->tpwork); +} + static void pn_delivery_finalize(void *object) { pn_delivery_t *delivery = (pn_delivery_t *) object; - assert(delivery->settled); - assert(!delivery->state.init); // no longer in session delivery map - pn_free(delivery->context); - pn_buffer_free(delivery->tag); - pn_buffer_free(delivery->bytes); - pn_disposition_finalize(&delivery->local); - pn_disposition_finalize(&delivery->remote); - pn_decref(delivery->link); + pn_link_t *link = delivery->link; + + bool pooled = false; + bool referenced = true; + if (link) { + if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) { + delivery->referenced = false; + pn_object_incref(delivery); + pn_decref(link); + return; + } + referenced = delivery->referenced; + + pn_clear_tpwork(delivery); + LL_REMOVE(link, unsettled, delivery); + pn_delivery_map_del(pn_link_is_sender(link) + ? 
&link->session->state.outgoing + : &link->session->state.incoming, + delivery); + pn_buffer_clear(delivery->tag); + pn_buffer_clear(delivery->bytes); + pn_record_clear(delivery->context); + delivery->settled = true; + pn_connection_t *conn = link->session->connection; + assert(pn_refcount(delivery) == 0); + if (pni_connection_live(conn)) { + pn_list_t *pool = link->session->connection->delivery_pool; + delivery->link = NULL; + pn_list_add(pool, delivery); + pooled = true; + assert(pn_refcount(delivery) == 1); + } + } + + if (!pooled) { + pn_free(delivery->context); + pn_buffer_free(delivery->tag); + pn_buffer_free(delivery->bytes); + pn_disposition_finalize(&delivery->local); + pn_disposition_finalize(&delivery->remote); + } + + if (referenced) { + pn_decref(link); + } } static void pn_disposition_init(pn_disposition_t *ds) @@ -1230,6 +1288,11 @@ static void pn_disposition_clear(pn_disposition_t *ds) pn_condition_clear(&ds->condition); } +#define pn_delivery_new pn_object_new +#define pn_delivery_refcount pn_object_refcount +#define pn_delivery_decref pn_object_decref +#define pn_delivery_free pn_object_free +#define pn_delivery_reify pn_object_reify #define pn_delivery_initialize NULL #define pn_delivery_hashcode NULL #define pn_delivery_compare NULL @@ -1238,14 +1301,12 @@ static void pn_disposition_clear(pn_disposition_t *ds) pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) { assert(link); - pn_delivery_t *delivery = link->settled_head; - LL_POP(link, settled, pn_delivery_t); + pn_list_t *pool = link->session->connection->delivery_pool; + pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool); if (!delivery) { - static const pn_class_t clazz = PN_CLASS(pn_delivery); + static const pn_class_t clazz = PN_METACLASS(pn_delivery); delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t)); if (!delivery) return NULL; - delivery->link = link; - pn_incref(delivery->link); // keep link until finalized delivery->tag = pn_buffer(16); 
delivery->bytes = pn_buffer(64); pn_disposition_init(&delivery->local); @@ -1254,6 +1315,8 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) } else { assert(!delivery->tpwork); } + delivery->link = link; + pn_incref(delivery->link); // keep link until finalized pn_buffer_clear(delivery->tag); pn_buffer_append(delivery->tag, tag.bytes, tag.size); pn_disposition_clear(&delivery->local); @@ -1261,6 +1324,7 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) delivery->updated = false; delivery->settled = false; LL_ADD(link, unsettled, delivery); + delivery->referenced = true; delivery->work_next = NULL; delivery->work_prev = NULL; delivery->work = false; @@ -1556,25 +1620,6 @@ void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode) link->rcv_settle_mode = (uint8_t)mode; } -void pn_real_settle(pn_delivery_t *delivery) -{ - pn_link_t *link = delivery->link; - LL_REMOVE(link, unsettled, delivery); - pn_delivery_map_del(pn_link_is_sender(link) - ? 
&link->session->state.outgoing - : &link->session->state.incoming, - delivery); - pn_buffer_clear(delivery->tag); - pn_buffer_clear(delivery->bytes); - pn_record_clear(delivery->context); - delivery->settled = true; - if (link->endpoint.freed) { - pn_decref(delivery); - } else { - LL_ADD(link, settled, delivery); - } -} - void pn_delivery_settle(pn_delivery_t *delivery) { assert(delivery); @@ -1588,6 +1633,7 @@ void pn_delivery_settle(pn_delivery_t *delivery) delivery->local.settled = true; pn_add_tpwork(delivery); pn_work_update(delivery->link->session->connection, delivery); + pn_decref(delivery); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/tests/refcount.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c index 2c1cd0c..87ce488 100644 --- a/proton-c/src/tests/refcount.c +++ b/proton-c/src/tests/refcount.c @@ -22,6 +22,7 @@ #include <proton/connection.h> #include <proton/session.h> #include <proton/link.h> +#include <proton/delivery.h> #include <stdio.h> #include <stdlib.h> @@ -161,6 +162,113 @@ static void test_incref_order_ls(void) { pn_decref(lnk); } +static void swap(int array[], int i, int j) { + int a = array[i]; + int b = array[j]; + array[j] = a; + array[i] = b; +} + +static void setup(void **objects) { + pn_connection_t *conn = pn_connection(); + pn_session_t *ssn = pn_session(conn); + pn_link_t *lnk = pn_sender(ssn, "sender"); + pn_delivery_t *dlv = pn_delivery(lnk, pn_dtag("dtag", 4)); + + assert(pn_refcount(conn) == 2); + assert(pn_refcount(ssn) == 2); + assert(pn_refcount(lnk) == 2); + assert(pn_refcount(dlv) == 1); + + objects[0] = conn; + objects[1] = ssn; + objects[2] = lnk; + objects[3] = dlv; +} + +static bool decreffed(int *indexes, void **objects, int step, void *object) { + for (int i = 0; i <= step; i++) { + if (object == objects[indexes[i]]) { + return true; + } + } + return false; +} + +static bool 
live_descendent(int *indexes, void **objects, int step, int objidx) { + for (int i = objidx + 1; i < 4; i++) { + if (!decreffed(indexes, objects, step, objects[i])) { + return true; + } + } + + return false; +} + +static void assert_refcount(void *object, int expected) { + int rc = pn_refcount(object); + //printf("pn_refcount(%s) = %d\n", pn_object_reify(object)->name, rc); + assert(rc == expected); +} + +static void test_decref_order(int *indexes, void **objects) { + setup(objects); + + //printf("-----------\n"); + for (int i = 0; i < 3; i++) { + int idx = indexes[i]; + void *obj = objects[idx]; + //printf("decreffing %s\n", pn_object_reify(obj)->name); + pn_decref(obj); + for (int j = 0; j <= i; j++) { + // everything we've decreffed already should have a refcount of + // 1 because it has been preserved by its parent + assert_refcount(objects[indexes[j]], 1); + } + for (int j = i+1; j < 4; j++) { + // everything we haven't decreffed yet should have a refcount of + // 2 unless it has a descendent that has not been decrefed (or + // it has no child) in which case it should have a refcount of 1 + int idx = indexes[j]; + void *obj = objects[idx]; + assert(!decreffed(indexes, objects, i, obj)); + if (live_descendent(indexes, objects, i, idx)) { + assert_refcount(obj, 2); + } else { + assert_refcount(obj, 1); + } + } + } + + void *last = objects[indexes[3]]; + //printf("decreffing %s\n", pn_object_reify(last)->name); + pn_decref(last); + // all should be gone now, need to run with valgrind to check +} + +static void permute(int n, int *indexes, void **objects) { + int j; + if (n == 1) { + test_decref_order(indexes, objects); + } else { + for (int i = 1; i <= n; i++) { + permute(n-1, indexes, objects); + if ((n % 2) == 1) { + j = 1; + } else { + j = i; + } + swap(indexes, j-1, n-1); + } + } +} + +static void test_decref_permutations(void) { + void *objects[4]; + int indexes[4] = {0, 1, 2, 3}; + permute(4, indexes, objects); +} + int main(int argc, char **argv) { 
test_decref_order_csl(); @@ -172,5 +280,7 @@ int main(int argc, char **argv) test_incref_order_sl(); test_incref_order_ls(); + + test_decref_permutations(); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d549ec38/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index 08072fe..ec0ec3e 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -377,6 +377,7 @@ void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state); static void pni_unmap_remote_channel(pn_session_t *ssn) { // XXX: should really update link state also + pn_delivery_map_clear(&ssn->state.incoming); pni_transport_unbind_handles(ssn->state.remote_handles, false); pn_transport_t *transport = ssn->connection->transport; uint16_t channel = ssn->state.remote_channel; @@ -463,9 +464,10 @@ int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) transport->connection = connection; connection->transport = transport; - pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); - pn_incref(connection); + + pn_connection_bound(connection); + if (transport->open_rcvd) { PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE); pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_REMOTE_OPEN); @@ -493,6 +495,8 @@ void pni_transport_unbind_channels(pn_hash_t *channels) for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) { uintptr_t key = pn_hash_key(channels, h); pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h); + pn_delivery_map_clear(&ssn->state.incoming); + pn_delivery_map_clear(&ssn->state.outgoing); pni_transport_unbind_handles(ssn->state.local_handles, true); pni_transport_unbind_handles(ssn->state.remote_handles, true); pn_session_unbound(ssn); @@ -952,7 +956,6 @@ static void 
pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery) { assert(!delivery->work); pn_clear_tpwork(delivery); - pn_real_settle(delivery); } int pn_do_transfer(pn_dispatcher_t *disp) @@ -1901,6 +1904,7 @@ bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session) static void pni_unmap_local_channel(pn_session_t *ssn) { // XXX: should really update link state also + pn_delivery_map_clear(&ssn->state.outgoing); pni_transport_unbind_handles(ssn->state.local_handles, false); pn_transport_t *transport = ssn->connection->transport; pn_session_state_t *state = &ssn->state; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
