Repository: qpid-proton
Updated Branches:
  refs/heads/master a4d44fd77 -> d1123b0e9
made the connection -> transport pointer from weak to strong Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d1123b0e Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d1123b0e Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d1123b0e Branch: refs/heads/master Commit: d1123b0e95a4aca2f54d3122d49d598adc08e654 Parents: a4d44fd Author: Rafael Schloming <[email protected]> Authored: Thu Feb 12 07:41:19 2015 -0500 Committer: Rafael Schloming <[email protected]> Committed: Thu Feb 12 07:44:18 2015 -0500 ---------------------------------------------------------------------- proton-c/src/engine/engine-internal.h | 2 + proton-c/src/engine/engine.c | 10 +++ proton-c/src/handlers/iohandler.c | 4 ++ proton-c/src/messenger/messenger.c | 3 + proton-c/src/reactor/acceptor.c | 2 +- proton-c/src/reactor/connection.c | 2 +- proton-c/src/tests/refcount.c | 101 +++++++++++++++++++++++++++++ proton-c/src/transport/transport.c | 36 +++++++++- 8 files changed, 156 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 5655e79..c8aece4 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -194,6 +194,8 @@ struct pn_transport_t { bool posted_idle_timeout; bool server; bool halt; + + bool referenced; }; struct pn_connection_t { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index 4d56100..09668c5 100644 --- a/proton-c/src/engine/engine.c +++ 
b/proton-c/src/engine/engine.c @@ -460,6 +460,16 @@ static void pn_connection_finalize(void *object) pn_connection_t *conn = (pn_connection_t *) object; pn_endpoint_t *endpoint = &conn->endpoint; + if (conn->transport) { + assert(!conn->transport->referenced); + pn_free(conn->transport); + } + + // freeing the transport could post events + if (pn_refcount(conn) > 0) { + return; + } + pni_free_children(conn->sessions, conn->freed); pn_free(conn->context); pn_decref(conn->collector); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/handlers/iohandler.c ---------------------------------------------------------------------- diff --git a/proton-c/src/handlers/iohandler.c b/proton-c/src/handlers/iohandler.c index 7f5dcb2..9c8db58 100644 --- a/proton-c/src/handlers/iohandler.c +++ b/proton-c/src/handlers/iohandler.c @@ -21,6 +21,7 @@ #include <proton/handlers.h> #include <proton/selector.h> +#include <proton/transport.h> #include <assert.h> static void *pni_selector_handle = NULL; @@ -91,6 +92,9 @@ static void pn_iodispatch(pn_iohandler_t *handler, pn_event_t *event) { case PN_TRANSPORT: pni_handle_transport(reactor, event); break; + case PN_TRANSPORT_CLOSED: + pn_transport_unbind(pn_event_transport(event)); + break; case PN_REACTOR_QUIESCED: pni_handle_quiesced(reactor, selector); break; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/messenger/messenger.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c index 0d85ac7..c6fdfb5 100644 --- a/proton-c/src/messenger/messenger.c +++ b/proton-c/src/messenger/messenger.c @@ -337,6 +337,7 @@ static void pni_listener_readable(pn_selectable_t *sel) pn_connection_t *conn = pn_messenger_connection(ctx->messenger, sock, scheme, NULL, NULL, NULL, NULL, ctx); pn_transport_bind(t, conn); + pn_decref(t); pni_conn_modified((pn_connection_ctx_t *) 
pn_connection_get_context(conn)); } @@ -1096,6 +1097,7 @@ void pn_messenger_process_connection(pn_messenger_t *messenger, pn_event_t *even pn_connection_reset(conn); pn_transport_t *t = pn_transport(); pn_transport_bind(t, conn); + pn_decref(t); pn_transport_config(messenger, conn); } } @@ -1623,6 +1625,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add pn_messenger_connection(messenger, sock, scheme, user, pass, host, port, NULL); pn_transport_t *transport = pn_transport(); pn_transport_bind(transport, connection); + pn_decref(transport); pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection); pn_selectable_t *sel = ctx->selectable; err = pn_transport_config(messenger, connection); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/reactor/acceptor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c index ccd81f0..952e16f 100644 --- a/proton-c/src/reactor/acceptor.c +++ b/proton-c/src/reactor/acceptor.c @@ -44,8 +44,8 @@ void pni_acceptor_readable(pn_selectable_t *sel) { pn_sasl_mechanisms(sasl, "ANONYMOUS"); pn_sasl_done(sasl, PN_SASL_OK); pn_transport_bind(trans, conn); - pn_reactor_selectable_transport(reactor, sock, trans); pn_decref(trans); + pn_reactor_selectable_transport(reactor, sock, trans); } void pni_acceptor_finalize(pn_selectable_t *sel) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/reactor/connection.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c index 4d08da2..eea0594 100644 --- a/proton-c/src/reactor/connection.c +++ b/proton-c/src/reactor/connection.c @@ -103,6 +103,7 @@ void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) { pn_sasl_t *sasl = pn_sasl(transport); pn_sasl_mechanisms(sasl, "ANONYMOUS"); 
pn_transport_bind(transport, conn); + pn_decref(transport); const char *hostname = pn_connection_get_hostname(conn); pn_string_t *str = pn_string(hostname); char *host = pn_string_buffer(str); @@ -115,7 +116,6 @@ void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) { pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port); pn_free(str); pn_reactor_selectable_transport(reactor, sock, transport); - pn_decref(transport); } void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/tests/refcount.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c index 3c563ad..a36d01c 100644 --- a/proton-c/src/tests/refcount.c +++ b/proton-c/src/tests/refcount.c @@ -20,9 +20,11 @@ */ #include <proton/connection.h> +#include <proton/event.h> #include <proton/session.h> #include <proton/link.h> #include <proton/delivery.h> +#include <proton/transport.h> #include <stdio.h> #include <stdlib.h> @@ -274,6 +276,99 @@ static void test_decref_permutations(void) { permute(4, indexes, objects); } +static void test_transport(void) { + pn_transport_t *transport = pn_transport(); + assert(pn_refcount(transport) == 1); + pn_incref(transport); + assert(pn_refcount(transport) == 2); + pn_decref(transport); + assert(pn_refcount(transport) == 1); + pn_free(transport); +} + +static void test_connection_transport(void) { + pn_connection_t *connection = pn_connection(); + assert(pn_refcount(connection) == 1); + pn_transport_t *transport = pn_transport(); + assert(pn_refcount(transport) == 1); + pn_transport_bind(transport, connection); + assert(pn_refcount(connection) == 2); + pn_decref(transport); + assert(pn_refcount(transport) == 1); // preserved by the bind + assert(pn_refcount(connection) == 1); + pn_free(connection); +} + +static void test_transport_connection(void) { + pn_transport_t *transport = 
pn_transport(); + assert(pn_refcount(transport) == 1); + pn_connection_t *connection = pn_connection(); + assert(pn_refcount(connection) == 1); + pn_transport_bind(transport, connection); + assert(pn_refcount(connection) == 2); + pn_decref(connection); + assert(pn_refcount(connection) == 1); + assert(pn_refcount(transport) == 1); + pn_free(transport); +} + +static void drain(pn_collector_t *collector) { + while (pn_collector_peek(collector)) { pn_collector_pop(collector); } +} + +static void test_collector_connection_transport(void) { + pn_collector_t *collector = pn_collector(); + assert(pn_refcount(collector) == 1); + pn_connection_t *connection = pn_connection(); + assert(pn_refcount(connection) == 1); + pn_connection_collect(connection, collector); + assert(pn_refcount(collector) == 2); + assert(pn_refcount(connection) == 2); + drain(collector); + assert(pn_refcount(connection) == 1); + pn_transport_t *transport = pn_transport(); + assert(pn_refcount(transport) == 1); + pn_transport_bind(transport, connection); + assert(pn_refcount(transport) == 1); + assert(pn_refcount(connection) == 3); + drain(collector); + assert(pn_refcount(connection) == 2); + pn_decref(transport); + assert(pn_refcount(transport) == 1); // preserved by the bind + assert(pn_refcount(connection) == 1); + pn_free(connection); + assert(pn_refcount(transport) == 1); // events + assert(pn_refcount(connection) == 1); // events + pn_collector_free(collector); +} + +static void test_collector_transport_connection(void) { + pn_collector_t *collector = pn_collector(); + assert(pn_refcount(collector) == 1); + pn_transport_t *transport = pn_transport(); + assert(pn_refcount(transport) == 1); + pn_connection_t *connection = pn_connection(); + assert(pn_refcount(connection) == 1); + pn_connection_collect(connection, collector); + assert(pn_refcount(collector) == 2); + assert(pn_refcount(connection) == 2); + drain(collector); + assert(pn_refcount(connection) == 1); + pn_transport_bind(transport, 
connection); + assert(pn_refcount(connection) == 3); + assert(pn_refcount(transport) == 1); + drain(collector); + assert(pn_refcount(connection) == 2); + assert(pn_refcount(transport) == 1); + pn_decref(connection); + assert(pn_refcount(connection) == 1); + assert(pn_refcount(transport) == 1); + pn_free(transport); + assert(pn_refcount(connection) == 1); + assert(pn_refcount(transport) == 1); + pn_collector_free(collector); +} + int main(int argc, char **argv) { test_decref_order_csl(); @@ -287,5 +382,11 @@ int main(int argc, char **argv) test_incref_order_ls(); test_decref_permutations(); + + test_transport(); + test_connection_transport(); + test_transport_connection(); + test_collector_connection_transport(); + test_collector_transport_connection(); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c index 86b35f8..1a7ec69 100644 --- a/proton-c/src/transport/transport.c +++ b/proton-c/src/transport/transport.c @@ -364,6 +364,8 @@ static void pn_transport_initialize(void *object) transport->server = false; transport->halt = false; + transport->referenced = true; + transport->trace = (pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | (pn_env_bool("PN_TRACE_DRV") ? 
PN_TRACE_DRV : PN_TRACE_OFF); @@ -400,15 +402,35 @@ static void pni_unmap_remote_channel(pn_session_t *ssn) pn_hash_del(transport->remote_channels, channel); } +static void pn_transport_incref(void *object) +{ + pn_transport_t *transport = (pn_transport_t *) object; + if (!transport->referenced) { + transport->referenced = true; + if (transport->connection) { + pn_incref(transport->connection); + } else { + pn_object_incref(object); + } + } else { + pn_object_incref(object); + } +} static void pn_transport_finalize(void *object); +#define pn_transport_new pn_object_new +#define pn_transport_refcount pn_object_refcount +#define pn_transport_decref pn_object_decref +#define pn_transport_reify pn_object_reify #define pn_transport_hashcode NULL #define pn_transport_compare NULL #define pn_transport_inspect NULL pn_transport_t *pn_transport(void) { - static const pn_class_t clazz = PN_CLASS(pn_transport); +#define pn_transport_free pn_object_free + static const pn_class_t clazz = PN_METACLASS(pn_transport); +#undef pn_transport_free pn_transport_t *transport = (pn_transport_t *) pn_class_new(&clazz, sizeof(pn_transport_t)); if (!transport) return NULL; @@ -453,6 +475,13 @@ static void pn_transport_finalize(void *object) { pn_transport_t *transport = (pn_transport_t *) object; + if (transport->referenced && transport->connection && pn_refcount(transport->connection) > 1) { + pn_object_incref(transport); + transport->referenced = false; + pn_decref(transport->connection); + return; + } + // once the application frees the transport, no further I/O // processing can be done to the connection: pn_transport_unbind(transport); @@ -550,6 +579,7 @@ int pn_transport_unbind(pn_transport_t *transport) pn_connection_t *conn = transport->connection; transport->connection = NULL; + bool was_referenced = transport->referenced; pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND); @@ -572,7 +602,9 @@ int pn_transport_unbind(pn_transport_t *transport) 
pni_transport_unbind_channels(transport->remote_channels); pn_connection_unbound(conn); - pn_decref(conn); + if (was_referenced) { + pn_decref(conn); + } return 0; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
