Author: rhs
Date: Sun Nov 4 23:19:27 2012
New Revision: 1405664
URL: http://svn.apache.org/viewvc?rev=1405664&view=rev
Log:
Fixed a deadlock with acks; removed some redundant garbage collection (gc) of the tracker history.
Modified:
qpid/proton/trunk/proton-c/src/engine/engine.c
qpid/proton/trunk/proton-c/src/messenger.c
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1405664&r1=1405663&r2=1405664&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Sun Nov 4 23:19:27 2012
@@ -1669,10 +1669,12 @@ int pn_do_disposition(pn_dispatcher_t *d
if (id < lwm) continue;
pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
pn_delivery_t *delivery = state->delivery;
- delivery->remote_state = dispo;
- delivery->remote_settled = settled;
- delivery->updated = true;
- pn_work_update(transport->connection, delivery);
+ if (delivery) {
+ delivery->remote_state = dispo;
+ delivery->remote_settled = settled;
+ delivery->updated = true;
+ pn_work_update(transport->connection, delivery);
+ }
}
return 0;
@@ -2146,6 +2148,10 @@ int pn_process_tpwork(pn_transport_t *tr
pn_delivery_t *delivery = conn->tpwork_head;
while (delivery)
{
+ if (!delivery->transport_context && transport->disp->available > 0) {
+ break;
+ }
+
pn_link_t *link = delivery->link;
if (pn_link_is_sender(link)) {
int err = pn_process_tpwork_sender(transport, delivery);
Modified: qpid/proton/trunk/proton-c/src/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger.c?rev=1405664&r1=1405663&r2=1405664&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger.c Sun Nov 4 23:19:27 2012
@@ -141,13 +141,11 @@ void pn_queue_del(pn_queue_t *queue, pn_
pn_connection_t *conn =
pn_session_connection(pn_link_session(pn_delivery_link(delivery)));
pn_decref(conn);
- pn_queue_gc(queue);
}
}
pn_sequence_t pn_queue_add(pn_queue_t *queue, pn_delivery_t *delivery)
{
- pn_queue_gc(queue);
pn_sequence_t id = queue->hwm++;
size_t offset = id - queue->lwm;
PN_ENSUREZ(queue->deliveries, queue->capacity, offset + 1);
@@ -160,6 +158,22 @@ pn_sequence_t pn_queue_add(pn_queue_t *q
return id;
}
+void pn_queue_slide(pn_queue_t *queue)
+{
+ if (queue->window >= 0) {
+ while (queue->hwm - queue->mwm > queue->window) {
+ pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
+ if (d) {
+ pn_delivery_settle(d);
+ pn_queue_del(queue, d);
+ } else {
+ pn_queue_gc(queue);
+ }
+ }
+ }
+ pn_queue_gc(queue);
+}
+
int pn_queue_update(pn_queue_t *queue, pn_sequence_t id, pn_status_t status,
int flags, bool settle, bool match)
{
@@ -201,13 +215,7 @@ int pn_queue_update(pn_queue_t *queue, p
}
}
- if (queue->window >= 0) {
- while (queue->hwm - queue->mwm > queue->window) {
- pn_delivery_t *d = pn_queue_get(queue, queue->lwm);
- pn_delivery_settle(d);
- pn_queue_del(queue, d);
- }
- }
+ pn_queue_slide(queue);
return 0;
}
@@ -397,10 +405,16 @@ void pn_messenger_endpoints(pn_messenger
pn_delivery_t *d = pn_work_head(conn);
while (d) {
+ pn_link_t *link = pn_delivery_link(d);
+ if (pn_delivery_updated(d) && pn_link_is_sender(link)) {
+ pn_delivery_update(d, pn_delivery_remote_state(d));
+ }
pn_delivery_clear(d);
d = pn_work_next(d);
}
+ pn_queue_slide(&messenger->outgoing);
+
if (pn_work_head(conn)) {
return;
}
@@ -780,7 +794,7 @@ int pn_messenger_get_outgoing_window(pn_
int pn_messenger_set_outgoing_window(pn_messenger_t *messenger, int window)
{
- if (window > PN_SESSION_WINDOW) {
+ if (window >= PN_SESSION_WINDOW) {
return pn_error_format(messenger->error, PN_ARG_ERR,
"specified window (%i) exceeds max (%i)",
window, PN_SESSION_WINDOW);
@@ -797,7 +811,7 @@ int pn_messenger_get_incoming_window(pn_
int pn_messenger_set_incoming_window(pn_messenger_t *messenger, int window)
{
- if (window > PN_SESSION_WINDOW) {
+ if (window >= PN_SESSION_WINDOW) {
return pn_error_format(messenger->error, PN_ARG_ERR,
"specified window (%i) exceeds max (%i)",
window, PN_SESSION_WINDOW);
@@ -973,11 +987,7 @@ bool pn_messenger_rcvd(pn_messenger_t *m
int pn_messenger_send(pn_messenger_t *messenger)
{
- int err = pn_messenger_sync(messenger, pn_messenger_sent);
- if (err) return err;
- return pn_queue_update(&messenger->outgoing,
- messenger->outgoing.hwm - 1,
- 0, PN_CUMULATIVE, false, true);
+ return pn_messenger_sync(messenger, pn_messenger_sent);
}
int pn_messenger_recv(pn_messenger_t *messenger, int n)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]