Author: rhs
Date: Thu Jun 19 17:23:25 2014
New Revision: 1603961

URL: http://svn.apache.org/r1603961
Log:
fixed memory leak by making the transport clean up references held by the 
channel and handle maps; made use of the backpointer versions of new, incref, 
and decref to track down issue

Modified:
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/engine/event.c
    qpid/proton/trunk/proton-c/src/transport/transport.c

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1603961&r1=1603960&r2=1603961&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Thu Jun 19 17:23:25 2014
@@ -184,7 +184,7 @@ void pn_add_session(pn_connection_t *con
 {
   pn_list_add(conn->sessions, ssn);
   ssn->connection = conn;
-  pn_incref(conn);  // keep around until finalized
+  pn_incref2(conn, ssn);  // keep around until finalized
 }
 
 void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn)
@@ -224,7 +224,7 @@ void pn_session_free(pn_session_t *sessi
   pn_endpoint_t *endpoint = (pn_endpoint_t *) session;
   LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint);
   session->endpoint.freed = true;
-  pn_decref(session);
+  pn_decref2(session, session->connection);
 }
 
 void *pn_session_get_context(pn_session_t *session)
@@ -288,10 +288,10 @@ void pn_link_free(pn_link_t *link)
   while (link->settled_head) {
     delivery = link->settled_head;
     LL_POP(link, settled, pn_delivery_t);
-    pn_decref(delivery);
+    pn_decref2(delivery, link);
   }
   link->endpoint.freed = true;
-  pn_decref(link);
+  pn_decref2(link, link->session);
 }
 
 void *pn_link_get_context(pn_link_t *link)
@@ -556,7 +556,7 @@ void pn_add_tpwork(pn_delivery_t *delive
   {
     LL_ADD(connection, tpwork, delivery);
     delivery->tpwork = true;
-    pn_incref(delivery);
+    pn_incref2(delivery, connection);
   }
   pn_modified(connection, &connection->endpoint, true);
 }
@@ -568,7 +568,7 @@ void pn_clear_tpwork(pn_delivery_t *deli
   {
     LL_REMOVE(connection, tpwork, delivery);
     delivery->tpwork = false;
-    pn_decref(delivery);  // may free delivery!
+    pn_decref2(delivery, connection);  // may free delivery!
   }
 }
 
@@ -590,7 +590,7 @@ void pn_modified(pn_connection_t *connec
   if (!endpoint->modified) {
     LL_ADD(connection, transport, endpoint);
     endpoint->modified = true;
-    pn_incref(endpoint);
+    pn_incref2(endpoint, connection);
   }
 
   if (emit && connection->transport) {
@@ -606,7 +606,7 @@ void pn_clear_modified(pn_connection_t *
     endpoint->transport_next = NULL;
     endpoint->transport_prev = NULL;
     endpoint->modified = false;
-    pn_decref(endpoint);  // may free endpoint!
+    pn_decref2(endpoint, connection);  // may free endpoint!
   }
 }
 
@@ -704,7 +704,7 @@ static void pn_session_finalize(void *ob
   pn_delivery_map_free(&session->state.outgoing);
   pn_free(session->state.local_handles);
   pn_free(session->state.remote_handles);
-  pn_decref(session->connection);
+  pn_decref2(session->connection, session);
 }
 
 #define pn_session_initialize NULL
@@ -716,7 +716,7 @@ pn_session_t *pn_session(pn_connection_t
 {
   assert(conn);
   static const pn_class_t clazz = PN_CLASS(pn_session);
-  pn_session_t *ssn = (pn_session_t *) pn_new(sizeof(pn_session_t), &clazz);
+  pn_session_t *ssn = (pn_session_t *) pn_new2(sizeof(pn_session_t), &clazz, conn);
   if (!ssn) return NULL;
 
   pn_endpoint_init(&ssn->endpoint, SESSION, conn);
@@ -812,7 +812,7 @@ static void pn_link_finalize(void *objec
   pn_terminus_free(&link->remote_target);
   pn_free(link->name);
   pn_endpoint_tini(endpoint);
-  pn_decref(link->session);
+  pn_decref2(link->session, link);
 }
 
 #define pn_link_initialize NULL
@@ -823,11 +823,11 @@ static void pn_link_finalize(void *objec
 pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
 {
   static const pn_class_t clazz = PN_CLASS(pn_link);
-  pn_link_t *link = (pn_link_t *) pn_new(sizeof(pn_link_t), &clazz);
+  pn_link_t *link = (pn_link_t *) pn_new2(sizeof(pn_link_t), &clazz, session);
 
   pn_endpoint_init(&link->endpoint, type, session->connection);
   pn_add_link(session, link);
-  pn_incref(session);  // keep session until link finalized
+  pn_incref2(session, link);  // keep session until link finalized
   link->name = pn_string(name);
   pn_terminus_init(&link->source, PN_SOURCE);
   pn_terminus_init(&link->target, PN_TARGET);
@@ -1067,7 +1067,7 @@ static void pn_delivery_finalize(void *o
   pn_buffer_free(delivery->bytes);
   pn_disposition_finalize(&delivery->local);
   pn_disposition_finalize(&delivery->remote);
-  pn_decref(delivery->link);
+  pn_decref2(delivery->link, delivery);
 }
 
 static void pn_disposition_init(pn_disposition_t *ds)
@@ -1102,10 +1102,10 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   LL_POP(link, settled, pn_delivery_t);
   if (!delivery) {
     static const pn_class_t clazz = PN_CLASS(pn_delivery);
-    delivery = (pn_delivery_t *) pn_new(sizeof(pn_delivery_t), &clazz);
+    delivery = (pn_delivery_t *) pn_new2(sizeof(pn_delivery_t), &clazz, link);
     if (!delivery) return NULL;
     delivery->link = link;
-    pn_incref(delivery->link);  // keep link until finalized
+    pn_incref2(delivery->link, delivery);  // keep link until finalized
     delivery->tag = pn_buffer(16);
     delivery->bytes = pn_buffer(64);
     pn_disposition_init(&delivery->local);

Modified: qpid/proton/trunk/proton-c/src/engine/event.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/event.c?rev=1603961&r1=1603960&r2=1603961&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/event.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/event.c Thu Jun 19 17:23:25 2014
@@ -111,7 +111,9 @@ pn_event_t *pn_collector_put(pn_collecto
 
   event->type = type;
   event->context = context;
-  pn_incref(event->context);
+  pn_incref2(event->context, collector);
+
+  //printf("event %s on %p\n", pn_event_type_name(event->type), event->context);
 
   return event;
 }
@@ -136,7 +138,7 @@ bool pn_collector_pop(pn_collector_t *co
 
   // decref before adding to the free list
   if (event->context) {
-    pn_decref(event->context);
+    pn_decref2(event->context, collector);
     event->context = NULL;
   }
 

Modified: qpid/proton/trunk/proton-c/src/transport/transport.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/transport/transport.c?rev=1603961&r1=1603960&r2=1603961&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/transport/transport.c (original)
+++ qpid/proton/trunk/proton-c/src/transport/transport.c Thu Jun 19 17:23:25 2014
@@ -262,7 +262,7 @@ int pn_transport_bind(pn_transport_t *tr
   if (connection->transport) return PN_STATE_ERR;
   transport->connection = connection;
   connection->transport = transport;
-  pn_incref(connection);
+  pn_incref2(connection, transport);
   if (transport->open_rcvd) {
     PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE);
     pn_collector_put(connection->collector, PN_CONNECTION_REMOTE_STATE, connection);
@@ -274,6 +274,25 @@ int pn_transport_bind(pn_transport_t *tr
   return 0;
 }
 
+void pni_transport_unbind_handles(pn_hash_t *handles)
+{
+  for (pn_handle_t h = pn_hash_head(handles); h; h = pn_hash_next(handles, h)) {
+    uintptr_t key = pn_hash_key(handles, h);
+    pn_hash_del(handles, key);
+  }
+}
+
+void pni_transport_unbind_channels(pn_hash_t *channels)
+{
+  for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) {
+    uintptr_t key = pn_hash_key(channels, h);
+    pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h);
+    pni_transport_unbind_handles(ssn->state.local_handles);
+    pni_transport_unbind_handles(ssn->state.remote_handles);
+    pn_hash_del(channels, key);
+  }
+}
+
 int pn_transport_unbind(pn_transport_t *transport)
 {
   assert(transport);
@@ -282,6 +301,7 @@ int pn_transport_unbind(pn_transport_t *
   pn_connection_t *conn = transport->connection;
   transport->connection = NULL;
 
+  // XXX: what happens if the endpoints are freed before we get here?
   pn_session_t *ssn = pn_session_head(conn, 0);
   while (ssn) {
     pn_delivery_map_clear(&ssn->state.incoming);
@@ -296,8 +316,11 @@ int pn_transport_unbind(pn_transport_t *
     endpoint = endpoint->endpoint_next;
   }
 
+  pni_transport_unbind_channels(transport->local_channels);
+  pni_transport_unbind_channels(transport->remote_channels);
+
   pn_connection_unbound(conn);
-  pn_decref(conn);
+  pn_decref2(conn, transport);
   return 0;
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to