Author: kgiusti
Date: Mon Oct 22 21:50:45 2012
New Revision: 1401095
URL: http://svn.apache.org/viewvc?rev=1401095&view=rev
Log:
PROTON-79: use variable size buffer for output frames.
Modified:
qpid/proton/trunk/proton-c/include/proton/framing.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
Modified: qpid/proton/trunk/proton-c/include/proton/framing.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/framing.h?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/framing.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/framing.h Mon Oct 22 21:50:45 2012
@@ -30,6 +30,7 @@ extern "C" {
#endif
#define AMQP_HEADER_SIZE (8)
+#define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame
typedef struct {
uint8_t type;
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Mon Oct 22 21:50:45
2012
@@ -46,6 +46,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t f
disp->size = 0;
disp->output_args = pn_data(16);
+ disp->frame = pn_buffer( 4*1024 );
// XXX
disp->capacity = 4*1024;
disp->output = malloc(disp->capacity);
@@ -222,51 +223,38 @@ int pn_post_frame(pn_dispatcher_t *disp,
pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload,
disp->output_size);
- size_t size = 1024;
-
- while (true) {
- char buf[size];
- ssize_t wr = pn_data_encode(disp->output_args, buf, size);
- if (wr < 0)
- {
- if (wr == PN_OVERFLOW) {
- size *= 2;
- continue;
- } else {
- fprintf(stderr, "error posting frame: %s", pn_code(wr));
- return PN_ERR;
- }
- } else if (size - wr < disp->output_size) {
- size += wr + disp->output_size - size;
- continue;
- } else {
- if (disp->output_size) {
- memmove(buf + wr, disp->output_payload, disp->output_size);
- wr += disp->output_size;
- disp->output_payload = NULL;
- disp->output_size = 0;
- }
-
- pn_frame_t frame = {disp->frame_type};
- frame.channel = ch;
- frame.payload = buf;
- frame.size = wr;
- size_t n;
- while (!(n = pn_write_frame(disp->output + disp->available,
- disp->capacity - disp->available, frame))) {
- disp->capacity *= 2;
- disp->output = realloc(disp->output, disp->capacity);
- }
- if (disp->trace & PN_TRACE_RAW) {
- fprintf(stderr, "RAW: \"");
- pn_fprint_data(stderr, disp->output + disp->available, n);
- fprintf(stderr, "\"\n");
- }
- disp->available += n;
- break;
+ encode_performatives:
+ pn_buffer_clear( disp->frame );
+ pn_bytes_t buf = pn_buffer_bytes( disp->frame );
+ buf.size = pn_buffer_available( disp->frame );
+
+ ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size );
+ if (wr < 0) {
+ if (wr == PN_OVERFLOW) {
+ pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2 );
+ goto encode_performatives;
}
+ fprintf(stderr, "error posting frame: %s", pn_code(wr));
+ return PN_ERR;
}
+ pn_frame_t frame = {disp->frame_type};
+ frame.channel = ch;
+ frame.payload = buf.start;
+ frame.size = wr;
+ size_t n;
+ while (!(n = pn_write_frame(disp->output + disp->available,
+ disp->capacity - disp->available, frame))) {
+ disp->capacity *= 2;
+ disp->output = realloc(disp->output, disp->capacity);
+ }
+ if (disp->trace & PN_TRACE_RAW) {
+ fprintf(stderr, "RAW: \"");
+ pn_fprint_data(stderr, disp->output + disp->available, n);
+ fprintf(stderr, "\"\n");
+ }
+ disp->available += n;
+
return 0;
}
@@ -289,53 +277,70 @@ int pn_post_transfer_frame(pn_dispatcher
bool settled,
bool more)
{
+ bool more_flag = more;
- if (!disp->remote_max_frame) // "unlimited" frame size accepted
- return pn_post_frame( disp, ch, "DL[IIzIoo]", TRANSFER,
- handle, id, tag->size, tag->start,
- message_format,
- settled, more);
-
- bool more_flag = true; // optimize: check against hard-code size of
performative
- do {
- pn_data_clear(disp->output_args);
- int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
- handle, id, tag->size, tag->start,
- message_format,
- settled, more_flag);
- if (err) {
- fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err),
pn_data_error(disp->output_args));
- return PN_ERR;
- }
+ // create preformatives, assuming 'more' flag need not change
+
+ compute_performatives:
+ pn_data_clear(disp->output_args);
+ int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
+ handle, id, tag->size, tag->start,
+ message_format,
+ settled, more_flag);
+ if (err) {
+ fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err),
pn_data_error(disp->output_args));
+ return PN_ERR;
+ }
+
+ do { // send as many frames as possible without changing the 'more' flag...
- pn_buffer_clear(disp->frame);
- pn_bytes_t frame_buf = pn_buffer_bytes( disp->frame );
- ssize_t wr = pn_data_encode(disp->output_args, frame_buf.start,
- pn_buffer_available( disp->frame ));
+ encode_performatives:
+ pn_buffer_clear( disp->frame );
+ pn_bytes_t buf = pn_buffer_bytes( disp->frame );
+ buf.size = pn_buffer_available( disp->frame );
+
+ ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size);
if (wr < 0) {
+ if (wr == PN_OVERFLOW) {
+ pn_buffer_ensure( disp->frame, pn_buffer_available( disp->frame ) * 2
);
+ goto encode_performatives;
+ }
fprintf(stderr, "error posting frame: %s", pn_code(wr));
return PN_ERR;
}
- frame_buf.size = wr;
+ buf.size = wr;
- if (more == false && (frame_buf.size + disp->output_size) <=
disp->remote_max_frame) {
- more_flag = false;
- continue; // rebuild performative with new value for more flag
+ // check if we need to break up the outbound frame
+ size_t available = disp->output_size;
+ if (disp->remote_max_frame) {
+ if ((available + buf.size) > disp->remote_max_frame) {
+ available = disp->remote_max_frame - buf.size;
+ if (more_flag == false) {
+ more_flag = true;
+ goto compute_performatives; // deal with flag change
+ }
+ } else if (more_flag == true && more == false) {
+ // caller has no more, and this is the last frame
+ more_flag = false;
+ goto compute_performatives;
+ }
}
- size_t available = disp->remote_max_frame - frame_buf.size;
- if (available > disp->output_size) available = disp->output_size;
- memmove(frame_buf.start + frame_buf.size, disp->output_payload, available);
- frame_buf.size += available;
- //disp->output_payload = NULL;
+ if (pn_buffer_available( disp->frame ) < (available + buf.size)) {
+ // not enough room for payload - try again...
+ pn_buffer_ensure( disp->frame, available + buf.size );
+ goto encode_performatives;
+ }
+ memmove( buf.start + buf.size, disp->output_payload, available);
disp->output_size -= available;
-
- pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload,
available);
+ buf.size += available;
pn_frame_t frame = {disp->frame_type};
frame.channel = ch;
- frame.payload = frame_buf.start;
- frame.size = frame_buf.size;
+ frame.payload = buf.start;
+ frame.size = buf.size;
+
+ pn_do_trace(disp, ch, OUT, disp->output_args, frame.payload, frame.size);
size_t n;
while (!(n = pn_write_frame(disp->output + disp->available,
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Mon Oct 22 21:50:45
2012
@@ -56,7 +56,7 @@ struct pn_dispatcher_t {
void *context;
bool halt;
bool batch;
- char scratch[SCRATCH]; // ? Rafi - can I use this instead of frame
(size for remote-max-frame?)
+ char scratch[SCRATCH];
};
pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context);
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Oct 22 21:50:45
2012
@@ -89,9 +89,6 @@ typedef struct {
#include <proton/sasl.h>
#include <proton/ssl.h>
-// minimum allowable max-frame
-#define PN_MIN_MAX_FRAME_SIZE ((uint32_t)512)
-
struct pn_transport_t {
ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
ssize_t (*process_output)(pn_transport_t *, char *, size_t);
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1401095&r1=1401094&r2=1401095&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 22 21:50:45 2012
@@ -1330,14 +1330,14 @@ int pn_do_open(pn_dispatcher_t *disp)
transport->remote_desired_capabilities);
if (err) return err;
if (transport->remote_max_frame > 0) {
- if (transport->remote_max_frame < PN_MIN_MAX_FRAME_SIZE) {
+ if (transport->remote_max_frame < AMQP_MIN_MAX_FRAME_SIZE) {
fprintf(stderr, "Peer advertised bad max-frame (%u), forcing to %u\n",
- transport->remote_max_frame, PN_MIN_MAX_FRAME_SIZE);
- transport->remote_max_frame = PN_MIN_MAX_FRAME_SIZE;
+ transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE);
+ transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE;
}
disp->remote_max_frame = transport->remote_max_frame;
- if (disp->frame) pn_buffer_free( disp->frame );
- disp->frame = pn_buffer( disp->remote_max_frame );
+ pn_buffer_clear( disp->frame );
+ pn_buffer_ensure( disp->frame, disp->remote_max_frame );
}
if (container_q) {
transport->remote_container = pn_bytes_strdup(remote_container);
@@ -2375,8 +2375,8 @@ uint32_t pn_transport_get_max_frame(pn_t
void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
{
- if (size && size < PN_MIN_MAX_FRAME_SIZE)
- size = PN_MIN_MAX_FRAME_SIZE;
+ if (size && size < AMQP_MIN_MAX_FRAME_SIZE)
+ size = AMQP_MIN_MAX_FRAME_SIZE;
transport->local_max_frame = size;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]