Repository: qpid-proton
Updated Branches:
  refs/heads/master a4d44fd77 -> d1123b0e9


made the connection -> transport pointer from weak to strong


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d1123b0e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d1123b0e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d1123b0e

Branch: refs/heads/master
Commit: d1123b0e95a4aca2f54d3122d49d598adc08e654
Parents: a4d44fd
Author: Rafael Schloming <[email protected]>
Authored: Thu Feb 12 07:41:19 2015 -0500
Committer: Rafael Schloming <[email protected]>
Committed: Thu Feb 12 07:44:18 2015 -0500

----------------------------------------------------------------------
 proton-c/src/engine/engine-internal.h |   2 +
 proton-c/src/engine/engine.c          |  10 +++
 proton-c/src/handlers/iohandler.c     |   4 ++
 proton-c/src/messenger/messenger.c    |   3 +
 proton-c/src/reactor/acceptor.c       |   2 +-
 proton-c/src/reactor/connection.c     |   2 +-
 proton-c/src/tests/refcount.c         | 101 +++++++++++++++++++++++++++++
 proton-c/src/transport/transport.c    |  36 +++++++++-
 8 files changed, 156 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/engine/engine-internal.h
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h
index 5655e79..c8aece4 100644
--- a/proton-c/src/engine/engine-internal.h
+++ b/proton-c/src/engine/engine-internal.h
@@ -194,6 +194,8 @@ struct pn_transport_t {
   bool posted_idle_timeout;
   bool server;
   bool halt;
+
+  bool referenced;
 };
 
 struct pn_connection_t {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index 4d56100..09668c5 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -460,6 +460,16 @@ static void pn_connection_finalize(void *object)
   pn_connection_t *conn = (pn_connection_t *) object;
   pn_endpoint_t *endpoint = &conn->endpoint;
 
+  if (conn->transport) {
+    assert(!conn->transport->referenced);
+    pn_free(conn->transport);
+  }
+
+  // freeing the transport could post events
+  if (pn_refcount(conn) > 0) {
+    return;
+  }
+
   pni_free_children(conn->sessions, conn->freed);
   pn_free(conn->context);
   pn_decref(conn->collector);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/handlers/iohandler.c
----------------------------------------------------------------------
diff --git a/proton-c/src/handlers/iohandler.c b/proton-c/src/handlers/iohandler.c
index 7f5dcb2..9c8db58 100644
--- a/proton-c/src/handlers/iohandler.c
+++ b/proton-c/src/handlers/iohandler.c
@@ -21,6 +21,7 @@
 
 #include <proton/handlers.h>
 #include <proton/selector.h>
+#include <proton/transport.h>
 #include <assert.h>
 
 static void *pni_selector_handle = NULL;
@@ -91,6 +92,9 @@ static void pn_iodispatch(pn_iohandler_t *handler, pn_event_t *event) {
   case PN_TRANSPORT:
     pni_handle_transport(reactor, event);
     break;
+  case PN_TRANSPORT_CLOSED:
+    pn_transport_unbind(pn_event_transport(event));
+    break;
   case PN_REACTOR_QUIESCED:
     pni_handle_quiesced(reactor, selector);
     break;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/messenger/messenger.c
----------------------------------------------------------------------
diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c
index 0d85ac7..c6fdfb5 100644
--- a/proton-c/src/messenger/messenger.c
+++ b/proton-c/src/messenger/messenger.c
@@ -337,6 +337,7 @@ static void pni_listener_readable(pn_selectable_t *sel)
 
   pn_connection_t *conn = pn_messenger_connection(ctx->messenger, sock, scheme, NULL, NULL, NULL, NULL, ctx);
   pn_transport_bind(t, conn);
+  pn_decref(t);
   pni_conn_modified((pn_connection_ctx_t *) pn_connection_get_context(conn));
 }
 
@@ -1096,6 +1097,7 @@ void pn_messenger_process_connection(pn_messenger_t *messenger, pn_event_t *even
       pn_connection_reset(conn);
       pn_transport_t *t = pn_transport();
       pn_transport_bind(t, conn);
+      pn_decref(t);
       pn_transport_config(messenger, conn);
     }
   }
@@ -1623,6 +1625,7 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add
     pn_messenger_connection(messenger, sock, scheme, user, pass, host, port, NULL);
   pn_transport_t *transport = pn_transport();
   pn_transport_bind(transport, connection);
+  pn_decref(transport);
   pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection);
   pn_selectable_t *sel = ctx->selectable;
   err = pn_transport_config(messenger, connection);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/reactor/acceptor.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c
index ccd81f0..952e16f 100644
--- a/proton-c/src/reactor/acceptor.c
+++ b/proton-c/src/reactor/acceptor.c
@@ -44,8 +44,8 @@ void pni_acceptor_readable(pn_selectable_t *sel) {
   pn_sasl_mechanisms(sasl, "ANONYMOUS");
   pn_sasl_done(sasl, PN_SASL_OK);
   pn_transport_bind(trans, conn);
-  pn_reactor_selectable_transport(reactor, sock, trans);
   pn_decref(trans);
+  pn_reactor_selectable_transport(reactor, sock, trans);
 }
 
 void pni_acceptor_finalize(pn_selectable_t *sel) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/reactor/connection.c
----------------------------------------------------------------------
diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c
index 4d08da2..eea0594 100644
--- a/proton-c/src/reactor/connection.c
+++ b/proton-c/src/reactor/connection.c
@@ -103,6 +103,7 @@ void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) {
   pn_sasl_t *sasl = pn_sasl(transport);
   pn_sasl_mechanisms(sasl, "ANONYMOUS");
   pn_transport_bind(transport, conn);
+  pn_decref(transport);
   const char *hostname = pn_connection_get_hostname(conn);
   pn_string_t *str = pn_string(hostname);
   char *host = pn_string_buffer(str);
@@ -115,7 +116,6 @@ void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) {
   pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port);
   pn_free(str);
   pn_reactor_selectable_transport(reactor, sock, transport);
-  pn_decref(transport);
 }
 
 void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/tests/refcount.c
----------------------------------------------------------------------
diff --git a/proton-c/src/tests/refcount.c b/proton-c/src/tests/refcount.c
index 3c563ad..a36d01c 100644
--- a/proton-c/src/tests/refcount.c
+++ b/proton-c/src/tests/refcount.c
@@ -20,9 +20,11 @@
  */
 
 #include <proton/connection.h>
+#include <proton/event.h>
 #include <proton/session.h>
 #include <proton/link.h>
 #include <proton/delivery.h>
+#include <proton/transport.h>
 #include <stdio.h>
 #include <stdlib.h>
 
@@ -274,6 +276,99 @@ static void test_decref_permutations(void) {
   permute(4, indexes, objects);
 }
 
+static void test_transport(void) {
+  pn_transport_t *transport = pn_transport();
+  assert(pn_refcount(transport) == 1);
+  pn_incref(transport);
+  assert(pn_refcount(transport) == 2);
+  pn_decref(transport);
+  assert(pn_refcount(transport) == 1);
+  pn_free(transport);
+}
+
+static void test_connection_transport(void) {
+  pn_connection_t *connection = pn_connection();
+  assert(pn_refcount(connection) == 1);
+  pn_transport_t *transport = pn_transport();
+  assert(pn_refcount(transport) == 1);
+  pn_transport_bind(transport, connection);
+  assert(pn_refcount(connection) == 2);
+  pn_decref(transport);
+  assert(pn_refcount(transport) == 1); // preserved by the bind
+  assert(pn_refcount(connection) == 1);
+  pn_free(connection);
+}
+
+static void test_transport_connection(void) {
+  pn_transport_t *transport = pn_transport();
+  assert(pn_refcount(transport) == 1);
+  pn_connection_t *connection = pn_connection();
+  assert(pn_refcount(connection) == 1);
+  pn_transport_bind(transport, connection);
+  assert(pn_refcount(connection) == 2);
+  pn_decref(connection);
+  assert(pn_refcount(connection) == 1);
+  assert(pn_refcount(transport) == 1);
+  pn_free(transport);
+}
+
+static void drain(pn_collector_t *collector) {
+  while (pn_collector_peek(collector)) { pn_collector_pop(collector); }
+}
+
+static void test_collector_connection_transport(void) {
+  pn_collector_t *collector = pn_collector();
+  assert(pn_refcount(collector) == 1);
+  pn_connection_t *connection = pn_connection();
+  assert(pn_refcount(connection) == 1);
+  pn_connection_collect(connection, collector);
+  assert(pn_refcount(collector) == 2);
+  assert(pn_refcount(connection) == 2);
+  drain(collector);
+  assert(pn_refcount(connection) == 1);
+  pn_transport_t *transport = pn_transport();
+  assert(pn_refcount(transport) == 1);
+  pn_transport_bind(transport, connection);
+  assert(pn_refcount(transport) == 1);
+  assert(pn_refcount(connection) == 3);
+  drain(collector);
+  assert(pn_refcount(connection) == 2);
+  pn_decref(transport);
+  assert(pn_refcount(transport) == 1); // preserved by the bind
+  assert(pn_refcount(connection) == 1);
+  pn_free(connection);
+  assert(pn_refcount(transport) == 1); // events
+  assert(pn_refcount(connection) == 1); // events
+  pn_collector_free(collector);
+}
+
+static void test_collector_transport_connection(void) {
+  pn_collector_t *collector = pn_collector();
+  assert(pn_refcount(collector) == 1);
+  pn_transport_t *transport = pn_transport();
+  assert(pn_refcount(transport) == 1);
+  pn_connection_t *connection = pn_connection();
+  assert(pn_refcount(connection) == 1);
+  pn_connection_collect(connection, collector);
+  assert(pn_refcount(collector) == 2);
+  assert(pn_refcount(connection) == 2);
+  drain(collector);
+  assert(pn_refcount(connection) == 1);
+  pn_transport_bind(transport, connection);
+  assert(pn_refcount(connection) == 3);
+  assert(pn_refcount(transport) == 1);
+  drain(collector);
+  assert(pn_refcount(connection) == 2);
+  assert(pn_refcount(transport) == 1);
+  pn_decref(connection);
+  assert(pn_refcount(connection) == 1);
+  assert(pn_refcount(transport) == 1);
+  pn_free(transport);
+  assert(pn_refcount(connection) == 1);
+  assert(pn_refcount(transport) == 1);
+  pn_collector_free(collector);
+}
+
 int main(int argc, char **argv)
 {
   test_decref_order_csl();
@@ -287,5 +382,11 @@ int main(int argc, char **argv)
   test_incref_order_ls();
 
   test_decref_permutations();
+
+  test_transport();
+  test_connection_transport();
+  test_transport_connection();
+  test_collector_connection_transport();
+  test_collector_transport_connection();
   return 0;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d1123b0e/proton-c/src/transport/transport.c
----------------------------------------------------------------------
diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c
index 86b35f8..1a7ec69 100644
--- a/proton-c/src/transport/transport.c
+++ b/proton-c/src/transport/transport.c
@@ -364,6 +364,8 @@ static void pn_transport_initialize(void *object)
   transport->server = false;
   transport->halt = false;
 
+  transport->referenced = true;
+
   transport->trace = (pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) |
     (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) |
     (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF);
@@ -400,15 +402,35 @@ static void pni_unmap_remote_channel(pn_session_t *ssn)
   pn_hash_del(transport->remote_channels, channel);
 }
 
+static void pn_transport_incref(void *object)
+{
+  pn_transport_t *transport = (pn_transport_t *) object;
+  if (!transport->referenced) {
+    transport->referenced = true;
+    if (transport->connection) {
+      pn_incref(transport->connection);
+    } else {
+      pn_object_incref(object);
+    }
+  } else {
+    pn_object_incref(object);
+  }
+}
 
 static void pn_transport_finalize(void *object);
+#define pn_transport_new pn_object_new
+#define pn_transport_refcount pn_object_refcount
+#define pn_transport_decref pn_object_decref
+#define pn_transport_reify pn_object_reify
 #define pn_transport_hashcode NULL
 #define pn_transport_compare NULL
 #define pn_transport_inspect NULL
 
 pn_transport_t *pn_transport(void)
 {
-  static const pn_class_t clazz = PN_CLASS(pn_transport);
+#define pn_transport_free pn_object_free
+  static const pn_class_t clazz = PN_METACLASS(pn_transport);
+#undef pn_transport_free
   pn_transport_t *transport =
     (pn_transport_t *) pn_class_new(&clazz, sizeof(pn_transport_t));
   if (!transport) return NULL;
@@ -453,6 +475,13 @@ static void pn_transport_finalize(void *object)
 {
   pn_transport_t *transport = (pn_transport_t *) object;
 
+  if (transport->referenced && transport->connection && pn_refcount(transport->connection) > 1) {
+    pn_object_incref(transport);
+    transport->referenced = false;
+    pn_decref(transport->connection);
+    return;
+  }
+
   // once the application frees the transport, no further I/O
   // processing can be done to the connection:
   pn_transport_unbind(transport);
@@ -550,6 +579,7 @@ int pn_transport_unbind(pn_transport_t *transport)
 
   pn_connection_t *conn = transport->connection;
   transport->connection = NULL;
+  bool was_referenced = transport->referenced;
 
   pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND);
 
@@ -572,7 +602,9 @@ int pn_transport_unbind(pn_transport_t *transport)
   pni_transport_unbind_channels(transport->remote_channels);
 
   pn_connection_unbound(conn);
-  pn_decref(conn);
+  if (was_referenced) {
+    pn_decref(conn);
+  }
   return 0;
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to