Author: kgiusti
Date: Mon Oct 22 21:50:36 2012
New Revision: 1401094

URL: http://svn.apache.org/viewvc?rev=1401094&view=rev
Log:
PROTON-79: checkpoint proposed transfer message chunking.

Modified:
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
    qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Mon Oct 22 21:50:36 2012
@@ -277,6 +277,10 @@ ssize_t pn_transport_input(pn_transport_
 ssize_t pn_transport_output(pn_transport_t *transport, char *bytes, size_t 
size);
 time_t pn_transport_tick(pn_transport_t *transport, time_t now);
 void pn_transport_trace(pn_transport_t *transport, pn_trace_t trace);
+// max frame of zero means "unlimited"
+uint32_t pn_transport_get_max_frame(pn_transport_t *transport);
+void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size);
+uint32_t pn_transport_get_peer_max_frame(pn_transport_t *transport);
 void pn_transport_free(pn_transport_t *transport);
 
 // session

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.c Mon Oct 22 21:50:36 
2012
@@ -24,7 +24,9 @@
 #include <string.h>
 #include <proton/framing.h>
 #include <proton/engine.h>
+#include <proton/buffer.h>
 #include "dispatcher.h"
+#include "protocol.h"
 #include "../util.h"
 
 pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context)
@@ -277,3 +279,79 @@ ssize_t pn_dispatcher_output(pn_dispatch
   // XXX: need to check for errors
   return n;
 }
+
+
+int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch,
+                           uint32_t handle,
+                           pn_sequence_t id,
+                           const pn_bytes_t *tag,
+                           uint32_t message_format,
+                           bool settled,
+                           bool more)
+{
+
+  if (!disp->remote_max_frame)  // "unlimited" frame size accepted
+    return pn_post_frame( disp, ch, "DL[IIzIoo]", TRANSFER,
+                          handle, id, tag->size, tag->start,
+                          message_format,
+                          settled, more);
+
+  bool more_flag = true;        // optimize: check against hard-code size of 
performative
+  do {
+    pn_data_clear(disp->output_args);
+    int err = pn_data_fill(disp->output_args, "DL[IIzIoo]", TRANSFER,
+                           handle, id, tag->size, tag->start,
+                           message_format,
+                           settled, more_flag);
+    if (err) {
+      fprintf(stderr, "error posting transfer frame: %s: %s\n", pn_code(err), 
pn_data_error(disp->output_args));
+      return PN_ERR;
+    }
+
+    pn_buffer_clear(disp->frame);
+    pn_bytes_t frame_buf = pn_buffer_bytes( disp->frame );
+    ssize_t wr = pn_data_encode(disp->output_args, frame_buf.start,
+                                pn_buffer_available( disp->frame ));
+    if (wr < 0) {
+      fprintf(stderr, "error posting frame: %s", pn_code(wr));
+      return PN_ERR;
+    }
+    frame_buf.size = wr;
+
+    if (more == false && (frame_buf.size + disp->output_size) <= 
disp->remote_max_frame) {
+      more_flag = false;
+      continue;         // rebuild performative with new value for more flag
+    }
+
+    size_t available = disp->remote_max_frame - frame_buf.size;
+    if (available > disp->output_size) available = disp->output_size;
+    memmove(frame_buf.start + frame_buf.size, disp->output_payload, available);
+    frame_buf.size += available;
+    //disp->output_payload = NULL;
+    disp->output_size -= available;
+
+    pn_do_trace(disp, ch, OUT, disp->output_args, disp->output_payload, 
available);
+
+    pn_frame_t frame = {disp->frame_type};
+    frame.channel = ch;
+    frame.payload = frame_buf.start;
+    frame.size = frame_buf.size;
+
+    size_t n;
+    while (!(n = pn_write_frame(disp->output + disp->available,
+                                disp->capacity - disp->available, frame))) {
+      disp->capacity *= 2;
+      disp->output = realloc(disp->output, disp->capacity);
+    }
+    if (disp->trace & PN_TRACE_RAW) {
+      fprintf(stderr, "RAW: \"");
+      pn_fprint_data(stderr, disp->output + disp->available, n);
+      fprintf(stderr, "\"\n");
+    }
+    disp->available += n;
+  } while (disp->output_size > 0);
+
+  disp->output_payload = NULL;
+  return 0;
+}
+

Modified: qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h (original)
+++ qpid/proton/trunk/proton-c/src/dispatcher/dispatcher.h Mon Oct 22 21:50:36 
2012
@@ -48,13 +48,15 @@ struct pn_dispatcher_t {
   pn_data_t *output_args;
   const char *output_payload;
   size_t output_size;
+  size_t remote_max_frame;
+  pn_buffer_t *frame;  // frame under construction
   size_t capacity;
   size_t available;
   char *output;
   void *context;
   bool halt;
   bool batch;
-  char scratch[SCRATCH];
+  char scratch[SCRATCH];        // ? Rafi - can I use this instead of frame 
(size for remote-max-frame?)
 };
 
 pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, void *context);
@@ -67,5 +69,12 @@ int pn_post_frame(pn_dispatcher_t *disp,
 ssize_t pn_dispatcher_input(pn_dispatcher_t *disp, const char *bytes, size_t 
available);
 ssize_t pn_dispatcher_output(pn_dispatcher_t *disp, char *bytes, size_t size);
 void pn_dispatcher_trace(pn_dispatcher_t *disp, uint16_t ch, char *fmt, ...);
-
+int pn_post_transfer_frame(pn_dispatcher_t *disp,
+                           uint16_t local_channel,
+                           uint32_t handle,
+                           pn_sequence_t delivery_id,
+                           const pn_bytes_t *delivery_tag,
+                           uint32_t message_format,
+                           bool settled,
+                           bool more);
 #endif /* dispatcher.h */

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Mon Oct 22 21:50:36 
2012
@@ -89,6 +89,9 @@ typedef struct {
 #include <proton/sasl.h>
 #include <proton/ssl.h>
 
+// minimum allowable max-frame
+#define PN_MIN_MAX_FRAME_SIZE ((uint32_t)512)
+
 struct pn_transport_t {
   ssize_t (*process_input)(pn_transport_t *, const char *, size_t);
   ssize_t (*process_output)(pn_transport_t *, char *, size_t);
@@ -105,6 +108,8 @@ struct pn_transport_t {
   char *remote_hostname;
   pn_data_t *remote_offered_capabilities;
   pn_data_t *remote_desired_capabilities;
+  uint32_t   local_max_frame;
+  uint32_t   remote_max_frame;
   pn_error_t *error;
   pn_session_state_t *sessions;
   size_t session_capacity;

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1401094&r1=1401093&r2=1401094&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Mon Oct 22 21:50:36 2012
@@ -761,6 +761,8 @@ void pn_transport_init(pn_transport_t *t
   transport->close_rcvd = false;
   transport->remote_container = NULL;
   transport->remote_hostname = NULL;
+  transport->local_max_frame = 0;
+  transport->remote_max_frame = 0;
   transport->remote_offered_capabilities = pn_data(16);
   transport->remote_desired_capabilities = pn_data(16);
   transport->error = pn_error();
@@ -1321,11 +1323,22 @@ int pn_do_open(pn_dispatcher_t *disp)
   pn_bytes_t remote_container, remote_hostname;
   pn_data_clear(transport->remote_offered_capabilities);
   pn_data_clear(transport->remote_desired_capabilities);
-  int err = pn_scan_args(disp, "D.[?S?S.....CC]", &container_q,
+  int err = pn_scan_args(disp, "D.[?S?SI....CC]", &container_q,
                          &remote_container, &hostname_q, &remote_hostname,
+                         &transport->remote_max_frame,
                          transport->remote_offered_capabilities,
                          transport->remote_desired_capabilities);
   if (err) return err;
+  if (transport->remote_max_frame > 0) {
+    if (transport->remote_max_frame < PN_MIN_MAX_FRAME_SIZE) {
+      fprintf(stderr, "Peer advertised bad max-frame (%u), forcing to %u\n",
+              transport->remote_max_frame, PN_MIN_MAX_FRAME_SIZE);
+      transport->remote_max_frame = PN_MIN_MAX_FRAME_SIZE;
+    }
+    disp->remote_max_frame = transport->remote_max_frame;
+    if (disp->frame) pn_buffer_free( disp->frame );
+    disp->frame = pn_buffer( disp->remote_max_frame );
+  }
   if (container_q) {
     transport->remote_container = pn_bytes_strdup(remote_container);
   } else {
@@ -1838,9 +1851,11 @@ int pn_process_conn_setup(pn_transport_t
     if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent)
     {
       pn_connection_t *connection = (pn_connection_t *) endpoint;
-      int err = pn_post_frame(transport->disp, 0, "DL[SSnnnnnCC]", OPEN,
+      int err = pn_post_frame(transport->disp, 0, "DL[SS?InnnnCC]", OPEN,
                               connection->container,
                               connection->hostname,
+                              // if not zero, advertise our max frame size
+                              (bool)transport->local_max_frame, 
transport->local_max_frame,
                               connection->offered_capabilities,
                               connection->desired_capabilities);
       if (err) return err;
@@ -2012,10 +2027,13 @@ int pn_process_tpwork_sender(pn_transpor
       pn_set_payload(transport->disp, bytes.start, bytes.size);
       pn_buffer_clear(delivery->bytes);
       pn_bytes_t tag = pn_buffer_bytes(delivery->tag);
-      int err = pn_post_frame(transport->disp, ssn_state->local_channel, 
"DL[IIzIoo]", TRANSFER,
-                              link_state->local_handle, state->id,
-                              tag.size, tag.start, 0, delivery->local_settled,
-                              !delivery->done);
+      int err = pn_post_transfer_frame(transport->disp,
+                                       ssn_state->local_channel,
+                                       link_state->local_handle,
+                                       state->id, &tag,
+                                       0, // message-format
+                                       delivery->local_settled,
+                                       !delivery->done);
       if (err) return err;
       ssn_state->outgoing_transfer_count++;
       ssn_state->outgoing_window--;
@@ -2350,6 +2368,23 @@ void pn_transport_trace(pn_transport_t *
   transport->disp->trace = trace;
 }
 
+uint32_t pn_transport_get_max_frame(pn_transport_t *transport)
+{
+  return transport->local_max_frame;
+}
+
+void pn_transport_set_max_frame(pn_transport_t *transport, uint32_t size)
+{
+  if (size && size < PN_MIN_MAX_FRAME_SIZE)
+    size = PN_MIN_MAX_FRAME_SIZE;
+  transport->local_max_frame = size;
+}
+
+uint32_t pn_transport_get_peer_max_frame(pn_transport_t *transport)
+{
+  return transport->remote_max_frame;
+}
+
 void pn_link_offered(pn_link_t *sender, int credit)
 {
   sender->available = credit;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to