Author: rhs
Date: Tue Apr 24 16:55:39 2012
New Revision: 1329851
URL: http://svn.apache.org/viewvc?rev=1329851&view=rev
Log:
added pn_credit; several flow control fixes
Modified:
qpid/proton/trunk/proton-c/include/proton/engine.h
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
Modified: qpid/proton/trunk/proton-c/include/proton/engine.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/engine.h?rev=1329851&r1=1329850&r2=1329851&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/engine.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/engine.h Tue Apr 24 16:55:39 2012
@@ -121,6 +121,7 @@ wchar_t *pn_remote_target(pn_link_t *lin
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag);
pn_delivery_t *pn_current(pn_link_t *link);
bool pn_advance(pn_link_t *link);
+int pn_credit(pn_link_t *link);
pn_delivery_t *pn_unsettled_head(pn_link_t *link);
pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery);
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1329851&r1=1329850&r2=1329851&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Tue Apr 24 16:55:39
2012
@@ -84,6 +84,8 @@ typedef struct {
bool incoming_init;
pn_delivery_buffer_t incoming;
pn_delivery_buffer_t outgoing;
+ pn_sequence_t incoming_transfer_count;
+ pn_sequence_t outgoing_transfer_count;
pn_link_state_t *links;
size_t link_capacity;
pn_link_state_t **handles;
@@ -145,9 +147,7 @@ struct pn_link_t {
pn_delivery_t *current;
pn_delivery_t *settled_head;
pn_delivery_t *settled_tail;
- // XXX
pn_sequence_t credit;
- pn_sequence_t credits;
size_t id;
};
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1329851&r1=1329850&r2=1329851&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Tue Apr 24 16:55:39 2012
@@ -672,7 +672,8 @@ pn_session_state_t *pn_session_get_state
{
transport->sessions[i] = (pn_session_state_t) {.session=NULL,
.local_channel=-1,
- .remote_channel=-1};
+ .remote_channel=-1,
+ .outgoing_transfer_count=0};
pn_delivery_buffer_init(&transport->sessions[i].incoming, 0, 1024);
pn_delivery_buffer_init(&transport->sessions[i].outgoing, 0, 1024);
}
@@ -732,7 +733,6 @@ void pn_link_init(pn_link_t *link, int t
link->settled_head = link->settled_tail = NULL;
link->head = link->tail = link->current = NULL;
link->credit = 0;
- link->credits = 0;
}
void pn_set_source(pn_link_t *link, const wchar_t *source)
@@ -907,6 +907,7 @@ pn_delivery_t *pn_current(pn_link_t *lin
void pn_advance_sender(pn_link_t *link)
{
if (link->credit > 0) {
+ link->current->done = true;
link->credit--;
pn_add_tpwork(link->current);
link->current = link->current->link_next;
@@ -922,7 +923,6 @@ bool pn_advance(pn_link_t *link)
{
if (link && link->current) {
pn_delivery_t *prev = link->current;
- prev->done = true;
if (link->endpoint.type == SENDER) {
pn_advance_sender(link);
} else {
@@ -937,6 +937,11 @@ bool pn_advance(pn_link_t *link)
}
}
+int pn_credit(pn_link_t *link)
+{
+ return link ? link->credit : 0;
+}
+
void pn_real_settle(pn_delivery_t *delivery)
{
pn_link_t *link = delivery->link;
@@ -986,7 +991,6 @@ void pn_do_begin(pn_dispatcher_t *disp)
{
pn_transport_t *transport = disp->context;
pn_value_t remote_channel = pn_list_get(disp->args, BEGIN_REMOTE_CHANNEL);
- //pn_sequence_t next = pn_to_uint32(pn_list_get(disp->args,
BEGIN_NEXT_OUTGOING_ID));
pn_session_state_t *state;
if (remote_channel.type == USHORT) {
// XXX: what if session is NULL?
@@ -995,6 +999,8 @@ void pn_do_begin(pn_dispatcher_t *disp)
pn_session_t *ssn = pn_session(transport->connection);
state = pn_session_get_state(transport, ssn);
}
+ pn_sequence_t next = pn_to_uint32(pn_list_get(disp->args,
BEGIN_NEXT_OUTGOING_ID));
+ state->incoming_transfer_count = next;
pn_map_channel(transport, disp->channel, state);
PN_SET_REMOTE(state->session->endpoint.state, PN_REMOTE_ACTIVE);
}
@@ -1071,6 +1077,12 @@ void pn_do_transfer(pn_dispatcher_t *dis
pn_sequence_t id = pn_to_int32(pn_list_get(args, TRANSFER_DELIVERY_ID));
pn_delivery_buffer_t *incoming = &ssn_state->incoming;
+
+ if (!pn_delivery_buffer_available(incoming)) {
+ pn_do_error(transport, "amqp:session:window-violation", "incoming
session window exceeded");
+ return;
+ }
+
if (!ssn_state->incoming_init) {
incoming->next = id;
ssn_state->incoming_init = true;
@@ -1079,17 +1091,26 @@ void pn_do_transfer(pn_dispatcher_t *dis
delivery = pn_delivery(link, pn_dtag(pn_binary_bytes(tag),
pn_binary_size(tag)));
pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
delivery->context = state;
- // XXX: need to check that state is not null (i.e. we haven't hit the
limit)
if (id != state->id) {
- // XXX: signal error somehow
- fprintf(stderr, "sequencing error\n");
+ pn_do_error(transport, "amqp:session:invalid-field",
+ "sequencing error, expected delivery-id %u, got %u",
+ state->id, id);
+ // XXX: this will probably leave delivery buffer state messed up
+ pn_full_settle(incoming, delivery);
+ return;
}
+
+ link_state->delivery_count++;
+ link_state->link_credit--;
+ link->credit--;
}
PN_ENSURE(delivery->bytes, delivery->capacity, delivery->size + disp->size);
memmove(delivery->bytes + delivery->size, disp->payload, disp->size);
delivery->size += disp->size;
delivery->done = !pn_to_bool(pn_list_get(args, TRANSFER_MORE));
+
+ ssn_state->incoming_transfer_count++;
}
void pn_do_flow(pn_dispatcher_t *disp)
@@ -1129,7 +1150,7 @@ void pn_do_disposition(pn_dispatcher_t *
pn_sequence_t first = pn_to_int32(pn_list_get(args, DISPOSITION_FIRST));
pn_value_t lastv = pn_list_get(args, DISPOSITION_LAST);
pn_sequence_t last = lastv.type == EMPTY ? first : pn_to_int32(lastv);
- //bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
+ bool settled = pn_to_bool(pn_list_get(args, DISPOSITION_SETTLED));
pn_tag_t *dstate = pn_to_tag(pn_list_get(args, DISPOSITION_STATE));
uint64_t code = pn_to_uint32(pn_tag_descriptor(dstate));
pn_disposition_t dispo;
@@ -1162,6 +1183,7 @@ void pn_do_disposition(pn_dispatcher_t *
pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
pn_delivery_t *delivery = state->delivery;
delivery->remote_state = dispo;
+ delivery->remote_settled = settled;
delivery->updated = true;
pn_work_update(transport->connection, delivery);
}
@@ -1256,9 +1278,11 @@ void pn_process_ssn_setup(pn_transport_t
pn_init_frame(transport->disp);
if ((int16_t) state->remote_channel >= 0)
pn_field(transport->disp, BEGIN_REMOTE_CHANNEL, pn_value("H",
state->remote_channel));
- pn_field(transport->disp, BEGIN_NEXT_OUTGOING_ID, pn_value("I",
state->outgoing.next));
- pn_field(transport->disp, BEGIN_INCOMING_WINDOW, pn_value("I",
state->incoming.capacity));
- pn_field(transport->disp, BEGIN_OUTGOING_WINDOW, pn_value("I",
state->outgoing.capacity));
+ pn_field(transport->disp, BEGIN_NEXT_OUTGOING_ID, pn_value("I",
state->outgoing_transfer_count));
+ pn_field(transport->disp, BEGIN_INCOMING_WINDOW,
+ pn_value("I", pn_delivery_buffer_available(&state->incoming)));
+ pn_field(transport->disp, BEGIN_OUTGOING_WINDOW,
+ pn_value("I", pn_delivery_buffer_available(&state->outgoing)));
// XXX: we use the session id as the outgoing channel, we depend
// on this for looking up via remote channel
uint16_t channel = ssn->id;
@@ -1298,26 +1322,37 @@ void pn_process_link_setup(pn_transport_
}
}
+void pn_post_flow(pn_transport_t *transport, pn_session_state_t *ssn_state,
pn_link_state_t *state)
+{
+ pn_init_frame(transport->disp);
+ if ((int16_t) ssn_state->remote_channel >= 0)
+ pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I",
ssn_state->incoming_transfer_count));
+ pn_field(transport->disp, FLOW_INCOMING_WINDOW,
+ pn_value("I", pn_delivery_buffer_available(&ssn_state->incoming)));
+ pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I",
ssn_state->outgoing.next));
+ pn_field(transport->disp, FLOW_OUTGOING_WINDOW,
+ pn_value("I", pn_delivery_buffer_available(&ssn_state->outgoing)));
+ if (state) {
+ pn_field(transport->disp, FLOW_HANDLE, pn_value("I", state->local_handle));
+ pn_field(transport->disp, FLOW_DELIVERY_COUNT, pn_value("I",
state->delivery_count));
+ pn_field(transport->disp, FLOW_LINK_CREDIT, pn_value("I",
state->link_credit));
+ }
+ pn_post_frame(transport->disp, ssn_state->local_channel, FLOW);
+}
+
void pn_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t
*endpoint)
{
if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE)
{
pn_link_t *rcv = (pn_link_t *) endpoint;
- if (rcv->credits) {
- pn_session_state_t *ssn_state = pn_session_get_state(transport,
rcv->session);
- pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
- state->link_credit += rcv->credits;
- rcv->credits = 0;
+ pn_session_state_t *ssn_state = pn_session_get_state(transport,
rcv->session);
+ pn_link_state_t *state = pn_link_get_state(ssn_state, rcv);
+ if ((int16_t) ssn_state->local_channel >= 0 &&
+ (int32_t) state->local_handle >= 0 &&
+ state->link_credit != rcv->credit) {
+ state->link_credit = rcv->credit;
- pn_init_frame(transport->disp);
- //pn_field(transport->disp, FLOW_NEXT_INCOMING_ID, pn_value("I",
ssn_state->next_incoming_id));
- pn_field(transport->disp, FLOW_INCOMING_WINDOW, pn_value("I",
ssn_state->incoming.capacity));
- pn_field(transport->disp, FLOW_NEXT_OUTGOING_ID, pn_value("I",
ssn_state->outgoing.next));
- pn_field(transport->disp, FLOW_OUTGOING_WINDOW, pn_value("I",
ssn_state->outgoing.capacity));
- pn_field(transport->disp, FLOW_HANDLE, pn_value("I",
state->local_handle));
- //pn_field(transport->disp, FLOW_DELIVERY_COUNT, pn_value("I",
delivery_count));
- pn_field(transport->disp, FLOW_LINK_CREDIT, pn_value("I",
state->link_credit));
- pn_post_frame(transport->disp, ssn_state->local_channel, FLOW);
+ pn_post_flow(transport, ssn_state, state);
}
}
}
@@ -1347,10 +1382,12 @@ void pn_post_disp(pn_transport_t *transp
default:
code = 0;
}
- if (code) {
+
+ if (code)
pn_field(transport->disp, DISPOSITION_STATE, pn_value("L([])", code));
+
+ if (code || delivery->local_settled)
pn_post_frame(transport->disp, ssn_state->local_channel, DISPOSITION);
- }
}
void pn_process_disp_receiver(pn_transport_t *transport, pn_endpoint_t
*endpoint)
@@ -1364,13 +1401,19 @@ void pn_process_disp_receiver(pn_transpo
pn_link_t *link = delivery->link;
if (link->endpoint.type == RECEIVER) {
// XXX: need to prevent duplicate disposition sending
+ fprintf(stderr, "settled=%u\n", delivery->local_settled);
pn_session_state_t *ssn_state = pn_session_get_state(transport,
link->session);
- if ((int16_t) ssn_state->local_channel >= 0) {
+ if ((int16_t) ssn_state->local_channel >= 0 &&
!delivery->remote_settled) {
pn_post_disp(transport, delivery);
}
if (delivery->local_settled) {
+ size_t available =
pn_delivery_buffer_available(&ssn_state->incoming);
pn_full_settle(&ssn_state->incoming, delivery);
+ fprintf(stderr, "%zi, %zi\n", available,
pn_delivery_buffer_available(&ssn_state->incoming));
+ if (pn_delivery_buffer_available(&ssn_state->incoming) > available) {
+ pn_post_flow(transport, ssn_state, NULL);
+ }
}
}
delivery = delivery->tpwork_next;
@@ -1408,8 +1451,12 @@ void pn_process_msg_data(pn_transport_t
delivery->size = 0;
}
pn_post_frame(transport->disp, ssn_state->local_channel, TRANSFER);
- if (delivery->done)
+ ssn_state->outgoing_transfer_count++;
+ if (delivery->done) {
state->sent = true;
+ link_state->delivery_count++;
+ link_state->link_credit--;
+ }
}
}
}
@@ -1430,7 +1477,7 @@ void pn_process_disp_sender(pn_transport
if (link->endpoint.type == SENDER) {
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport,
link->session);
- if ((int16_t) ssn_state->local_channel >= 0) {
+ if ((int16_t) ssn_state->local_channel >= 0 &&
!delivery->remote_settled) {
pn_post_disp(transport, delivery);
}
@@ -1592,10 +1639,10 @@ ssize_t pn_recv(pn_link_t *receiver, cha
}
}
-void pn_flow(pn_link_t *receiver, int credits)
+void pn_flow(pn_link_t *receiver, int credit)
{
if (!receiver) return;
- receiver->credits += credits;
+ receiver->credit += credit;
pn_modified(receiver->session->connection, &receiver->endpoint);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]