Author: rhs
Date: Wed May 8 20:39:43 2013
New Revision: 1480445
URL: http://svn.apache.org/r1480445
Log:
PROTON-295: decoupled tracking of store entries from put/get of store entries,
fixed tracking of incoming entries to start when they are returned via get
rather than when they are read off of the wire
Modified:
qpid/proton/trunk/proton-c/src/messenger/messenger.c
qpid/proton/trunk/proton-c/src/messenger/store.c
qpid/proton/trunk/proton-c/src/messenger/store.h
qpid/proton/trunk/tests/python/proton_tests/messenger.py
Modified: qpid/proton/trunk/proton-c/src/messenger/messenger.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/messenger.c?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/messenger.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/messenger.c Wed May 8 20:39:43 2013
@@ -1182,7 +1182,7 @@ int pn_messenger_put(pn_messenger_t *mes
if (!entry)
return pn_error_format(messenger->error, PN_ERR, "store error");
- messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_tracker(entry));
+ messenger->outgoing_tracker = pn_tracker(OUTGOING, pni_entry_track(entry));
pn_buffer_t *buf = pni_entry_bytes(entry);
while (true) {
@@ -1226,7 +1226,7 @@ pni_store_t *pn_tracker_store(pn_messeng
pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker)
{
pni_store_t *store = pn_tracker_store(messenger, tracker);
- pni_entry_t *e = pni_store_track(store, pn_tracker_sequence(tracker));
+ pni_entry_t *e = pni_store_entry(store, pn_tracker_sequence(tracker));
if (e) {
return pni_entry_get_status(e);
} else {
@@ -1346,7 +1346,7 @@ int pn_messenger_get(pn_messenger_t *mes
// XXX: need to drain credit before returning EOS
if (!entry) return PN_EOS;
- messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_tracker(entry));
+ messenger->incoming_tracker = pn_tracker(INCOMING, pni_entry_track(entry));
pn_buffer_t *buf = pni_entry_bytes(entry);
pn_bytes_t bytes = pn_buffer_bytes(buf);
const char *encoded = bytes.start;
Modified: qpid/proton/trunk/proton-c/src/messenger/store.c
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.c?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.c (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.c Wed May 8 20:39:43 2013
@@ -21,6 +21,7 @@
#include <proton/messenger.h>
#include <proton/engine.h>
+#include <proton/object.h>
#include <assert.h>
#ifndef __cplusplus
#include <stdbool.h>
@@ -32,20 +33,15 @@
typedef struct pni_stream_t pni_stream_t;
-typedef struct {
- size_t capacity;
- int window;
- pn_sequence_t lwm;
- pn_sequence_t hwm;
- pni_entry_t **entries;
-} pni_queue_t;
-
struct pni_store_t {
size_t size;
- pni_queue_t queue;
pni_stream_t *streams;
pni_entry_t *store_head;
pni_entry_t *store_tail;
+ int window;
+ pn_sequence_t lwm;
+ pn_sequence_t hwm;
+ pn_hash_t *tracked;
};
struct pni_stream_t {
@@ -57,7 +53,6 @@ struct pni_stream_t {
};
struct pni_entry_t {
- int refcount;
pn_sequence_t id;
pni_stream_t *stream;
bool free;
@@ -71,169 +66,15 @@ struct pni_entry_t {
void *context;
};
-static void pni_entry_incref(pni_entry_t *entry)
-{
- entry->refcount++;
-}
-
-void pni_entry_reclaim(pni_entry_t *entry)
+void pni_entry_finalize(void *object)
{
+ pni_entry_t *entry = (pni_entry_t *) object;
assert(entry->free);
pn_delivery_t *d = entry->delivery;
if (d) {
- if (!pn_delivery_local_state(d)) {
- pn_delivery_update(d, PN_ACCEPTED);
- }
pn_delivery_settle(d);
pni_entry_set_delivery(entry, NULL);
}
- free(entry);
-}
-
-static void pni_entry_decref(pni_entry_t *entry)
-{
- if (entry) {
- assert(entry->refcount > 0);
- entry->refcount--;
- if (entry->refcount == 0) {
- pni_entry_reclaim(entry);
- }
- }
-}
-
-void pni_queue_init(pni_queue_t *queue)
-{
- queue->capacity = 1024;
- queue->window = 0;
- queue->lwm = 0;
- queue->hwm = 0;
- queue->entries = (pni_entry_t **) calloc(queue->capacity, sizeof(pni_entry_t *));
-}
-
-void pni_queue_tini(pni_queue_t *queue)
-{
- for (int i = 0; i < queue->hwm - queue->lwm; i++) {
- pni_entry_decref(queue->entries[i]);
- }
- free(queue->entries);
-}
-
-bool pni_queue_contains(pni_queue_t *queue, pn_sequence_t id)
-{
- return (id - queue->lwm >= 0) && (queue->hwm - id > 0);
-}
-
-pni_entry_t *pni_queue_get(pni_queue_t *queue, pn_sequence_t id)
-{
- if (pni_queue_contains(queue, id)) {
- size_t offset = id - queue->lwm;
- assert(offset >= 0 && offset < queue->capacity);
- return queue->entries[offset];
- } else {
- return NULL;
- }
-}
-
-void pni_queue_gc(pni_queue_t *queue)
-{
- size_t count = queue->hwm - queue->lwm;
- size_t delta = 0;
-
- while (delta < count && !queue->entries[delta]) {
- delta++;
- }
-
- memmove(queue->entries, queue->entries + delta, (count - delta)*sizeof(pni_entry_t *));
- queue->lwm += delta;
-}
-
-void pni_queue_del(pni_queue_t *queue, pni_entry_t *entry)
-{
- pn_sequence_t id = entry->id;
- if (pni_queue_contains(queue, id)) {
- size_t offset = id - queue->lwm;
- assert(offset >= 0 && offset < queue->capacity);
- queue->entries[offset] = NULL;
- pni_entry_decref(entry);
- }
-}
-
-void pni_queue_slide(pni_queue_t *queue)
-{
- if (queue->window >= 0) {
- while (queue->hwm - queue->lwm > queue->window) {
- pni_entry_t *e = pni_queue_get(queue, queue->lwm);
- if (e) {
- pni_queue_del(queue, e);
- } else {
- pni_queue_gc(queue);
- }
- }
- }
- pni_queue_gc(queue);
-}
-
-pn_sequence_t pni_queue_add(pni_queue_t *queue, pni_entry_t *entry)
-{
- pn_sequence_t id = queue->hwm++;
- entry->id = id;
- size_t offset = id - queue->lwm;
- PN_ENSUREZ(queue->entries, queue->capacity, offset + 1, pni_entry_t *);
- assert(offset >= 0 && offset < queue->capacity);
- queue->entries[offset] = entry;
- pni_entry_incref(entry);
- pni_queue_slide(queue);
- return id;
-}
-
-int pni_queue_update(pni_queue_t *queue, pn_sequence_t id, pn_status_t status,
- int flags, bool settle, bool match)
-{
- if (!pni_queue_contains(queue, id)) {
- return 0;
- }
-
- size_t start;
- if (PN_CUMULATIVE & flags) {
- start = queue->lwm;
- } else {
- start = id;
- }
-
- for (pn_sequence_t i = start; i <= id; i++) {
- pni_entry_t *e = pni_queue_get(queue, i);
- if (e) {
- pn_delivery_t *d = e->delivery;
- if (d) {
- if (!pn_delivery_local_state(d)) {
- if (match) {
- pn_delivery_update(d, pn_delivery_remote_state(d));
- } else {
- switch (status) {
- case PN_STATUS_ACCEPTED:
- pn_delivery_update(d, PN_ACCEPTED);
- break;
- case PN_STATUS_REJECTED:
- pn_delivery_update(d, PN_REJECTED);
- break;
- default:
- break;
- }
- }
- }
- }
- if (settle) {
- if (d) {
- pn_delivery_settle(d);
- }
- pni_queue_del(queue, e);
- }
- }
- }
-
- pni_queue_gc(queue);
-
- return 0;
}
pni_store_t *pni_store()
@@ -245,7 +86,10 @@ pni_store_t *pni_store()
store->streams = NULL;
store->store_head = NULL;
store->store_tail = NULL;
- pni_queue_init(&store->queue);
+ store->window = 0;
+ store->lwm = 0;
+ store->hwm = 0;
+ store->tracked = pn_hash(0, 0.75, PN_REFCOUNT);
return store;
}
@@ -314,7 +158,7 @@ void pni_entry_free(pni_entry_t *entry)
pn_buffer_free(entry->bytes);
entry->bytes = NULL;
- pni_entry_decref(entry);
+ pn_decref(entry);
store->size--;
}
@@ -331,13 +175,13 @@ void pni_stream_free(pni_stream_t *strea
void pni_store_free(pni_store_t *store)
{
if (!store) return;
+ pn_free(store->tracked);
pni_stream_t *stream = store->streams;
while (stream) {
pni_stream_t *next = stream->next;
pni_stream_free(stream);
stream = next;
}
- pni_queue_tini(&store->queue);
free(store);
}
@@ -356,12 +200,13 @@ pni_stream_t *pni_stream_get(pni_store_t
pni_entry_t *pni_store_put(pni_store_t *store, const char *address)
{
assert(store);
+ static pn_class_t clazz = {pni_entry_finalize};
+
if (!address) address = "";
pni_stream_t *stream = pni_stream_put(store, address);
if (!stream) return NULL;
- pni_entry_t *entry = (pni_entry_t *) malloc(sizeof(pni_entry_t));
+ pni_entry_t *entry = (pni_entry_t *) pn_new(sizeof(pni_entry_t), &clazz);
if (!entry) return NULL;
- entry->refcount = 0;
entry->stream = stream;
entry->free = false;
entry->stream_next = NULL;
@@ -373,10 +218,6 @@ pni_entry_t *pni_store_put(pni_store_t *
LL_ADD(stream, stream, entry);
LL_ADD(store, store, entry);
store->size++;
-
- pni_entry_incref(entry);
- pni_queue_add(&store->queue, entry);
-
return entry;
}
@@ -443,7 +284,7 @@ void *pni_entry_get_context(pni_entry_t
static pn_status_t disp2status(pn_disposition_t disp)
{
- if (!disp) return PN_STATUS_UNKNOWN;
+ if (!disp) return PN_STATUS_PENDING;
switch (disp) {
case PN_ACCEPTED:
@@ -463,42 +304,119 @@ void pni_entry_updated(pni_entry_t *entr
assert(entry);
pn_delivery_t *d = entry->delivery;
if (d) {
- if (pn_delivery_remote_state(d))
+ if (pn_delivery_remote_state(d)) {
entry->status = disp2status(pn_delivery_remote_state(d));
- else if (pn_delivery_settled(d))
+ } else if (pn_delivery_settled(d)) {
entry->status = disp2status(pn_delivery_local_state(d));
- else
+ } else {
entry->status = PN_STATUS_PENDING;
+ }
}
}
-pn_sequence_t pni_entry_tracker(pni_entry_t *entry)
+pn_sequence_t pni_entry_id(pni_entry_t *entry)
{
assert(entry);
return entry->id;
}
-pni_entry_t *pni_store_track(pni_store_t *store, pn_sequence_t id)
+pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id)
{
assert(store);
- return pni_queue_get(&store->queue, id);
+ return (pni_entry_t *) pn_hash_get(store->tracked, id);
+}
+
+bool pni_store_tracking(pni_store_t *store, pn_sequence_t id)
+{
+ return (id - store->lwm >= 0) && (store->hwm - id > 0);
+}
+
+pn_sequence_t pni_entry_track(pni_entry_t *entry)
+{
+ assert(entry);
+
+ pni_store_t *store = entry->stream->store;
+ entry->id = store->hwm++;
+ pn_hash_put(store->tracked, entry->id, entry);
+
+ if (store->window >= 0) {
+ while (store->hwm - store->lwm > store->window) {
+ pni_entry_t *e = pni_store_entry(store, store->lwm);
+ if (e) {
+ pn_hash_del(store->tracked, store->lwm);
+ }
+ store->lwm++;
+ }
+ }
+
+ return entry->id;
}
int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status,
int flags, bool settle, bool match)
{
assert(store);
- return pni_queue_update(&store->queue, id, status, flags, settle, match);
+
+ if (!pni_store_tracking(store, id)) {
+ return 0;
+ }
+
+ size_t start;
+ if (PN_CUMULATIVE & flags) {
+ start = store->lwm;
+ } else {
+ start = id;
+ }
+
+ for (pn_sequence_t i = start; i <= id; i++) {
+ pni_entry_t *e = pni_store_entry(store, i);
+ if (e) {
+ pn_delivery_t *d = e->delivery;
+ if (d) {
+ if (!pn_delivery_local_state(d)) {
+ if (match) {
+ pn_delivery_update(d, pn_delivery_remote_state(d));
+ } else {
+ switch (status) {
+ case PN_STATUS_ACCEPTED:
+ pn_delivery_update(d, PN_ACCEPTED);
+ break;
+ case PN_STATUS_REJECTED:
+ pn_delivery_update(d, PN_REJECTED);
+ break;
+ default:
+ break;
+ }
+ }
+
+ pni_entry_updated(e);
+ }
+ }
+ if (settle) {
+ if (d) {
+ pn_delivery_settle(d);
+ }
+ pn_hash_del(store->tracked, e->id);
+ }
+ }
+ }
+
+ while (store->hwm - store->lwm > 0 &&
+ !pn_hash_get(store->tracked, store->lwm)) {
+ store->lwm++;
+ }
+
+ return 0;
}
int pni_store_get_window(pni_store_t *store)
{
assert(store);
- return store->queue.window;
+ return store->window;
}
void pni_store_set_window(pni_store_t *store, int window)
{
assert(store);
- store->queue.window = window;
+ store->window = window;
}
Modified: qpid/proton/trunk/proton-c/src/messenger/store.h
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/messenger/store.h?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/messenger/store.h (original)
+++ qpid/proton/trunk/proton-c/src/messenger/store.h Wed May 8 20:39:43 2013
@@ -43,8 +43,8 @@ void *pni_entry_get_context(pni_entry_t
void pni_entry_updated(pni_entry_t *entry);
void pni_entry_free(pni_entry_t *entry);
-pn_sequence_t pni_entry_tracker(pni_entry_t *entry);
-pni_entry_t *pni_store_track(pni_store_t *store, pn_sequence_t id);
+pn_sequence_t pni_entry_track(pni_entry_t *entry);
+pni_entry_t *pni_store_entry(pni_store_t *store, pn_sequence_t id);
int pni_store_update(pni_store_t *store, pn_sequence_t id, pn_status_t status,
int flags, bool settle, bool match);
int pni_store_get_window(pni_store_t *store);
Modified: qpid/proton/trunk/tests/python/proton_tests/messenger.py
URL:
http://svn.apache.org/viewvc/qpid/proton/trunk/tests/python/proton_tests/messenger.py?rev=1480445&r1=1480444&r2=1480445&view=diff
==============================================================================
--- qpid/proton/trunk/tests/python/proton_tests/messenger.py (original)
+++ qpid/proton/trunk/tests/python/proton_tests/messenger.py Wed May 8 20:39:43 2013
@@ -265,6 +265,39 @@ class MessengerTest(Test):
for t in trackers:
assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+ def testIncomingQueueBiggerThanWindow(self):
+ self.server.outgoing_window = 10
+ self.client.incoming_window = 10
+ self.start()
+
+ msg = Message()
+ msg.address = "amqp://0.0.0.0:12345"
+ msg.subject = "Hello World!"
+
+ for i in range(20):
+ self.client.put(msg)
+
+ while self.client.incoming < 20:
+ self.client.recv(20 - self.client.incoming)
+
+ trackers = []
+ while self.client.incoming:
+ t = self.client.get(msg)
+ assert self.client.status(t) is PENDING, (t, self.client.status(t))
+ trackers.append(t)
+
+ for t in trackers[:10]:
+ assert self.client.status(t) is None, (t, self.client.status(t))
+ for t in trackers[10:]:
+ assert self.client.status(t) is PENDING, (t, self.client.status(t))
+
+ self.client.accept()
+
+ for t in trackers[:10]:
+ assert self.client.status(t) is None, (t, self.client.status(t))
+ for t in trackers[10:]:
+ assert self.client.status(t) is ACCEPTED, (t, self.client.status(t))
+
def test_proton222(self):
self.start()
msg = Message()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]