Repository: qpid-proton Updated Branches: refs/heads/master a85c89ac8 -> 6e63fd787
PROTON-1445: tests/proactor.c - fix memory management Fix the unit tests to respect the new memory management rules. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6e63fd78 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6e63fd78 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6e63fd78 Branch: refs/heads/master Commit: 6e63fd7878a033d5822987c4b943021e74b6fc83 Parents: a85c89a Author: Alan Conway <[email protected]> Authored: Mon Mar 27 18:05:40 2017 -0400 Committer: Alan Conway <[email protected]> Committed: Mon Mar 27 18:23:53 2017 -0400 ---------------------------------------------------------------------- examples/c/proactor/broker.c | 4 ++ examples/c/proactor/direct.c | 2 + examples/c/proactor/receive.c | 4 ++ examples/c/proactor/send.c | 1 + proton-c/src/proactor/libuv.c | 24 ++++++----- proton-c/src/tests/proactor.c | 83 +++++++++++++++++++++++++++++--------- 6 files changed, 87 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index d322ad0..6a7d1eb 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -340,6 +340,7 @@ static void handle(broker_t* b, pn_event_t* e) { case PN_TRANSPORT_CLOSED: connection_unsub(b, pn_event_connection(e)); check_condition(e, pn_transport_condition(pn_event_transport(e))); + pn_connection_free(pn_event_connection(e)); break; case PN_CONNECTION_REMOTE_CLOSE: @@ -365,8 +366,11 @@ static void handle(broker_t* b, pn_event_t* e) { case PN_LISTENER_CLOSE: check_condition(e, pn_listener_condition(pn_event_listener(e))); broker_stop(b); + pn_listener_free(pn_event_listener(e)); break; + break; + case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ broker_stop(b); break; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/direct.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/direct.c b/examples/c/proactor/direct.c index f76895c..bda66db 100644 --- a/examples/c/proactor/direct.c +++ b/examples/c/proactor/direct.c @@ -246,6 +246,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { case PN_TRANSPORT_CLOSED: check_condition(event, pn_transport_condition(pn_event_transport(event))); pn_listener_close(app->listener); /* Finished */ + pn_connection_free(pn_event_connection(event)); break; case PN_CONNECTION_REMOTE_CLOSE: @@ -271,6 +272,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { case PN_LISTENER_CLOSE: check_condition(event, pn_listener_condition(pn_event_listener(event))); + pn_listener_free(pn_event_listener(event)); break; case PN_PROACTOR_INACTIVE: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c index 43a68cd..6b4f02c 100644 --- a/examples/c/proactor/receive.c +++ b/examples/c/proactor/receive.c @@ -125,6 +125,10 @@ static bool handle(app_data_t* app, pn_event_t* event) { } } break; + case PN_TRANSPORT_CLOSED: + pn_connection_free(pn_event_connection(event)); + break; + case PN_TRANSPORT_ERROR: check_condition(event, pn_transport_condition(pn_event_transport(event))); break; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c index c21ac68..0b2e68f 100644 --- a/examples/c/proactor/send.c +++ b/examples/c/proactor/send.c @@ -137,6 +137,7 @@ static bool handle(app_data_t* app, pn_event_t* event) { case PN_TRANSPORT_CLOSED: check_condition(event, pn_transport_condition(pn_event_transport(event))); + pn_connection_free(pn_event_connection(event)); break; case PN_CONNECTION_REMOTE_CLOSE: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c index 728ba7d..18d9101 100644 --- a/proton-c/src/proactor/libuv.c +++ b/proton-c/src/proactor/libuv.c @@ -655,6 +655,19 @@ static bool leader_process_listener(pn_listener_t *l) { /* NOTE: l may be concurrently accessed by on_connection() */ bool closed = false; uv_mutex_lock(&l->lock); + + /* Process accepted connections */ + for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) { + int err = pconnection_init(pc); + if (!err) { + err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp); + } else { + listener_error(l, err, "accepting from"); + pconnection_error(pc, err, "accepting from"); + } + work_start(&pc->work); /* Process events for the accepted/failed connection */ + } + switch (l->state) { case L_UNINIT: @@ -685,17 +698,6 @@ static bool leader_process_listener(pn_listener_t *l) { closed = true; } } - /* Process accepted connections - if we are closed they will get an error */ - for (pconnection_t *pc = pconnection_pop(&l->accept); pc; pc = pconnection_pop(&l->accept)) { - int err = pconnection_init(pc); - if (!err) { - err = uv_accept((uv_stream_t*)&pc->lsocket->tcp, (uv_stream_t*)&pc->tcp); - } else { - listener_error(l, err, "accepting from"); - pconnection_error(pc, err, "accepting from"); - } - work_start(&pc->work); /* Process events for the accepted/failed connection */ - } bool has_work = !closed && pn_collector_peek(l->collector); uv_mutex_unlock(&l->lock); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6e63fd78/proton-c/src/tests/proactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/proactor.c b/proton-c/src/tests/proactor.c index f0431cb..b8c4088 100644 --- a/proton-c/src/tests/proactor.c +++ b/proton-c/src/tests/proactor.c @@ -167,7 +167,8 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) { /* Cleanup events */ case PN_LISTENER_CLOSE: pn_listener_free(pn_event_listener(e)); - return PN_LISTENER_CLOSE; + return PN_EVENT_NONE; + case PN_TRANSPORT_CLOSED: pn_connection_free(pn_event_connection(e)); return PN_TRANSPORT_CLOSED; @@ -180,6 +181,7 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) { case PN_LISTENER_ACCEPT: pn_listener_accept(l, pn_connection()); + pn_listener_close(l); /* Only accept one connection */ return PN_EVENT_NONE; case PN_CONNECTION_REMOTE_OPEN: @@ -207,6 +209,23 @@ static pn_event_type_t common_handler(test_t *t, pn_event_t *e) { } } +/* Like common_handler but does not auto-close the listener after one accept */ +static pn_event_type_t listen_handler(test_t *t, pn_event_t *e) { + switch (pn_event_type(e)) { + case PN_LISTENER_ACCEPT: + /* No automatic listener close/free for the inactive test */ + pn_listener_accept(pn_event_listener(e), pn_connection()); + return PN_EVENT_NONE; + + case PN_LISTENER_CLOSE: + /* No automatic free */ + return PN_LISTENER_CLOSE; + + default: + return common_handler(t, e); + } +} + /* close a connection when it is remote open */ static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) { switch (pn_event_type(e)) { @@ -218,7 +237,7 @@ static pn_event_type_t open_close_handler(test_t *t, pn_event_t *e) { } } -/* Test several client/server connection with 2 proactors */ +/* Test simple client/server connection with 2 proactors */ static void test_client_server(test_t *t) { proactor_test_t pts[] ={ { open_close_handler }, { common_handler } }; PROACTOR_TEST_INIT(pts, t); @@ -231,11 +250,6 @@ static void test_client_server(test_t *t) { pn_proactor_connect(client, pn_connection(), port.host_port); TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - /* Connect and wait for close at both ends */ - pn_proactor_connect(client, pn_connection(), port.host_port); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - TEST_ETYPE_EQUAL(t, PN_TRANSPORT_CLOSED, PROACTOR_TEST_RUN(pts)); - PROACTOR_TEST_FREE(pts); } @@ -263,7 +277,6 @@ static void test_connection_wake(test_t *t) { sock_close(port.sock); pn_connection_t *c = pn_connection(); - pn_incref(c); /* Keep c alive after proactor frees it */ pn_proactor_connect(client, c, port.host_port); TEST_ETYPE_EQUAL(t, PN_CONNECTION_REMOTE_OPEN, PROACTOR_TEST_RUN(pts)); TEST_CHECK(t, pn_proactor_get(client) == NULL); /* Should be idle */ @@ -276,12 +289,12 @@ static void test_connection_wake(test_t *t) { PROACTOR_TEST_FREE(pts); /* The pn_connection_t is still valid so wake is legal but a no-op */ pn_connection_wake(c); - pn_decref(c); + pn_connection_free(c); } /* Test that INACTIVE event is generated when last connections/listeners closes. */ static void test_inactive(test_t *t) { - proactor_test_t pts[] = { { open_wake_handler }, { common_handler } }; + proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } }; PROACTOR_TEST_INIT(pts, t); pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; test_port_t port = test_port(localhost); /* Hold a port */ @@ -303,6 +316,7 @@ static void test_inactive(test_t *t) { pn_listener_close(l); TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + pn_listener_free(l); sock_close(port.sock); PROACTOR_TEST_FREE(pts); @@ -310,7 +324,7 @@ static void test_inactive(test_t *t) { /* Tests for error handling */ static void test_errors(test_t *t) { - proactor_test_t pts[] = { { open_wake_handler }, { common_handler } }; + proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } }; PROACTOR_TEST_INIT(pts, t); pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; test_port_t port = test_port(localhost); /* Hold a port */ @@ -328,6 +342,7 @@ static void test_errors(test_t *t) { TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); TEST_STR_IN(t, "xxx", last_condition); TEST_ETYPE_EQUAL(t, PN_PROACTOR_INACTIVE, PROACTOR_TEST_RUN(pts)); + pn_listener_free(l); /* Connect with no listener */ c = pn_connection(); @@ -342,13 +357,14 @@ static void test_errors(test_t *t) { /* Test that we can control listen/select on ipv6/v4 and listen on both by default */ static void test_ipv4_ipv6(test_t *t) { - proactor_test_t pts[] ={ { open_close_handler }, { common_handler } }; + proactor_test_t pts[] ={ { open_close_handler }, { listen_handler } }; PROACTOR_TEST_INIT(pts, t); pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; /* Listen on all interfaces for IPv6 only. If this fails, skip IPv6 tests */ test_port_t port6 = test_port("[::]"); - pn_proactor_listen(server, pn_listener(), port6.host_port, 4); + pn_listener_t *l6 = pn_listener(); + pn_proactor_listen(server, l6, port6.host_port, 4); TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(port6.sock); pn_event_type_t e = PROACTOR_TEST_GET(pts); @@ -360,7 +376,8 @@ static void test_ipv4_ipv6(test_t *t) { /* Listen on all interfaces for IPv4 only. */ test_port_t port4 = test_port("0.0.0.0"); - pn_proactor_listen(server, pn_listener(), port4.host_port, 4); + pn_listener_t *l4 = pn_listener(); + pn_proactor_listen(server, l4, port4.host_port, 4); TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(port4.sock); TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition); @@ -368,11 +385,13 @@ static void test_ipv4_ipv6(test_t *t) { /* Empty address listens on both IPv4 and IPv6 on all interfaces */ test_port_t port = test_port(""); - pn_proactor_listen(server, pn_listener(), port.host_port, 4); + pn_listener_t *l = pn_listener(); + pn_proactor_listen(server, l, port.host_port, 4); TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(port.sock); e = PROACTOR_TEST_GET(pts); - TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition); PROACTOR_TEST_DRAIN(pts); + TEST_CHECKF(t, PROACTOR_TEST_GET(pts) != PN_LISTENER_CLOSE, "listener error: %s", last_condition); + PROACTOR_TEST_DRAIN(pts); #define EXPECT_CONNECT(TP, HOST) do { \ pn_proactor_connect(client, pn_connection(), test_port_use_host(&(TP), (HOST))); \ @@ -402,24 +421,48 @@ static void test_ipv4_ipv6(test_t *t) { EXPECT_FAIL(port6, "127.0.0.1"); /* fail v4->v6 */ EXPECT_FAIL(port4, "[::1]"); /* fail v6->v4 */ } + PROACTOR_TEST_DRAIN(pts); + + pn_listener_close(l); + TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); + pn_listener_free(l); + + pn_listener_close(l6); + TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); + pn_listener_free(l6); + + pn_listener_close(l4); + TEST_ETYPE_EQUAL(t, PN_LISTENER_CLOSE, PROACTOR_TEST_RUN(pts)); + pn_listener_free(l4); PROACTOR_TEST_FREE(pts); } /* Make sure pn_proactor_free cleans up open sockets */ static void test_free_cleanup(test_t *t) { - proactor_test_t pts[] = { { open_wake_handler }, { common_handler } }; + proactor_test_t pts[] = { { open_wake_handler }, { listen_handler } }; PROACTOR_TEST_INIT(pts, t); pn_proactor_t *client = pts[0].proactor, *server = pts[1].proactor; test_port_t ports[3] = { test_port(localhost), test_port(localhost), test_port(localhost) }; + pn_listener_t *l[3]; + pn_connection_t *c[3]; for (int i = 0; i < 3; ++i) { - pn_proactor_listen(server, pn_listener(), ports[i].host_port, 2); + l[i] = pn_listener(); + pn_proactor_listen(server, l[i], ports[i].host_port, 2); TEST_ETYPE_EQUAL(t, PN_LISTENER_OPEN, PROACTOR_TEST_RUN(pts)); sock_close(ports[i].sock); - pn_proactor_connect(client, pn_connection(), ports[i].host_port); - pn_proactor_connect(client, pn_connection(), ports[i].host_port); + c[i] = pn_connection(); + pn_proactor_connect(client, c[i], ports[i].host_port); } PROACTOR_TEST_FREE(pts); + /* Safe to free after proactor is gone */ + for (int i = 0; i < 3; ++i) { + pn_listener_free(l[i]); + pn_connection_free(c[i]); + } + /* Freeing an unused listener/connector should be safe */ + pn_listener_free(pn_listener()); + pn_connection_free(pn_connection()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
