Author: rhs
Date: Mon Oct 29 17:15:41 2012
New Revision: 1403430

URL: http://svn.apache.org/viewvc?rev=1403430&view=rev
Log:
fixed multi-frame transfers

Modified:
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/engine/engine.c
    qpid/proton/trunk/proton-c/src/messenger.c
    qpid/proton/trunk/tests/proton_tests/engine.py

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Oct 29 17:15:41 2012
@@ -363,6 +363,7 @@ pn_disposition_t pn_delivery_local_state
 pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery);
 bool pn_delivery_settled(pn_delivery_t *delivery);
 size_t pn_delivery_pending(pn_delivery_t *delivery);
+bool pn_delivery_partial(pn_delivery_t *delivery);
 bool pn_delivery_writable(pn_delivery_t *delivery);
 bool pn_delivery_readable(pn_delivery_t *delivery);
 bool pn_delivery_updated(pn_delivery_t *delivery);

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Mon Oct 29 17:15:41 2012
@@ -313,8 +313,8 @@ int pn_post_transfer_frame(pn_dispatcher
     // check if we need to break up the outbound frame
     size_t available = disp->output_size;
     if (disp->remote_max_frame) {
-      if ((available + buf.size) > disp->remote_max_frame) {
-        available = disp->remote_max_frame - buf.size;
+      if ((available + buf.size) > disp->remote_max_frame - 8) {
+        available = disp->remote_max_frame - 8 - buf.size;
         if (more_flag == false) {
           more_flag = true;
           goto compute_performatives;  // deal with flag change
@@ -331,7 +331,11 @@ int pn_post_transfer_frame(pn_dispatcher
       pn_buffer_ensure( disp->frame, available + buf.size );
       goto encode_performatives;
     }
+
+    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, disp->output_size);
+
     memmove( buf.start + buf.size, disp->output_payload, available);
+    disp->output_payload += available;
     disp->output_size -= available;
     buf.size += available;
 
@@ -340,8 +344,6 @@ int pn_post_transfer_frame(pn_dispatcher
     frame.payload = buf.start;
     frame.size = buf.size;
 
-    pn_do_trace(disp, ch, OUT, disp->output_args, frame.payload, frame.size);
-
     size_t n;
     while (!(n = pn_write_frame(disp->output + disp->available,
                                 disp->capacity - disp->available, frame))) {

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 29 17:15:41 2012
@@ -2509,3 +2509,8 @@ size_t pn_delivery_pending(pn_delivery_t
 {
   return pn_buffer_size(delivery->bytes);
 }
+
+bool pn_delivery_partial(pn_delivery_t *delivery)
+{
+  return !delivery->done;
+}

Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Mon Oct 29 17:15:41 2012
@@ -639,7 +639,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
 
     pn_delivery_t *d = pn_work_head(conn);
     while (d) {
-      if (pn_delivery_readable(d)) {
+      if (pn_delivery_readable(d) && !pn_delivery_partial(d)) {
         return true;
       }
       d = pn_work_next(d);

Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Mon Oct 29 17:15:41 2012
@@ -572,6 +572,12 @@ class MaxFrameTransferTest(Test):
   def teardown(self):
     self.cleanup()
 
+  def message(self, size):
+    parts = []
+    for i in range(size):
+      parts.append(str(i))
+    return "/".join(parts)[:size]
+
   def testMinFrame(self):
     """
     Configure receiver to support minimum max-frame as defined by AMQP-1.0.
@@ -588,7 +594,7 @@ class MaxFrameTransferTest(Test):
 
     self.rcv.flow(1)
     self.snd.delivery("tag")
-    msg = "X" * 513
+    msg = self.message(513)
     n = self.snd.send(msg)
     assert n == len(msg)
     assert self.snd.advance()
@@ -633,7 +639,7 @@ class MaxFrameTransferTest(Test):
     self.rcv.advance()
 
     self.snd.delivery("gat")
-    msg = "Y" * 1426
+    msg = self.message(1426)
     n = self.snd.send(msg)
     assert n == len(msg)
     assert self.snd.advance()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to