Author: rhs
Date: Wed Apr 11 22:39:33 2012
New Revision: 1325050

URL: http://svn.apache.org/viewvc?rev=1325050&view=rev
Log:
added tests and fixed various bugs found along the way

Added:
    qpid/proton/trunk/proton-c/tests/
    qpid/proton/trunk/proton-c/tests/__init__.py
    qpid/proton/trunk/proton-c/tests/engine.py
Modified:
    qpid/proton/trunk/proton-c/cproton.i
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c

Modified: qpid/proton/trunk/proton-c/cproton.i
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/cproton.i?rev=1325050&r1=1325049&r2=1325050&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/cproton.i (original)
+++ qpid/proton/trunk/proton-c/cproton.i Wed Apr 11 22:39:33 2012
@@ -25,11 +25,10 @@ ssize_t pn_send(pn_link_t *transport, ch
     ssize_t sz = pn_recv(link, OUTPUT, *OUTPUT_SIZE);
     if (sz >= 0) {
       *OUTPUT_SIZE = sz;
-      return 0;
     } else {
       *OUTPUT_SIZE = 0;
-      return sz;
     }
+    return sz;
   }
 %}
 %ignore pn_recv;
@@ -40,11 +39,10 @@ ssize_t pn_send(pn_link_t *transport, ch
     ssize_t sz = pn_output(transport, OUTPUT, *OUTPUT_SIZE);
     if (sz >= 0) {
       *OUTPUT_SIZE = sz;
-      return 0;
     } else {
       *OUTPUT_SIZE = 0;
-      return sz;
     }
+    return sz;
   }
 %}
 %ignore pn_output;
@@ -74,11 +72,10 @@ ssize_t pn_send(pn_link_t *transport, ch
     ssize_t sz = pn_message_data(OUTPUT, *OUTPUT_SIZE, STRING, LENGTH);
     if (sz >= 0) {
       *OUTPUT_SIZE = sz;
-      return 0;
     } else {
       *OUTPUT_SIZE = 0;
-      return sz;
     }
+    return sz;
   }
 %}
 %ignore pn_message_data;

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1325050&r1=1325049&r2=1325050&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed Apr 11 22:39:33 
2012
@@ -169,6 +169,7 @@ struct pn_delivery_t {
   char *bytes;
   size_t size;
   size_t capacity;
+  bool done;
   void *context;
 };
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1325050&r1=1325049&r2=1325050&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Wed Apr 11 22:39:33 2012
@@ -848,6 +848,7 @@ pn_delivery_t *pn_delivery(pn_link_t *li
   delivery->bytes = NULL;
   delivery->size = 0;
   delivery->capacity = 0;
+  delivery->done = false;
   delivery->context = NULL;
 
   if (!link->current)
@@ -888,7 +889,11 @@ void pn_delivery_dump(pn_delivery_t *d)
 
 pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
 {
-  return pn_dtag(pn_binary_bytes(delivery->tag), 
pn_binary_size(delivery->tag));
+  if (delivery) {
+    return pn_dtag(pn_binary_bytes(delivery->tag), 
pn_binary_size(delivery->tag));
+  } else {
+    return (pn_delivery_tag_t) {0};
+  }
 }
 
 pn_delivery_t *pn_current(pn_link_t *link)
@@ -915,6 +920,7 @@ bool pn_advance(pn_link_t *link)
 {
   if (link && link->current) {
     pn_delivery_t *prev = link->current;
+    prev->done = true;
     if (link->endpoint.type == SENDER) {
       pn_advance_sender(link);
     } else {
@@ -992,12 +998,15 @@ void pn_do_begin(pn_dispatcher_t *disp)
   PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
 }
 
-pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t *name)
+pn_link_state_t *pn_find_link(pn_session_state_t *ssn_state, pn_string_t 
*name, bool is_sender)
 {
+  pn_endpoint_type_t type = is_sender ? SENDER : RECEIVER;
+
   for (int i = 0; i < ssn_state->session->link_count; i++)
   {
     pn_link_t *link = ssn_state->session->links[i];
-    if (!wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
+    if (link->endpoint.type == type &&
+        !wcsncmp(pn_string_wcs(name), link->name, pn_string_size(name)))
     {
       return pn_link_get_state(ssn_state, link);
     }
@@ -1013,7 +1022,7 @@ void pn_do_attach(pn_dispatcher_t *disp)
   bool is_sender = pn_to_bool(pn_list_get(args, ATTACH_ROLE));
   pn_string_t *name = pn_to_string(pn_list_get(args, ATTACH_NAME));
   pn_session_state_t *ssn_state = pn_channel_state(transport, disp->channel);
-  pn_link_state_t *link_state = pn_find_link(ssn_state, name);
+  pn_link_state_t *link_state = pn_find_link(ssn_state, name, is_sender);
   if (!link_state) {
     pn_link_t *link;
     if (is_sender) {
@@ -1053,19 +1062,25 @@ void pn_do_transfer(pn_dispatcher_t *dis
   uint32_t handle = pn_to_uint32(pn_list_get(args, TRANSFER_HANDLE));
   pn_link_state_t *link_state = pn_handle_state(ssn_state, handle);
   pn_link_t *link = link_state->link;
-  pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
-  pn_delivery_t *delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), 
pn_binary_size(tag)));
-  pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, 
delivery);
-  delivery->context = state;
-  // XXX: need to check that state is not null (i.e. we haven't hit the limit)
-  pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
-  if (id != state->id) {
-    // XXX: signal error somehow
+  pn_delivery_t *delivery;
+  if (link->tail && !link->tail->done) {
+    delivery = link->tail;
+  } else {
+    pn_binary_t *tag = pn_to_binary(pn_list_get(args, TRANSFER_DELIVERY_TAG));
+    delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), 
pn_binary_size(tag)));
+    pn_delivery_state_t *state = pn_delivery_buffer_push(&ssn_state->incoming, 
delivery);
+    delivery->context = state;
+    // XXX: need to check that state is not null (i.e. we haven't hit the 
limit)
+    pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
+    if (id != state->id) {
+      // XXX: signal error somehow
+    }
   }
 
-  PN_ENSURE(delivery->bytes, delivery->capacity, disp->size);
-  memmove(delivery->bytes, disp->payload, disp->size);
-  delivery->size = disp->size;
+  PN_ENSURE(delivery->bytes, delivery->capacity, delivery->size + disp->size);
+  memmove(delivery->bytes + delivery->size, disp->payload, disp->size);
+  delivery->size += disp->size;
+  delivery->done = !pn_to_bool(pn_list_get(args, TRANSFER_MORE));
 }
 
 void pn_do_flow(pn_dispatcher_t *disp)
@@ -1223,7 +1238,7 @@ void pn_process_conn_setup(pn_transport_
 
 void pn_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
-  if (endpoint->type == SESSION)
+  if (endpoint->type == SESSION && transport->open_sent)
   {
     pn_session_t *ssn = (pn_session_t *) endpoint;
     pn_session_state_t *state = pn_session_get_state(transport, ssn);
@@ -1246,12 +1261,14 @@ void pn_process_ssn_setup(pn_transport_t
 
 void pn_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
-  if (endpoint->type == SENDER || endpoint->type == RECEIVER)
+  if (transport->open_sent && (endpoint->type == SENDER ||
+                               endpoint->type == RECEIVER))
   {
     pn_link_t *link = (pn_link_t *) endpoint;
     pn_session_state_t *ssn_state = pn_session_get_state(transport, 
link->session);
     pn_link_state_t *state = pn_link_get_state(ssn_state, link);
-    if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == 
(uint32_t) -1)
+    if (((int16_t) ssn_state->local_channel >= 0) &&
+        !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == 
(uint32_t) -1)
     {
       pn_init_frame(transport->disp);
       pn_field(transport->disp, ATTACH_ROLE, pn_boolean(endpoint->type == 
RECEIVER));
@@ -1308,6 +1325,7 @@ void pn_post_disp(pn_transport_t *transp
   pn_field(transport->disp, DISPOSITION_LAST, pn_uint(state->id));
   // XXX
   pn_field(transport->disp, DISPOSITION_SETTLED, 
pn_boolean(delivery->local_settled));
+  //pn_field(transport->disp, DISPOSITION_BATCHABLE, pn_boolean(batchable));
   uint64_t code;
   switch(delivery->local_state) {
   case PN_ACCEPTED:
@@ -1320,10 +1338,10 @@ void pn_post_disp(pn_transport_t *transp
   default:
     code = 0;
   }
-  if (code)
+  if (code) {
     pn_field(transport->disp, DISPOSITION_STATE, pn_value("L([])", code));
-  //pn_field(transport->disp, DISPOSITION_BATCHABLE, pn_boolean(batchable));
-  pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
+    pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
+  }
 }
 
 void pn_process_disp_receiver(pn_transport_t *transport, pn_endpoint_t 
*endpoint)
@@ -1368,18 +1386,22 @@ void pn_process_msg_data(pn_transport_t 
           state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
           delivery->context = state;
         }
-        if (!state->sent && (int16_t) ssn_state->local_channel >= 0 && 
(int32_t) link_state->local_handle >= 0) {
+        if (!state->sent &&
+            (delivery->done || delivery->size > 0) &&
+            (int16_t) ssn_state->local_channel >= 0 && (int32_t) 
link_state->local_handle >= 0) {
           pn_init_frame(transport->disp);
           pn_field(transport->disp, TRANSFER_HANDLE, pn_value("I", 
link_state->local_handle));
           pn_field(transport->disp, TRANSFER_DELIVERY_ID, pn_value("I", 
state->id));
           pn_field(transport->disp, TRANSFER_DELIVERY_TAG, 
pn_from_binary(pn_binary_dup(delivery->tag)));
           pn_field(transport->disp, TRANSFER_MESSAGE_FORMAT, pn_value("I", 0));
+          pn_field(transport->disp, TRANSFER_MORE, 
pn_boolean(!delivery->done));
           if (delivery->bytes) {
             pn_append_payload(transport->disp, delivery->bytes, 
delivery->size);
             delivery->size = 0;
           }
           pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER);
-          state->sent = true;
+          if (delivery->done)
+            state->sent = true;
         }
       }
       delivery = delivery->tpwork_next;
@@ -1399,9 +1421,9 @@ void pn_process_disp_sender(pn_transport
       if (link->endpoint.type == SENDER) {
         // XXX: need to prevent duplicate disposition sending
         pn_session_state_t *ssn_state = pn_session_get_state(transport, 
link->session);
-        /*if ((int16_t) ssn_state->local_channel >= 0) {
+        if ((int16_t) ssn_state->local_channel >= 0) {
           pn_post_disp(transport, delivery);
-          }*/
+        }
 
         if (delivery->local_settled) {
           pn_full_settle(&ssn_state->outgoing, delivery);
@@ -1533,17 +1555,18 @@ void pn_trace(pn_transport_t *transport,
 ssize_t pn_send(pn_link_t *sender, const char *bytes, size_t n)
 {
   pn_delivery_t *current = pn_current(sender);
-  if (!current) return -1;
-  if (current->bytes) return 0;
+  if (!current) return PN_EOS;
   PN_ENSURE(current->bytes, current->capacity, current->size + n);
   memmove(current->bytes + current->size, bytes, n);
-  current->size = +n;
+  current->size += n;
   pn_add_tpwork(current);
   return n;
 }
 
 ssize_t pn_recv(pn_link_t *receiver, char *bytes, size_t n)
 {
+  if (!receiver) return PN_ARG_ERR;
+
   pn_delivery_t *delivery = receiver->current;
   if (delivery) {
     if (delivery->size) {
@@ -1553,11 +1576,10 @@ ssize_t pn_recv(pn_link_t *receiver, cha
       delivery->size -= size;
       return size;
     } else {
-      return PN_EOS;
+      return delivery->done ? PN_EOS : 0;
     }
   } else {
-    // XXX: ?
-    return PN_EOS;
+    return PN_STATE_ERR;
   }
 }
 
@@ -1615,8 +1637,12 @@ bool pn_writable(pn_delivery_t *delivery
 
 bool pn_readable(pn_delivery_t *delivery)
 {
-  pn_link_t *link = delivery->link;
-  return pn_is_receiver(link) && pn_is_current(delivery);
+  if (delivery) {
+    pn_link_t *link = delivery->link;
+    return pn_is_receiver(link) && pn_is_current(delivery);
+  } else {
+    return false;
+  }
 }
 
 size_t pn_pending(pn_delivery_t *delivery)

Added: qpid/proton/trunk/proton-c/tests/__init__.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/tests/__init__.py?rev=1325050&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/tests/__init__.py (added)
+++ qpid/proton/trunk/proton-c/tests/__init__.py Wed Apr 11 22:39:33 2012
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import tests.engine

Added: qpid/proton/trunk/proton-c/tests/engine.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/tests/engine.py?rev=1325050&view=auto
==============================================================================
--- qpid/proton/trunk/proton-c/tests/engine.py (added)
+++ qpid/proton/trunk/proton-c/tests/engine.py Wed Apr 11 22:39:33 2012
@@ -0,0 +1,384 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+from cproton import *
+
+def pump(t1, t2):
+  while True:
+    cd, out1 = pn_output(t1, 1024)
+    assert cd >= 0
+    cd, out2 = pn_output(t2, 1024)
+    assert cd >= 0
+
+    if out1 or out2:
+      if out1:
+        cd = pn_input(t2, out1, len(out1))
+        assert cd == len(out1)
+      if out2:
+        cd = pn_input(t1, out2, len(out2))
+        assert cd == len(out2)
+    else:
+      return
+
+class Test:
+
+  def __init__(self, name):
+    self.name = name
+
+  def setup(self):
+    self.c1 = pn_connection()
+    self.c2 = pn_connection()
+    self.t1 = pn_transport(self.c1)
+    self.t2 = pn_transport(self.c2)
+    pn_transport_open(self.t1)
+    pn_transport_open(self.t2)
+    trc = os.environ["PN_TRACE_FRM"]
+    if trc and trc.lower() in ("1", "2", "yes", "true"):
+      pn_trace(self.t1, PN_TRACE_FRM)
+    if trc == "2":
+      pn_trace(self.t2, PN_TRACE_FRM)
+
+  def teardown(self):
+    pn_connection_destroy(self.c1)
+    pn_connection_destroy(self.c2)
+
+  def pump(self):
+    pump(self.t1, self.t2)
+
+class ConnectionTest(Test):
+
+  def test_open_close(self):
+    assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+    assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+    pn_connection_open(self.c1)
+    self.pump()
+
+    assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+    assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+
+    pn_connection_open(self.c2)
+    self.pump()
+
+    assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    pn_connection_close(self.c1)
+    self.pump()
+
+    assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+
+    pn_connection_close(self.c2)
+    self.pump()
+
+    assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+    assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+  def test_simultaneous_open_close(self):
+    assert pn_connection_state(self.c1) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+    assert pn_connection_state(self.c2) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+    pn_connection_open(self.c1)
+    pn_connection_open(self.c2)
+
+    self.pump()
+
+    assert pn_connection_state(self.c1) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_connection_state(self.c2) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    pn_connection_close(self.c1)
+    pn_connection_close(self.c2)
+
+    self.pump()
+
+    assert pn_connection_state(self.c1) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+    assert pn_connection_state(self.c2) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+class SessionTest(Test):
+
+  def setup(self):
+    Test.setup(self)
+    self.ssn = pn_session(self.c1)
+    pn_connection_open(self.c1)
+    pn_connection_open(self.c2)
+
+  def test_open_close(self):
+    assert pn_session_state(self.ssn) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+    pn_session_open(self.ssn)
+
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+    self.pump()
+
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+    ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
+
+    assert ssn != None
+    assert pn_session_state(ssn) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+    pn_session_open(ssn)
+
+    assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+    self.pump()
+
+    assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    pn_session_close(ssn)
+
+    assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    self.pump()
+
+    assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+
+    pn_session_close(self.ssn)
+
+    assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+    self.pump()
+
+    assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+    assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+  def test_simultaneous_close(self):
+    pn_session_open(self.ssn)
+    self.pump()
+    ssn = pn_session_head(self.c2, PN_REMOTE_ACTIVE | PN_LOCAL_UNINIT)
+    assert ssn != None
+    pn_session_open(ssn)
+    self.pump()
+
+    assert pn_session_state(self.ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_session_state(ssn) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    pn_session_close(self.ssn)
+    pn_session_close(ssn)
+
+    assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+
+    self.pump()
+
+    assert pn_session_state(self.ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+    assert pn_session_state(ssn) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+class LinkTest(Test):
+
+  def setup(self):
+    Test.setup(self)
+    pn_connection_open(self.c1)
+    pn_connection_open(self.c2)
+    self.ssn1 = pn_session(self.c1)
+    pn_session_open(self.ssn1)
+    self.pump()
+    self.ssn2 = pn_session_head(self.c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+    pn_session_open(self.ssn2)
+    self.pump()
+    self.snd = pn_sender(self.ssn1, "test-link")
+    self.rcv = pn_receiver(self.ssn2, "test-link")
+
+  def test_open_close(self):
+    assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+    assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+    pn_link_open(self.snd)
+
+    assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+    assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+    self.pump()
+
+    assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+    assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE
+
+    pn_link_open(self.rcv)
+
+    assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+    assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    self.pump()
+
+    assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    pn_link_close(self.snd)
+
+    assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    self.pump()
+
+    assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED
+
+    pn_link_close(self.rcv)
+
+    assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+    self.pump()
+
+    assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+    assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+  def test_simultaneous_open_close(self):
+    assert pn_link_state(self.snd) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+    assert pn_link_state(self.rcv) == PN_LOCAL_UNINIT | PN_REMOTE_UNINIT
+
+    pn_link_open(self.snd)
+    pn_link_open(self.rcv)
+
+    assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+    assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_UNINIT
+
+    self.pump()
+
+    assert pn_link_state(self.snd) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+    assert pn_link_state(self.rcv) == PN_LOCAL_ACTIVE | PN_REMOTE_ACTIVE
+
+    pn_link_close(self.snd)
+    pn_link_close(self.rcv)
+
+    assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+    assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_ACTIVE
+
+    self.pump()
+
+    assert pn_link_state(self.snd) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+    assert pn_link_state(self.rcv) == PN_LOCAL_CLOSED | PN_REMOTE_CLOSED
+
+class TransferTest(Test):
+
+  def setup(self):
+    Test.setup(self)
+    self.ssn1 = pn_session(self.c1)
+    pn_connection_open(self.c1)
+    pn_connection_open(self.c2)
+    pn_session_open(self.ssn1)
+    self.pump()
+    self.ssn2 = pn_session_head(self.c2, PN_LOCAL_UNINIT | PN_REMOTE_ACTIVE)
+    assert self.ssn2 != None
+    pn_session_open(self.ssn2)
+
+    self.snd = pn_sender(self.ssn1, "test-link")
+    self.rcv = pn_receiver(self.ssn2, "test-link")
+    pn_link_open(self.snd)
+    pn_link_open(self.rcv)
+    self.pump()
+
+  def test_work_queue(self):
+    assert pn_work_head(self.c1) is None
+    pn_delivery(self.snd, "tag")
+    assert pn_work_head(self.c1) is None
+    pn_flow(self.rcv, 1)
+    self.pump()
+    d = pn_work_head(self.c1)
+    assert d is not None
+    assert pn_delivery_tag(d) == "tag"
+    assert pn_writable(d)
+
+    n = pn_send(self.snd, "this is a test")
+    assert pn_advance(self.snd)
+    assert pn_work_head(self.c1) is None
+
+    self.pump()
+
+    d = pn_work_head(self.c2)
+    assert pn_delivery_tag(d) == "tag"
+    assert pn_readable(d)
+
+  def test_multiframe(self):
+    pn_flow(self.rcv, 1)
+    pn_delivery(self.snd, "tag")
+    msg = "this is a test"
+    n = pn_send(self.snd, msg)
+    assert n == len(msg)
+
+    self.pump()
+
+    d = pn_current(self.rcv)
+    assert pn_delivery_tag(d) == "tag"
+    assert pn_readable(d)
+
+    cd, bytes = pn_recv(self.rcv, 1024)
+    assert bytes == msg
+    assert cd == len(bytes)
+
+    cd, bytes = pn_recv(self.rcv, 1024)
+    assert cd == 0
+    assert bytes == ""
+
+    msg = "this is more"
+    n = pn_send(self.snd, msg)
+    assert n == len(msg)
+    assert pn_advance(self.snd)
+
+    self.pump()
+
+    cd, bytes = pn_recv(self.rcv, 1024)
+    assert cd == len(bytes)
+    assert bytes == msg
+
+    cd, bytes = pn_recv(self.rcv, 1024)
+    assert cd == PN_EOS
+    assert bytes == ""
+
+  def test_disposition(self):
+    pn_flow(self.rcv, 1)
+
+    self.pump()
+
+    sd = pn_delivery(self.snd, "tag")
+    msg = "this is a test"
+    n = pn_send(self.snd, msg)
+    assert n == len(msg)
+    assert pn_advance(self.snd)
+
+    self.pump()
+
+    rd = pn_current(self.rcv)
+    assert rd is not None
+    assert pn_delivery_tag(rd) == pn_delivery_tag(sd)
+    cd, rmsg = pn_recv(self.rcv, 1024)
+    assert cd == len(rmsg)
+    assert rmsg == msg
+    pn_disposition(rd, PN_ACCEPTED)
+
+    self.pump()
+
+    assert pn_remote_disp(sd) == pn_local_disp(rd) == PN_ACCEPTED
+    assert pn_dirty(sd)
+
+    pn_disposition(sd, PN_ACCEPTED)
+    pn_settle(sd)
+
+    self.pump()
+
+    assert pn_local_disp(sd) == pn_remote_disp(rd) == PN_ACCEPTED



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to