Author: kgiusti
Date: Mon Oct 22 21:50:36 2012
New Revision: 1401094
URL: http://svn.apache.org/viewvc?rev=1401094&view=rev
Log:
PROTON-79: checkpoint proposed transfer message chunking.
Modified:
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Oct 22 21:50:36 2012
@@ -277,6 +277,10 @@ ssize_t pn_transport_input(pn_transport_
ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t
size);
time_t pn_transport_tick(pn_transport_t *transport, time_t now);
void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace);
+// Max frame size accessors.  A max frame of zero means "unlimited".
+//
+// pn_transport_get_max_frame: returns the locally configured max frame size.
+// pn_transport_set_max_frame: stores the local max frame size; a non-zero
+//   value below the implementation minimum (PN_MIN_MAX_FRAME_SIZE) is raised
+//   to that minimum.
+// pn_transport_get_peer_max_frame: returns the max frame size recorded for
+//   the remote peer (zero until the peer's OPEN supplies a value).
+uint32_t pn_transport_get_max_frame(pn_transport_t *transport);
+void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);
+uint32_t pn_transport_get_peer_max_frame(pn_transport_t *transport);
void pn_transport_free(pn_transport_t *transport);
// session
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Mon Oct 22 21:50:36
2012
@@ -24,7 +24,9 @@
#include <string.h>
#include <proton/framing.h>
#include <proton/engine.h>
+#include <proton/buffer.h>
#include "dispatcher.h"
+#include "protocol.h"
#include "../util.h"
pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context)
@@ -277,3 +279,79 @@ ssize_t pn_dispatcher_output(pn_dispatch
// XXX: need to check for errors
return n;
}
+
+
+/*
+ * Post a TRANSFER performative on channel `ch` carrying the payload that is
+ * currently registered on the dispatcher (disp->output_payload /
+ * disp->output_size).
+ *
+ * When the peer has not advertised a max frame size (disp->remote_max_frame
+ * is zero) the transfer is handed to pn_post_frame() as a single frame.
+ * Otherwise the payload is split over as many frames as needed so that the
+ * encoded performative plus its payload chunk never exceed
+ * disp->remote_max_frame.  Every frame except the final one is sent with the
+ * "more" flag set; the final one carries the caller's `more` value.
+ *
+ * Returns the pn_post_frame() result in the unlimited case, 0 once every
+ * chunk has been written to the dispatcher's output buffer, or PN_ERR if the
+ * performative cannot be encoded, does not leave room for payload, or the
+ * output buffer cannot be grown.
+ */
+int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch,
+                           uint32_t handle,
+                           pn_sequence_t id,
+                           const pn_bytes_t *tag,
+                           uint32_t message_format,
+                           bool settled,
+                           bool more)
+{
+
+  if (!disp->remote_max_frame) // "unlimited" frame size accepted
+    return pn_post_frame( disp, ch, "DL[IIzIoo]", TRANSFER,
+                          handle, id, tag->size, tag->start,
+                          message_format,
+                          settled, more);
+
+  // optimize: check against hard-code size of performative
+  bool more_flag = true;
+  for (;;) {
+    pn_data_clear(disp->output_args);
+    int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
+                           handle, id, tag->size, tag->start,
+                           message_format,
+                           settled, more_flag);
+    if (err) {
+      fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err), pn_data_error(disp->output_args));
+      return PN_ERR;
+    }
+
+    pn_buffer_clear(disp->frame);
+    pn_bytes_t frame_buf = pn_buffer_bytes( disp->frame );
+    ssize_t wr = pn_data_encode(disp->output_args, frame_buf.start,
+                                pn_buffer_available( disp->frame ));
+    if (wr < 0) {
+      fprintf(stderr, "error posting frame: %s", pn_code(wr));
+      return PN_ERR;
+    }
+    frame_buf.size = wr;
+
+    // Guard the subtraction below against size_t underflow.
+    if (frame_buf.size > disp->remote_max_frame) {
+      fprintf(stderr, "error posting transfer frame: performative exceeds peer max-frame\n");
+      return PN_ERR;
+    }
+    // NOTE(review): this budget ignores whatever header pn_write_frame()
+    // prepends to the frame -- confirm whether remote_max_frame must cover it.
+    size_t room = disp->remote_max_frame - frame_buf.size;
+    if (room == 0 && disp->output_size > 0) {
+      // No payload byte would ever fit, so the loop could not make progress.
+      fprintf(stderr, "error posting transfer frame: no room for payload within peer max-frame\n");
+      return PN_ERR;
+    }
+
+    // The remaining payload fits and the caller says nothing follows, so
+    // this is the last frame: rebuild the performative once with the flag
+    // cleared.  The more_flag test keeps this from re-firing on the next pass.
+    // NOTE(review): assumes the encoded size does not depend on the flag value.
+    if (!more && more_flag && disp->output_size <= room) {
+      more_flag = false;
+      continue;
+    }
+
+    size_t available = disp->output_size < room ? disp->output_size : room;
+
+    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, available);
+
+    if (available) {
+      memmove(frame_buf.start + frame_buf.size, disp->output_payload, available);
+      frame_buf.size += available;
+      // Advance past the bytes just consumed so the next chunk does not
+      // resend them.
+      disp->output_payload += available;
+      disp->output_size -= available;
+    }
+
+    pn_frame_t frame = {disp->frame_type};
+    frame.channel = ch;
+    frame.payload = frame_buf.start;
+    frame.size = frame_buf.size;
+
+    size_t n;
+    while (!(n = pn_write_frame(disp->output + disp->available,
+                                disp->capacity - disp->available, frame))) {
+      // Grow through a temporary so the old buffer is not lost on failure.
+      size_t grown_capacity = disp->capacity * 2;
+      char *grown = realloc(disp->output, grown_capacity);
+      if (!grown) {
+        fprintf(stderr, "error posting transfer frame: out of memory\n");
+        return PN_ERR;
+      }
+      disp->output = grown;
+      disp->capacity = grown_capacity;
+    }
+    if (disp->trace & PN_TRACE_RAW) {
+      fprintf(stderr, "RAW: \"");
+      pn_fprint_data(stderr, disp->output + disp->available, n);
+      fprintf(stderr, "\"\n");
+    }
+    disp->available += n;
+
+    if (disp->output_size == 0) break;
+  }
+
+  disp->output_payload = NULL;
+  return 0;
+}
+
Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Mon Oct 22 21:50:36
2012
@@ -48,13 +48,15 @@ struct pn_dispatcher_t {
pn_data_t *output_args;
const char *output_payload;
size_t output_size;
+ size_t remote_max_frame;
+ pn_buffer_t *frame; // frame under construction
size_t capacity;
size_t available;
char *output;
void *context;
bool halt;
bool batch;
- char scratch[SCRATCH];
+ char scratch[SCRATCH]; // ? Rafi - can I use this instead of frame
(size for remote-max-frame?)
};
pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context);
@@ -67,5 +69,12 @@ int pn_post_frame(pn_dispatcher_t *disp,
ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t
available);
ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...);
-
+// Post a TRANSFER performative on local_channel for the payload currently
+// set on the dispatcher.  If disp->remote_max_frame is zero the transfer is
+// posted as one frame via pn_post_frame(); otherwise the definition is meant
+// to split the payload into frames bounded by remote_max_frame.
+// Returns 0 (or the pn_post_frame() result) on success, PN_ERR if the
+// performative cannot be built or encoded.
+int pn_post_transfer_frame(pn_dispatcher_t *disp,
+ uint16_t local_channel,
+ uint32_t handle,
+ pn_sequence_t delivery_id,
+ const pn_bytes_t *delivery_tag,
+ uint32_t message_format,
+ bool settled,
+ bool more);
#endif /* dispatcher.h */
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Oct 22 21:50:36
2012
@@ -89,6 +89,9 @@ typedef struct {
#include <proton/sasl.h>
#include <proton/ssl.h>
+// minimum allowable max-frame
+#define PN_MIN_MAX_FRAME_SIZE ((uint32_t)512)
+
struct pn_transport_t {
ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
ssize_t (*process_output)(pn_transport_t *, char *, size_t);
@@ -105,6 +108,8 @@ struct pn_transport_t {
char *remote_hostname;
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
+ uint32_t local_max_frame;
+ uint32_t remote_max_frame;
pn_error_t *error;
pn_session_state_t *sessions;
size_t session_capacity;
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 22 21:50:36 2012
@@ -761,6 +761,8 @@ void pn_transport_init(pn_transport_t *t
transport->close_rcvd = false;
transport->remote_container = NULL;
transport->remote_hostname = NULL;
+ transport->local_max_frame = 0;
+ transport->remote_max_frame = 0;
transport->remote_offered_capabilities = pn_data(16);
transport->remote_desired_capabilities = pn_data(16);
transport->error = pn_error();
@@ -1321,11 +1323,22 @@ int pn_do_open(pn_dispatcher_t *disp)
pn_bytes_t remote_container, remote_hostname;
pn_data_clear(transport->remote_offered_capabilities);
pn_data_clear(transport->remote_desired_capabilities);
- int err = pn_scan_args(disp, "D.[?S?S.....CC]", &container_q,
+ int err = pn_scan_args(disp, "D.[?S?SI....CC]", &container_q,
&remote_container, &hostname_q, &remote_hostname,
+ &transport->remote_max_frame,
transport->remote_offered_capabilities,
transport->remote_desired_capabilities);
if (err) return err;
+ if (transport->remote_max_frame > 0) {
+ if (transport->remote_max_frame < PN_MIN_MAX_FRAME_SIZE) {
+ fprintf(stderr, "Peer advertised bad max-frame (%u), forcing to %u\n",
+ transport->remote_max_frame, PN_MIN_MAX_FRAME_SIZE);
+ transport->remote_max_frame = PN_MIN_MAX_FRAME_SIZE;
+ }
+ disp->remote_max_frame = transport->remote_max_frame;
+ if (disp->frame) pn_buffer_free( disp->frame );
+ disp->frame = pn_buffer( disp->remote_max_frame );
+ }
if (container_q) {
transport->remote_container = pn_bytes_strdup(remote_container);
} else {
@@ -1838,9 +1851,11 @@ int pn_process_conn_setup(pn_transport_t
if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
{
pn_connection_t *connection = (pn_connection_t *) endpoint;
- int err = pn_post_frame(transport->disp, 0, "DL[SSnnnnnCC]", OPEN,
+ int err = pn_post_frame(transport->disp, 0, "DL[SS?InnnnCC]", OPEN,
connection->container,
connection->hostname,
+ // if not zero, advertise our max frame size
+ (bool)transport->local_max_frame,
transport->local_max_frame,
connection->offered_capabilities,
connection->desired_capabilities);
if (err) return err;
@@ -2012,10 +2027,13 @@ int pn_process_tpwork_sender(pn_transpor
pn_set_payload(transport->disp, bytes.start, bytes.size);
pn_buffer_clear(delivery->bytes);
pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
- int err = pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[IIzIoo]", TRANSFER,
- link_state->local_handle, state->id,
- tag.size, tag.start, 0, delivery->local_settled,
- !delivery->done);
+ int err = pn_post_transfer_frame(transport->disp,
+ ssn_state->local_channel,
+ link_state->local_handle,
+ state->id, &tag,
+ 0, // message-format
+ delivery->local_settled,
+ !delivery->done);
if (err) return err;
ssn_state->outgoing_transfer_count++;
ssn_state->outgoing_window--;
@@ -2350,6 +2368,23 @@ void pn_transport_trace(pn_transport_t *
transport->disp->trace = trace;
}
+// Report the max frame size configured locally on this transport.
+uint32_t pn_transport_get_max_frame(pn_transport_t *transport)
+{
+  const uint32_t local_limit = transport->local_max_frame;
+  return local_limit;
+}
+
+// Store the local max frame size.  Zero is stored untouched; any other
+// value below PN_MIN_MAX_FRAME_SIZE is raised to that minimum.
+void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
+{
+  transport->local_max_frame =
+    (size == 0 || size >= PN_MIN_MAX_FRAME_SIZE) ? size : PN_MIN_MAX_FRAME_SIZE;
+}
+
+// Report the max frame size recorded for the remote peer.
+uint32_t pn_transport_get_peer_max_frame(pn_transport_t *transport)
+{
+  const uint32_t peer_limit = transport->remote_max_frame;
+  return peer_limit;
+}
+
void pn_link_offered(pn_link_t *sender, int credit)
{
sender->available = credit;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]