Author: rhs
Date: Tue Jun 12 19:29:58 2012
New Revision: 1349489

URL: http://svn.apache.org/viewvc?rev=1349489&view=rev
Log:
fixed numerous engine bugs; added tests

Modified:
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/tests/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Jun 12 19:29:58 2012
@@ -97,13 +97,15 @@ pn_state_t pn_connection_state(pn_connec
 /** @todo: needs documentation */
 pn_error_t *pn_connection_error(pn_connection_t *connection);
 /** @todo: needs documentation */
-char *pn_connection_container(pn_connection_t *connection);
+const char *pn_connection_container(pn_connection_t *connection);
 /** @todo: needs documentation */
 void pn_connection_set_container(pn_connection_t *connection, const char 
*container);
 /** @todo: needs documentation */
-char *pn_connection_hostname(pn_connection_t *connection);
+const char *pn_connection_hostname(pn_connection_t *connection);
 /** @todo: needs documentation */
 void pn_connection_set_hostname(pn_connection_t *connection, const char 
*hostname);
+const char *pn_connection_remote_container(pn_connection_t *connection);
+const char *pn_connection_remote_hostname(pn_connection_t *connection);
 
 /** Extracts the first delivery on the connection that has pending
  *  operations.

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Jun 12 19:29:58 
2012
@@ -124,6 +124,8 @@ struct pn_connection_t {
   pn_delivery_t *tpwork_tail;
   char *container;
   char *hostname;
+  char *remote_container;
+  char *remote_hostname;
 };
 
 struct pn_session_t {

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Jun 12 19:29:58 2012
@@ -197,6 +197,8 @@ void pn_connection_destroy(pn_connection
   free(connection->sessions);
   free(connection->container);
   free(connection->hostname);
+  free(connection->remote_container);
+  free(connection->remote_hostname);
   free(connection);
 }
 
@@ -407,6 +409,8 @@ pn_connection_t *pn_connection()
   conn->tpwork_tail = NULL;
   conn->container = NULL;
   conn->hostname = NULL;
+  conn->remote_container = NULL;
+  conn->remote_hostname = NULL;
 
   return conn;
 }
@@ -421,7 +425,7 @@ pn_error_t *pn_connection_error(pn_conne
   return connection ? &connection->endpoint.error : NULL;
 }
 
-char *pn_connection_container(pn_connection_t *connection)
+const char *pn_connection_container(pn_connection_t *connection)
 {
   return connection ? connection->container : NULL;
 }
@@ -433,7 +437,7 @@ void pn_connection_set_container(pn_conn
   connection->container = pn_strdup(container);
 }
 
-char *pn_connection_hostname(pn_connection_t *connection)
+const char *pn_connection_hostname(pn_connection_t *connection)
 {
   return connection ? connection->hostname : NULL;
 }
@@ -445,6 +449,16 @@ void pn_connection_set_hostname(pn_conne
   connection->hostname = pn_strdup(hostname);
 }
 
+const char *pn_connection_remote_container(pn_connection_t *connection)
+{
+  return connection ? connection->remote_container : NULL;
+}
+
+const char *pn_connection_remote_hostname(pn_connection_t *connection)
+{
+  return connection ? connection->remote_hostname : NULL;
+}
+
 pn_delivery_t *pn_work_head(pn_connection_t *connection)
 {
   if (!connection) return NULL;
@@ -556,6 +570,8 @@ bool pn_matches(pn_endpoint_t *endpoint,
 {
   if (endpoint->type != type) return false;
 
+  if (!state) return true;
+
   int st = endpoint->state;
   if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0)
     return st & state;
@@ -1055,11 +1071,32 @@ int pn_do_error(pn_transport_t *transpor
   return PN_ERR;
 }
 
+char *pn_bytes_strdup(pn_bytes_t str)
+{
+  return pn_strndup(str.start, str.size);
+}
+
 int pn_do_open(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = disp->context;
   pn_connection_t *conn = transport->connection;
+  bool container_q, hostname_q;
+  pn_bytes_t remote_container, remote_hostname;
+  int err = pn_scan_args(disp, "D.[?S?S]", &container_q, &remote_container,
+                         &hostname_q, &remote_hostname);
+  if (err) return err;
+  if (container_q) {
+    conn->remote_container = pn_bytes_strdup(remote_container);
+  } else {
+    conn->remote_container = NULL;
+  }
+  if (hostname_q) {
+    conn->remote_hostname = pn_bytes_strdup(remote_hostname);
+  } else {
+    conn->remote_hostname = NULL;
+  }
   PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE);
+  transport->open_rcvd = true;
   return 0;
 }
 
@@ -1103,11 +1140,6 @@ pn_link_state_t *pn_find_link(pn_session
   return NULL;
 }
 
-char *pn_bytes_strdup(pn_bytes_t str)
-{
-  return pn_strndup(str.start, str.size);
-}
-
 int pn_do_attach(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = disp->context;
@@ -1329,7 +1361,7 @@ int pn_do_detach(pn_dispatcher_t *disp)
   pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
   pn_link_t *link = link_state->link;
 
-  link_state->remote_handle = -1;
+  link_state->remote_handle = -2;
 
   if (closed)
   {
@@ -1347,7 +1379,7 @@ int pn_do_end(pn_dispatcher_t *disp)
   pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
   pn_session_t *session = ssn_state->session;
 
-  ssn_state->remote_channel = -1;
+  ssn_state->remote_channel = -2;
   PN_SET_REMOTE(session->endpoint.state, PN_REMOTE_CLOSED);
   return 0;
 }
@@ -1362,6 +1394,8 @@ int pn_do_close(pn_dispatcher_t *disp)
 
 ssize_t pn_input(pn_transport_t *transport, char *bytes, size_t available)
 {
+  if (!transport) return PN_ARG_ERR;
+
   if (!available) {
     pn_do_error(transport, "amqp:connection:framing-error", "connection 
aborted");
     if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM))
@@ -1454,13 +1488,15 @@ int pn_process_link_setup(pn_transport_t
     {
       // XXX
       state->local_handle = link->id;
-      pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[SIonn?DL[S]?DL[S]nnI]", ATTACH,
-                    link->name,
-                    state->local_handle,
-                    endpoint->type == RECEIVER,
-                    (bool) link->local_source, SOURCE, link->local_source,
-                    (bool) link->local_target, TARGET, link->local_target,
-                    0);
+      int err = pn_post_frame(transport->disp, ssn_state->local_channel,
+                              "DL[SIonn?DL[S]?DL[S]nnI]", ATTACH,
+                              link->name,
+                              state->local_handle,
+                              endpoint->type == RECEIVER,
+                              (bool) link->local_source, SOURCE, 
link->local_source,
+                              (bool) link->local_target, TARGET, 
link->local_target,
+                              0);
+      if (err) return err;
     }
   }
 
@@ -1581,7 +1617,7 @@ int pn_process_tpwork_receiver(pn_transp
   pn_link_t *link = delivery->link;
   // XXX: need to prevent duplicate disposition sending
   pn_session_state_t *ssn_state = pn_session_get_state(transport, 
link->session);
-  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled) {
+  if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled && 
delivery->context) {
     int err = pn_post_disp(transport, delivery);
     if (err) return err;
   }
@@ -1659,8 +1695,12 @@ int pn_process_link_teardown(pn_transpor
     pn_session_t *session = link->session;
     pn_session_state_t *ssn_state = pn_session_get_state(transport, session);
     pn_link_state_t *state = pn_link_get_state(ssn_state, link);
-    if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 
0) {
-      if (pn_is_sender(link) && pn_queued(link)) return 0;
+    if (endpoint->state & PN_LOCAL_CLOSED && (int32_t) state->local_handle >= 
0 &&
+        (int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) {
+      if (pn_is_sender(link) && pn_queued(link) &&
+          (int32_t) state->remote_handle != -2 &&
+          (int16_t) ssn_state->remote_channel != -2 &&
+          !transport->close_rcvd) return 0;
       int err = pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[Io]", DETACH,
                               state->local_handle, true);
       if (err) return err;
@@ -1673,15 +1713,42 @@ int pn_process_link_teardown(pn_transpor
   return 0;
 }
 
+bool pn_pointful_buffering(pn_transport_t *transport, pn_session_t *session)
+{
+  if (!transport->open_rcvd) return true;
+  if (transport->close_rcvd) return false;
+
+  pn_connection_t *conn = transport->connection;
+  pn_link_t *link = pn_link_head(conn, 0);
+  while (link) {
+    if (pn_is_sender(link) && pn_queued(link) > 0) {
+      pn_session_t *ssn = link->session;
+      if (session && session == ssn) {
+        pn_session_state_t *ssn_state = pn_session_get_state(transport, 
session);
+        pn_link_state_t *state = pn_link_get_state(ssn_state, link);
+
+        if ((int32_t) state->remote_handle != -2 &&
+            (int16_t) ssn_state->remote_channel != -2) {
+          return true;
+        }
+      }
+    }
+    link = pn_link_next(link, 0);
+  }
+
+  return false;
+}
+
 int pn_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == SESSION)
   {
     pn_session_t *session = (pn_session_t *) endpoint;
     pn_session_state_t *state = pn_session_get_state(transport, session);
-    if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 
0)
+    if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0
+        && !transport->close_sent)
     {
-      if (transport->connection->tpwork_head) return 0;
+      if (pn_pointful_buffering(transport, session)) return 0;
       int err = pn_post_frame(transport->disp, state->local_channel, "DL[]", 
END);
       if (err) return err;
       state->local_channel = -2;
@@ -1697,7 +1764,7 @@ int pn_process_conn_teardown(pn_transpor
   if (endpoint->type == CONNECTION)
   {
     if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) {
-      if (transport->connection->tpwork_head) return 0;
+      if (pn_pointful_buffering(transport, NULL)) return 0;
       int err = pn_post_close(transport);
       if (err) return err;
       transport->close_sent = true;
@@ -1714,9 +1781,10 @@ int pn_phase(pn_transport_t *transport, 
   pn_endpoint_t *endpoint = conn->transport_head;
   while (endpoint)
   {
+    pn_endpoint_t *next = endpoint->transport_next;
     int err = phase(transport, endpoint);
     if (err) return err;
-    endpoint = endpoint->transport_next;
+    endpoint = next;
   }
   return 0;
 }
@@ -1749,6 +1817,8 @@ int pn_process(pn_transport_t *transport
 
 ssize_t pn_output(pn_transport_t *transport, char *bytes, size_t size)
 {
+  if (!transport) return PN_ARG_ERR;
+
   if (!transport->error)
     transport->error = pn_process(transport);
 

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1349489&r1=1349488&r2=1349489&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Tue Jun 12 19:29:58 2012
@@ -17,13 +17,15 @@
 # under the License.
 #
 
-import os, common
+import os, common, xproton
 from xproton import *
 
 # future test areas
 #  + different permutations of setup
 #   - creating deliveries and calling input/output before opening the 
session/link
 #  + shrinking output_size down to something small? should the enginge buffer?
+#  + resuming
+#    - locally and remotely created deliveries with the same tag
 
 OUTPUT_SIZE = 10*1024
 
@@ -222,6 +224,15 @@ class SessionTest(Test):
     assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
     assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
 
+  def test_closing_connection(self):
+    pn_session_open(self.ssn)
+    self.pump()
+    pn_connection_close(self.c1)
+    self.pump()
+    pn_session_close(self.ssn)
+    self.pump()
+
+
 class LinkTest(Test):
 
   def setup(self):
@@ -300,6 +311,49 @@ class LinkTest(Test):
     assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
     assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
 
+  def test_multiple(self):
+    rcv = pn_receiver(pn_get_session(self.snd), "second-rcv")
+    pn_link_open(self.snd)
+    pn_link_open(rcv)
+    self.pump()
+    c2 = pn_get_connection(pn_get_session(self.rcv))
+    l = pn_link_head(c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+    while l:
+      pn_link_open(l)
+      l = pn_link_next(l, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+    self.pump()
+
+    assert self.snd
+    assert rcv
+    pn_link_close(self.snd)
+    pn_link_close(rcv)
+    ssn = pn_get_session(rcv)
+    conn = pn_get_connection(ssn)
+    pn_session_close(ssn)
+    pn_connection_close(conn)
+    self.pump()
+
+  def test_closing_session(self):
+    pn_link_open(self.snd)
+    pn_link_open(self.rcv)
+    ssn1 = pn_get_session(self.snd)
+    self.pump()
+    pn_session_close(ssn1)
+    self.pump()
+    pn_link_close(self.snd)
+    self.pump()
+
+  def test_closing_connection(self):
+    pn_link_open(self.snd)
+    pn_link_open(self.rcv)
+    ssn1 = pn_get_session(self.snd)
+    c1 = pn_get_connection(ssn1)
+    self.pump()
+    pn_connection_close(c1)
+    self.pump()
+    pn_link_close(self.snd)
+    self.pump()
+
 class TransferTest(Test):
 
   def setup(self):
@@ -512,6 +566,61 @@ class CreditTest(Test):
 
     assert pn_queued(self.rcv) == 0, pn_queued(self.rcv)
 
+  def _testBufferingOnClose(self, a, b):
+    for i in range(10):
+      d = pn_delivery(self.snd, "tag-%s" % i)
+      assert d
+      pn_settle(d)
+    self.pump()
+    assert pn_queued(self.snd) == 10
+
+    endpoints = {"connection": (self.c1, self.c2),
+                 "session": (pn_get_session(self.snd), 
pn_get_session(self.rcv)),
+                 "link": (self.snd, self.rcv)}
+
+    local_a, remote_a = endpoints[a]
+    local_b, remote_b = endpoints[b]
+
+    a_close = getattr(xproton, "pn_%s_close" % a)
+    a_state = getattr(xproton, "pn_%s_state" % a)
+    b_close = getattr(xproton, "pn_%s_close" % b)
+    b_state = getattr(xproton, "pn_%s_state" % b)
+
+    b_close(remote_b)
+    self.pump()
+    assert b_state(local_b) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+    a_close(local_a)
+    self.pump()
+    assert a_state(remote_a) & PN_REMOTE_CLOSED
+    assert pn_queued(self.snd) == 10
+
+  def testBufferingOnCloseLinkLink(self):
+    self._testBufferingOnClose("link", "link")
+
+  def testBufferingOnCloseLinkSession(self):
+    self._testBufferingOnClose("link", "session")
+
+  def testBufferingOnCloseLinkConnection(self):
+    self._testBufferingOnClose("link", "connection")
+
+  def testBufferingOnCloseSessionLink(self):
+    self._testBufferingOnClose("session", "link")
+
+  def testBufferingOnCloseSessionSession(self):
+    self._testBufferingOnClose("session", "session")
+
+  def testBufferingOnCloseSessionConnection(self):
+    self._testBufferingOnClose("session", "connection")
+
+  def testBufferingOnCloseConnectionLink(self):
+    self._testBufferingOnClose("connection", "link")
+
+  def testBufferingOnCloseConnectionSession(self):
+    self._testBufferingOnClose("connection", "session")
+
+  def testBufferingOnCloseConnectionConnection(self):
+    self._testBufferingOnClose("connection", "connection")
+
   def testCreditWithBuffering(self):
     pn_flow(self.rcv, PN_SESSION_WINDOW + 10)
     self.pump()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to