Author: rhs
Date: Wed May 29 13:44:43 2013
New Revision: 1487485
URL: http://svn.apache.org/r1487485
Log:
replaced delivery buffer data structure with delivery map
Modified:
qpid/proton/trunk/proton-c/src/engine/engine-internal.h
qpid/proton/trunk/proton-c/src/engine/engine.c
Modified: qpid/proton/trunk/proton-c/src/engine/engine-internal.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine-internal.h?rev=1487485&r1=1487484&r2=1487485&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine-internal.h (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine-internal.h Wed May 29 13:44:43
2013
@@ -56,18 +56,16 @@ struct pn_endpoint_t {
};
typedef struct {
- pn_delivery_t *delivery;
pn_sequence_t id;
bool sent;
+ bool init;
} pn_delivery_state_t;
typedef struct {
- pn_sequence_t next;
size_t capacity;
- size_t head;
- size_t size;
- pn_delivery_state_t *deliveries;
-} pn_delivery_buffer_t;
+ pn_sequence_t next;
+ pn_hash_t *deliveries;
+} pn_delivery_map_t;
typedef struct {
pn_link_t *link;
@@ -84,8 +82,8 @@ typedef struct {
uint16_t local_channel;
uint16_t remote_channel;
bool incoming_init;
- pn_delivery_buffer_t incoming;
- pn_delivery_buffer_t outgoing;
+ pn_delivery_map_t incoming;
+ pn_delivery_map_t outgoing;
pn_sequence_t incoming_transfer_count;
pn_sequence_t incoming_window;
pn_sequence_t outgoing_transfer_count;
@@ -258,8 +256,8 @@ struct pn_delivery_t {
bool tpwork;
pn_buffer_t *bytes;
bool done;
- void *transport_context;
void *context;
+ pn_delivery_state_t state;
};
#define PN_SET_LOCAL(OLD, NEW) \
Modified: qpid/proton/trunk/proton-c/src/engine/engine.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/engine/engine.c?rev=1487485&r1=1487484&r2=1487485&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/engine/engine.c (original)
+++ qpid/proton/trunk/proton-c/src/engine/engine.c Wed May 29 13:44:43 2013
@@ -37,92 +37,60 @@ static ssize_t transport_consume(pn_tran
// delivery buffers
-void pn_delivery_buffer_init(pn_delivery_buffer_t *db, pn_sequence_t next,
size_t capacity)
+void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next, size_t
capacity)
{
- // XXX: error handling
- db->deliveries = (pn_delivery_state_t *) malloc(sizeof(pn_delivery_state_t)
* capacity);
- db->next = next;
+ db->deliveries = pn_hash(capacity, 0.75, PN_REFCOUNT);
db->capacity = capacity;
- db->head = 0;
- db->size = 0;
-}
-
-void pn_delivery_buffer_free(pn_delivery_buffer_t *db)
-{
- free(db->deliveries);
-}
-
-size_t pn_delivery_buffer_size(pn_delivery_buffer_t *db)
-{
- return db->size;
-}
-
-size_t pn_delivery_buffer_available(pn_delivery_buffer_t *db)
-{
- return db->capacity - db->size;
-}
-
-bool pn_delivery_buffer_empty(pn_delivery_buffer_t *db)
-{
- return db->size == 0;
-}
-
-pn_delivery_state_t *pn_delivery_buffer_get(pn_delivery_buffer_t *db, size_t
index)
-{
- if (index < db->size) return db->deliveries + ((db->head + index) %
db->capacity);
- else return NULL;
+ db->next = next;
}
-pn_delivery_state_t *pn_delivery_buffer_head(pn_delivery_buffer_t *db)
+void pn_delivery_map_free(pn_delivery_map_t *db)
{
- if (db->size) return db->deliveries + db->head;
- else return NULL;
+ pn_free(db->deliveries);
}
-pn_delivery_state_t *pn_delivery_buffer_tail(pn_delivery_buffer_t *db)
+size_t pn_delivery_map_available(pn_delivery_map_t *db)
{
- if (db->size) return pn_delivery_buffer_get(db, db->size - 1);
- else return NULL;
+ return db->capacity - pn_hash_size(db->deliveries);
}
-pn_sequence_t pn_delivery_buffer_lwm(pn_delivery_buffer_t *db)
+pn_delivery_t *pn_delivery_map_get(pn_delivery_map_t *db, pn_sequence_t id)
{
- if (db->size) return pn_delivery_buffer_head(db)->id;
- else return db->next;
+ return (pn_delivery_t *) pn_hash_get(db->deliveries, id);
}
static void pn_delivery_state_init(pn_delivery_state_t *ds, pn_delivery_t
*delivery, pn_sequence_t id)
{
- ds->delivery = delivery;
ds->id = id;
ds->sent = false;
+ ds->init = true;
}
-pn_delivery_state_t *pn_delivery_buffer_push(pn_delivery_buffer_t *db,
pn_delivery_t *delivery)
+pn_delivery_state_t *pn_delivery_map_push(pn_delivery_map_t *db, pn_delivery_t
*delivery)
{
- if (!pn_delivery_buffer_available(db))
+ if (!pn_delivery_map_available(db))
return NULL;
- db->size++;
- pn_delivery_state_t *ds = pn_delivery_buffer_tail(db);
+ pn_delivery_state_t *ds = &delivery->state;
pn_delivery_state_init(ds, delivery, db->next++);
+ pn_hash_put(db->deliveries, ds->id, delivery);
return ds;
}
-bool pn_delivery_buffer_pop(pn_delivery_buffer_t *db)
+void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery)
{
- if (db->size) {
- db->head = (db->head + 1) % db->capacity;
- db->size--;
- return true;
- } else {
- return false;
- }
+ pn_hash_del(db->deliveries, delivery->state.id);
+ delivery->state.init = false;
}
-void pn_delivery_buffer_gc(pn_delivery_buffer_t *db)
+void pn_delivery_map_clear(pn_delivery_map_t *dm)
{
- while (db->size && !pn_delivery_buffer_head(db)->delivery) {
- pn_delivery_buffer_pop(db);
+ pn_hash_t *hash = dm->deliveries;
+ for (pn_handle_t entry = pn_hash_head(hash);
+ entry;
+ entry = pn_hash_next(hash, entry))
+ {
+ pn_delivery_t *dlv = (pn_delivery_t *) pn_hash_value(hash, entry);
+ pn_delivery_map_del(dm, dlv);
}
}
@@ -642,8 +610,8 @@ static void pn_session_finalize(void *ob
pn_free(session->links);
pn_endpoint_tini(&session->endpoint);
- pn_delivery_buffer_free(&session->state.incoming);
- pn_delivery_buffer_free(&session->state.outgoing);
+ pn_delivery_map_free(&session->state.incoming);
+ pn_delivery_map_free(&session->state.outgoing);
pn_free(session->state.local_handles);
pn_free(session->state.remote_handles);
}
@@ -662,9 +630,12 @@ pn_session_t *pn_session(pn_connection_t
ssn->context = 0;
// begin transport state
- ssn->state = (pn_session_state_t) {ssn, (uint16_t)-1, (uint16_t)-1};
- pn_delivery_buffer_init(&ssn->state.incoming, 0, PN_SESSION_WINDOW);
- pn_delivery_buffer_init(&ssn->state.outgoing, 0, PN_SESSION_WINDOW);
+ memset(&ssn->state, 0, sizeof(ssn->state));
+ ssn->state.session = ssn;
+ ssn->state.local_channel = (uint16_t)-1;
+ ssn->state.remote_channel = (uint16_t)-1;
+ pn_delivery_map_init(&ssn->state.incoming, 0, PN_SESSION_WINDOW);
+ pn_delivery_map_init(&ssn->state.outgoing, 0, PN_SESSION_WINDOW);
ssn->state.local_handles = pn_hash(0, 0.75, PN_REFCOUNT);
ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT);
// end transport state
@@ -838,21 +809,11 @@ int pn_transport_unbind(pn_transport_t *
transport->connection = NULL;
conn->transport = NULL;
- pn_link_t *link = pn_link_head(conn, 0);
- while (link) {
- pn_delivery_t *dlv = link->unsettled_head;
- while (dlv) {
- dlv->transport_context = NULL;
- dlv = dlv->unsettled_next;
- }
-
- dlv = link->settled_head;
- while (dlv) {
- dlv->transport_context = NULL;
- dlv = dlv->settled_next;
- }
-
- link = pn_link_next(link, 0);
+ pn_session_t *ssn = pn_session_head(conn, 0);
+ while (ssn) {
+ pn_delivery_map_clear(&ssn->state.incoming);
+ pn_delivery_map_clear(&ssn->state.outgoing);
+ ssn = pn_session_next(ssn, 0);
}
pn_endpoint_t *endpoint = conn->endpoint_head;
@@ -1190,9 +1151,12 @@ pn_delivery_t *pn_delivery(pn_link_t *li
delivery->tpwork = false;
pn_buffer_clear(delivery->bytes);
delivery->done = false;
- delivery->transport_context = NULL;
delivery->context = NULL;
+ // begin delivery state
+ delivery->state.init = false;
+ // end delivery state
+
if (!link->current)
link->current = delivery;
@@ -1332,14 +1296,13 @@ void pn_real_settle(pn_delivery_t *deliv
delivery->settled = true;
}
-void pn_full_settle(pn_delivery_buffer_t *db, pn_delivery_t *delivery)
+void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery)
{
assert(!delivery->work);
- pn_delivery_state_t *state = (pn_delivery_state_t *)
delivery->transport_context;
- delivery->transport_context = NULL;
- if (state) state->delivery = NULL;
+ if (delivery->state.init) {
+ pn_delivery_map_del(db, delivery);
+ }
pn_real_settle(delivery);
- if (state) pn_delivery_buffer_gc(db);
pn_clear_tpwork(delivery);
}
@@ -1615,9 +1578,9 @@ int pn_do_transfer(pn_dispatcher_t *disp
if (link->unsettled_tail && !link->unsettled_tail->done) {
delivery = link->unsettled_tail;
} else {
- pn_delivery_buffer_t *incoming = &ssn_state->incoming;
+ pn_delivery_map_t *incoming = &ssn_state->incoming;
- if (!pn_delivery_buffer_available(incoming)) {
+ if (!pn_delivery_map_available(incoming)) {
return pn_do_error(transport, "amqp:session:window-violation", "incoming
session window exceeded");
}
@@ -1627,8 +1590,7 @@ int pn_do_transfer(pn_dispatcher_t *disp
}
delivery = pn_delivery(link, pn_dtag(tag.start, tag.size));
- pn_delivery_state_t *state = pn_delivery_buffer_push(incoming, delivery);
- delivery->transport_context = state;
+ pn_delivery_state_t *state = pn_delivery_map_push(incoming, delivery);
if (id_present && id != state->id) {
int err = pn_do_error(transport, "amqp:session:invalid-field",
"sequencing error, expected delivery-id %u, got
%u",
@@ -1736,19 +1698,15 @@ int pn_do_disposition(pn_dispatcher_t *d
}
}
- pn_delivery_buffer_t *deliveries;
+ pn_delivery_map_t *deliveries;
if (role) {
deliveries = &ssn_state->outgoing;
} else {
deliveries = &ssn_state->incoming;
}
- pn_sequence_t lwm = pn_delivery_buffer_lwm(deliveries);
-
for (pn_sequence_t id = first; id <= last; id++) {
- if (id < lwm) continue;
- pn_delivery_state_t *state = pn_delivery_buffer_get(deliveries, id - lwm);
- pn_delivery_t *delivery = state->delivery;
+ pn_delivery_t *delivery = pn_delivery_map_get(deliveries, id);
if (delivery) {
delivery->remote_state = dispo;
delivery->remote_settled = settled;
@@ -1993,8 +1951,8 @@ bool pn_delivery_buffered(pn_delivery_t
{
if (delivery->settled) return false;
if (pn_link_is_sender(delivery->link)) {
- pn_delivery_state_t *state = (pn_delivery_state_t *)
delivery->transport_context;
- if (state) {
+ pn_delivery_state_t *state = &delivery->state;
+ if (state->init) {
return (delivery->done && !state->sent) ||
pn_buffer_size(delivery->bytes) > 0;
} else {
return delivery->done;
@@ -2051,8 +2009,8 @@ int pn_process_ssn_setup(pn_transport_t
pn_post_frame(transport->disp, channel, "DL[?HIII]", BEGIN,
((int16_t) state->remote_channel >= 0),
state->remote_channel,
state->outgoing_transfer_count,
- pn_delivery_buffer_available(&state->incoming),
- pn_delivery_buffer_available(&state->outgoing));
+ pn_delivery_map_available(&state->incoming),
+ pn_delivery_map_available(&state->outgoing));
state->local_channel = channel;
pn_hash_put(transport->local_channels, channel, ssn);
}
@@ -2123,13 +2081,13 @@ int pn_process_link_setup(pn_transport_t
int pn_post_flow(pn_transport_t *transport, pn_session_state_t *ssn_state,
pn_link_state_t *state)
{
- ssn_state->incoming_window =
pn_delivery_buffer_available(&ssn_state->incoming);
+ ssn_state->incoming_window = pn_delivery_map_available(&ssn_state->incoming);
bool link = (bool) state;
return pn_post_frame(transport->disp, ssn_state->local_channel,
"DL[?IIII?I?I?In?o]", FLOW,
(int16_t) ssn_state->remote_channel >= 0,
ssn_state->incoming_transfer_count,
ssn_state->incoming_window,
ssn_state->outgoing.next,
- pn_delivery_buffer_available(&ssn_state->outgoing),
+ pn_delivery_map_available(&ssn_state->outgoing),
link, state ? state->local_handle : 0,
link, state ? state->delivery_count : 0,
link, state ? state->link_credit : 0,
@@ -2178,8 +2136,8 @@ int pn_post_disp(pn_transport_t *transpo
pn_link_t *link = delivery->link;
pn_session_state_t *ssn_state = pn_session_get_state(transport,
link->session);
pn_modified(transport->connection, &link->session->endpoint);
- // XXX: check for null state
- pn_delivery_state_t *state = (pn_delivery_state_t *)
delivery->transport_context;
+ pn_delivery_state_t *state = &delivery->state;
+ assert(state->init);
uint64_t code;
switch(delivery->local_state) {
case PN_ACCEPTED:
@@ -2233,10 +2191,9 @@ int pn_process_tpwork_sender(pn_transpor
pn_session_state_t *ssn_state = pn_session_get_state(transport,
link->session);
pn_link_state_t *link_state = pn_link_get_state(ssn_state, link);
if ((int16_t) ssn_state->local_channel >= 0 && (int32_t)
link_state->local_handle >= 0) {
- pn_delivery_state_t *state = (pn_delivery_state_t *)
delivery->transport_context;
- if (!(*allocation_blocked) && !state &&
pn_delivery_buffer_available(&ssn_state->outgoing)) {
- state = pn_delivery_buffer_push(&ssn_state->outgoing, delivery);
- delivery->transport_context = state;
+ pn_delivery_state_t *state = delivery->state.init ? &delivery->state :
NULL;
+ if (!(*allocation_blocked) && !state &&
pn_delivery_map_available(&ssn_state->outgoing)) {
+ state = pn_delivery_map_push(&ssn_state->outgoing, delivery);
} else {
*allocation_blocked = true;
}
@@ -2266,7 +2223,7 @@ int pn_process_tpwork_sender(pn_transpor
}
}
- pn_delivery_state_t *state = (pn_delivery_state_t *)
delivery->transport_context;
+ pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL;
// XXX: need to prevent duplicate disposition sending
if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled
&& state && state->sent) {
@@ -2286,16 +2243,16 @@ int pn_process_tpwork_receiver(pn_transp
pn_link_t *link = delivery->link;
// XXX: need to prevent duplicate disposition sending
pn_session_state_t *ssn_state = pn_session_get_state(transport,
link->session);
- if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled &&
delivery->transport_context) {
+ if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote_settled &&
delivery->state.init) {
int err = pn_post_disp(transport, delivery);
if (err) return err;
}
if (delivery->local_settled) {
- size_t available = pn_delivery_buffer_available(&ssn_state->incoming);
+ size_t available = pn_delivery_map_available(&ssn_state->incoming);
pn_full_settle(&ssn_state->incoming, delivery);
if (!ssn_state->incoming_window &&
- pn_delivery_buffer_available(&ssn_state->incoming) > available) {
+ pn_delivery_map_available(&ssn_state->incoming) > available) {
int err = pn_post_flow(transport, ssn_state, NULL);
if (err) return err;
}
@@ -2313,7 +2270,7 @@ int pn_process_tpwork(pn_transport_t *tr
bool allocation_blocked = false;
while (delivery)
{
- if (!delivery->transport_context && transport->disp->available > 0) {
+ if (!delivery->state.init && transport->disp->available > 0) {
break;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]