Author: rhs
Date: Mon Oct 29 17:15:41 2012
New Revision: 1403430
URL: http://svn.apache.org/viewvc?rev=1403430&view=rev
Log:
fixed multi-frame transfers
Modified:
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger.c
qpid/proton/trunk/tests/proton_tests/engine.py
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Oct 29 17:15:41 2012
@@ -363,6 +363,7 @@ pn_disposition_t pn_delivery_local_state
pn_disposition_t pn_delivery_remote_state(pn_delivery_t *delivery);
bool pn_delivery_settled(pn_delivery_t *delivery);
size_t pn_delivery_pending(pn_delivery_t *delivery);
+bool pn_delivery_partial(pn_delivery_t *delivery);
bool pn_delivery_writable(pn_delivery_t *delivery);
bool pn_delivery_readable(pn_delivery_t *delivery);
bool pn_delivery_updated(pn_delivery_t *delivery);
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Mon Oct 29 17:15:41
2012
@@ -313,8 +313,8 @@ int pn_post_transfer_frame(pn_dispatcher
// check if we need to break up the outbound frame
size_t available = disp->output_size;
if (disp->remote_max_frame) {
- if ((available + buf.size) > disp->remote_max_frame) {
- available = disp->remote_max_frame - buf.size;
+ if ((available + buf.size) > disp->remote_max_frame - 8) {
+ available = disp->remote_max_frame - 8 - buf.size;
if (more_flag == false) {
more_flag = true;
goto compute_performatives; // deal with flag change
@@ -331,7 +331,11 @@ int pn_post_transfer_frame(pn_dispatcher
pn_buffer_ensure( disp->frame, available + buf.size );
goto encode_performatives;
}
+
+ pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload,
disp->output_size);
+
memmove( buf.start + buf.size, disp->output_payload, available);
+ disp->output_payload += available;
disp->output_size -= available;
buf.size += available;
@@ -340,8 +344,6 @@ int pn_post_transfer_frame(pn_dispatcher
frame.payload = buf.start;
frame.size = buf.size;
- pn_do_trace(disp, ch, OUT, disp->output_args, frame.payload, frame.size);
-
size_t n;
while (!(n = pn_write_frame(disp->output + disp->available,
disp->capacity - disp->available, frame))) {
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 29 17:15:41 2012
@@ -2509,3 +2509,8 @@ size_t pn_delivery_pending(pn_delivery_t
{
return pn_buffer_size(delivery->bytes);
}
+
+bool pn_delivery_partial(pn_delivery_t *delivery)
+{
+ return !delivery->done;
+}
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Mon Oct 29 17:15:41 2012
@@ -639,7 +639,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
pn_delivery_t *d = pn_work_head(conn);
while (d) {
- if (pn_delivery_readable(d)) {
+ if (pn_delivery_readable(d) && !pn_delivery_partial(d)) {
return true;
}
d = pn_work_next(d);
Modified: qpid/proton/trunk/tests/proton_tests/engine.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/engine.py?rev=1403430&r1=1403429&r2=1403430&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/engine.py (original)
+++ qpid/proton/trunk/tests/proton_tests/engine.py Mon Oct 29 17:15:41 2012
@@ -572,6 +572,12 @@ class MaxFrameTransferTest(Test):
def teardown(self):
self.cleanup()
+ def message(self, size):
+ parts = []
+ for i in range(size):
+ parts.append(str(i))
+ return "/".join(parts)[:size]
+
def testMinFrame(self):
"""
Configure receiver to support minimum max-frame as defined by AMQP-1.0.
@@ -588,7 +594,7 @@ class MaxFrameTransferTest(Test):
self.rcv.flow(1)
self.snd.delivery("tag")
- msg = "X" * 513
+ msg = self.message(513)
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
@@ -633,7 +639,7 @@ class MaxFrameTransferTest(Test):
self.rcv.advance()
self.snd.delivery("gat")
- msg = "Y" * 1426
+ msg = self.message(1426)
n = self.snd.send(msg)
assert n == len(msg)
assert self.snd.advance()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]