Author: rhs
Date: Tue Apr 24 16:55:39 2012
New Revision: 1329851

URL: http://svn.apache.org/viewvc?rev=1329851&view=rev
Log:
added pn_credit; several flow control fixes

Modified:
    qpid/proton/trunk/proton-c/include/proton/engine.h
    qpid/proton/trunk/proton-c/src/engine/engine-internal.h
    qpid/proton/trunk/proton-c/src/engine/engine.c

Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1329851&r1=1329850&r2=1329851&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Apr 24 16:55:39 2012
@@ -121,6 +121,7 @@ wchar_t *pn_remote_target(pn_link_t *lin
 pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag);
 pn_delivery_t *pn_current(pn_link_t *link);
 bool pn_advance(pn_link_t *link);
+int pn_credit(pn_link_t *link);
 
 pn_delivery_t *pn_unsettled_head(pn_link_t *link);
 pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery);

Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1329851&r1=1329850&r2=1329851&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Apr 24 16:55:39 2012
@@ -84,6 +84,8 @@ typedef struct {
   bool incoming_init;
   pn_delivery_buffer_t incoming;
   pn_delivery_buffer_t outgoing;
+  pn_sequence_t incoming_transfer_count;
+  pn_sequence_t outgoing_transfer_count;
   pn_link_state_t *links;
   size_t link_capacity;
   pn_link_state_t **handles;
@@ -145,9 +147,7 @@ struct pn_link_t {
   pn_delivery_t *current;
   pn_delivery_t *settled_head;
   pn_delivery_t *settled_tail;
-  // XXX
   pn_sequence_t credit;
-  pn_sequence_t credits;
   size_t id;
 };
 

Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL: 
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1329851&r1=1329850&r2=1329851&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Apr 24 16:55:39 2012
@@ -672,7 +672,8 @@ pn_session_state_t *pn_session_get_state
   {
     transport->sessions[i] = (pn_session_state_t) {.session=NULL,
                                                    .local_channel=-1,
-                                                   .remote_channel=-1};
+                                                   .remote_channel=-1,
+                                                   .outgoing_transfer_count=0};
     pn_delivery_buffer_init(&transport->sessions[i].incoming, 0, 1024);
     pn_delivery_buffer_init(&transport->sessions[i].outgoing, 0, 1024);
   }
@@ -732,7 +733,6 @@ void pn_link_init(pn_link_t *link, int t
   link->settled_head = link->settled_tail = NULL;
   link->head = link->tail = link->current = NULL;
   link->credit = 0;
-  link->credits = 0;
 }
 
 void pn_set_source(pn_link_t *link, const wchar_t *source)
@@ -907,6 +907,7 @@ pn_delivery_t *pn_current(pn_link_t *lin
 void pn_advance_sender(pn_link_t *link)
 {
   if (link->credit > 0) {
+    link->current->done = true;
     link->credit--;
     pn_add_tpwork(link->current);
     link->current = link->current->link_next;
@@ -922,7 +923,6 @@ bool pn_advance(pn_link_t *link)
 {
   if (link && link->current) {
     pn_delivery_t *prev = link->current;
-    prev->done = true;
     if (link->endpoint.type == SENDER) {
       pn_advance_sender(link);
     } else {
@@ -937,6 +937,11 @@ bool pn_advance(pn_link_t *link)
   }
 }
 
+int pn_credit(pn_link_t *link)
+{
+  return link ? link->credit : 0;
+}
+
 void pn_real_settle(pn_delivery_t *delivery)
 {
   pn_link_t *link = delivery->link;
@@ -986,7 +991,6 @@ void pn_do_begin(pn_dispatcher_t *disp)
 {
   pn_transport_t *transport = disp->context;
   pn_value_t remote_channel = pn_list_get(disp->args, BEGIN_REMOTE_CHANNEL);
-  //pn_sequence_t next = pn_to_uint32(pn_list_get(disp->args, BEGIN_NEXT_OUTGOING_ID));
   pn_session_state_t *state;
   if (remote_channel.type == USHORT) {
     // XXX: what if session is NULL?
@@ -995,6 +999,8 @@ void pn_do_begin(pn_dispatcher_t *disp)
     pn_session_t *ssn = pn_session(transport->connection);
     state = pn_session_get_state(transport, ssn);
   }
+  pn_sequence_t next = pn_to_uint32(pn_list_get(disp->args, BEGIN_NEXT_OUTGOING_ID));
+  state->incoming_transfer_count = next;
   pn_map_channel(transport, disp->channel, state);
   PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
 }
@@ -1071,6 +1077,12 @@ void pn_do_transfer(pn_dispatcher_t *dis
     pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
 
     pn_delivery_buffer_t *incoming = &ssn_state->incoming;
+
+    if (!pn_delivery_buffer_available(incoming)) {
+      pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded");
+      return;
+    }
+
     if (!ssn_state->incoming_init) {
       incoming->next = id;
       ssn_state->incoming_init = true;
@@ -1079,17 +1091,26 @@ void pn_do_transfer(pn_dispatcher_t *dis
     delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag), pn_binary_size(tag)));
     pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
     delivery->context = state;
-    // XXX: need to check that state is not null (i.e. we haven't hit the limit)
     if (id != state->id) {
-      // XXX: signal error somehow
-      fprintf(stderr, "sequencing error\n");
+      pn_do_error(transport, "amqp:session:invalid-field",
+                  "sequencing error, expected delivery-id %u, got %u",
+                  state->id, id);
+      // XXX: this will probably leave delivery buffer state messed up
+      pn_full_settle(incoming, delivery);
+      return;
     }
+
+    link_state->delivery_count++;
+    link_state->link_credit--;
+    link->credit--;
   }
 
   PN_ENSURE(delivery->bytes, delivery->capacity, delivery->size + disp->size);
   memmove(delivery->bytes + delivery->size, disp->payload, disp->size);
   delivery->size += disp->size;
   delivery->done = !pn_to_bool(pn_list_get(args, TRANSFER_MORE));
+
+  ssn_state->incoming_transfer_count++;
 }
 
 void pn_do_flow(pn_dispatcher_t *disp)
@@ -1129,7 +1150,7 @@ void pn_do_disposition(pn_dispatcher_t *
   pn_sequence_t first = pn_to_int32(pn_list_get(args, DISPOSITION_FIRST));
   pn_value_t lastv = pn_list_get(args, DISPOSITION_LAST);
   pn_sequence_t last = lastv.type == EMPTY ? first : pn_to_int32(lastv);
-  //bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
+  bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
   pn_tag_t *dstate = pn_to_tag(pn_list_get(args, DISPOSITION_STATE));
   uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate));
   pn_disposition_t dispo;
@@ -1162,6 +1183,7 @@ void pn_do_disposition(pn_dispatcher_t *
     pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
     pn_delivery_t *delivery = state->delivery;
     delivery->remote_state = dispo;
+    delivery->remote_settled = settled;
     delivery->updated = true;
     pn_work_update(transport->connection, delivery);
   }
@@ -1256,9 +1278,11 @@ void pn_process_ssn_setup(pn_transport_t
       pn_init_frame(transport->disp);
       if ((int16_t) state->remote_channel >= 0)
         pn_field(transport->disp, BEGIN_REMOTE_CHANNEL, pn_value("H", state->remote_channel));
-      pn_field(transport->disp, BEGIN_NEXT_OUTGOING_ID, pn_value("I", state->outgoing.next));
-      pn_field(transport->disp, BEGIN_INCOMING_WINDOW, pn_value("I", state->incoming.capacity));
-      pn_field(transport->disp, BEGIN_OUTGOING_WINDOW, pn_value("I", state->outgoing.capacity));
+      pn_field(transport->disp, BEGIN_NEXT_OUTGOING_ID, pn_value("I", state->outgoing_transfer_count));
+      pn_field(transport->disp, BEGIN_INCOMING_WINDOW,
+               pn_value("I", pn_delivery_buffer_available(&state->incoming)));
+      pn_field(transport->disp, BEGIN_OUTGOING_WINDOW,
+               pn_value("I", pn_delivery_buffer_available(&state->outgoing)));
       // XXX: we use the session id as the outgoing channel, we depend
       // on this for looking up via remote channel
       uint16_t channel = ssn->id;
@@ -1298,26 +1322,37 @@ void pn_process_link_setup(pn_transport_
   }
 }
 
+void pn_post_flow(pn_transport_t *transport, pn_session_state_t *ssn_state, pn_link_state_t *state)
+{
+  pn_init_frame(transport->disp);
+  if ((int16_t) ssn_state->remote_channel >= 0)
+    pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->incoming_transfer_count));
+  pn_field(transport->disp, FLOW_INCOMING_WINDOW,
+           pn_value("I", pn_delivery_buffer_available(&ssn_state->incoming)));
+  pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next));
+  pn_field(transport->disp, FLOW_OUTGOING_WINDOW,
+           pn_value("I", pn_delivery_buffer_available(&ssn_state->outgoing)));
+  if (state) {
+    pn_field(transport->disp, FLOW_HANDLE, pn_value("I", state->local_handle));
+    pn_field(transport->disp, FLOW_DELIVERY_COUNT, pn_value("I", state->delivery_count));
+    pn_field(transport->disp, FLOW_LINK_CREDIT, pn_value("I", state->link_credit));
+  }
+  pn_post_frame(transport->disp, ssn_state->local_channel, FLOW);
+}
+
 void pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
 {
   if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
   {
     pn_link_t *rcv = (pn_link_t *) endpoint;
-    if (rcv->credits) {
-      pn_session_state_t *ssn_state = pn_session_get_state(transport, rcv->session);
-      pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
-      state->link_credit += rcv->credits;
-      rcv->credits = 0;
+    pn_session_state_t *ssn_state = pn_session_get_state(transport, rcv->session);
+    pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
+    if ((int16_t) ssn_state->local_channel >= 0 &&
+        (int32_t) state->local_handle >= 0 &&
+        state->link_credit != rcv->credit) {
+      state->link_credit = rcv->credit;
 
-      pn_init_frame(transport->disp);
-      //pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I", ssn_state->next_incoming_id));
-      pn_field(transport->disp, FLOW_INCOMING_WINDOW, pn_value("I", ssn_state->incoming.capacity));
-      pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I", ssn_state->outgoing.next));
-      pn_field(transport->disp, FLOW_OUTGOING_WINDOW, pn_value("I", ssn_state->outgoing.capacity));
-      pn_field(transport->disp, FLOW_HANDLE, pn_value("I", state->local_handle));
-      //pn_field(transport->disp, FLOW_DELIVERY_COUNT, pn_value("I", delivery_count));
-      pn_field(transport->disp, FLOW_LINK_CREDIT, pn_value("I", state->link_credit));
-      pn_post_frame(transport->disp, ssn_state->local_channel, FLOW);
+      pn_post_flow(transport, ssn_state, state);
     }
   }
 }
@@ -1347,10 +1382,12 @@ void pn_post_disp(pn_transport_t *transp
   default:
     code = 0;
   }
-  if (code) {
+
+  if (code)
     pn_field(transport->disp, DISPOSITION_STATE, pn_value("L([])", code));
+
+  if (code || delivery->local_settled)
     pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
-  }
 }
 
 void pn_process_disp_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint)
@@ -1364,13 +1401,19 @@ void pn_process_disp_receiver(pn_transpo
       pn_link_t *link = delivery->link;
       if (link->endpoint.type == RECEIVER) {
         // XXX: need to prevent duplicate disposition sending
+        fprintf(stderr, "settled=%u\n", delivery->local_settled);
         pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
-        if ((int16_t) ssn_state->local_channel >= 0) {
+        if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled) {
           pn_post_disp(transport, delivery);
         }
 
         if (delivery->local_settled) {
+          size_t available = pn_delivery_buffer_available(&ssn_state->incoming);
           pn_full_settle(&ssn_state->incoming, delivery);
+          fprintf(stderr, "%zi, %zi\n", available, pn_delivery_buffer_available(&ssn_state->incoming));
+          if (pn_delivery_buffer_available(&ssn_state->incoming) > available) {
+            pn_post_flow(transport, ssn_state, NULL);
+          }
         }
       }
       delivery = delivery->tpwork_next;
@@ -1408,8 +1451,12 @@ void pn_process_msg_data(pn_transport_t 
               delivery->size = 0;
             }
             pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER);
-            if (delivery->done)
+            ssn_state->outgoing_transfer_count++;
+            if (delivery->done) {
               state->sent = true;
+              link_state->delivery_count++;
+              link_state->link_credit--;
+            }
           }
         }
       }
@@ -1430,7 +1477,7 @@ void pn_process_disp_sender(pn_transport
       if (link->endpoint.type == SENDER) {
         // XXX: need to prevent duplicate disposition sending
         pn_session_state_t *ssn_state = pn_session_get_state(transport, link->session);
-        if ((int16_t) ssn_state->local_channel >= 0) {
+        if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled) {
           pn_post_disp(transport, delivery);
         }
 
@@ -1592,10 +1639,10 @@ ssize_t pn_recv(pn_link_t *receiver, cha
   }
 }
 
-void pn_flow(pn_link_t *receiver, int credits)
+void pn_flow(pn_link_t *receiver, int credit)
 {
   if (!receiver) return;
-  receiver->credits += credits;
+  receiver->credit += credit;
   pn_modified(receiver->session->connection, &receiver->endpoint);
 }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to