Hi all,
the attached patch adds a sequence number to each element of the gw-prioqueue,
which guarantees the order of messages that compare equal. It should fix #436.
Any objections?
--
Thanks,
Alex
=== gwlib/gw-prioqueue.c
==================================================================
--- gwlib/gw-prioqueue.c (revision 892)
+++ gwlib/gw-prioqueue.c (local)
@@ -55,11 +55,11 @@
*/
/*
- * gw-prioqueue.c - generic priority queue.
+ * gw-prioqueue.c - generic priority queue with guaranteed order.
*
* Algorithm ala Robert Sedgewick.
*
- * Alexander Malysh <olek2002 at hotmail.com>, 2004
+ * Alexander Malysh <amalysh at kannel.org>, 2004, 2008
*/
#include "gw-config.h"
@@ -71,12 +71,18 @@
#include "gw-prioqueue.h"
+struct element { /* one heap slot: the queued item plus its tie-break sequence number */
+ void *item; /* pointer handed to the queue; the stop marker in slot 0 stores NULL here */
+ long long seq; /* value of the queue's seq counter when this slot was created */
+};
+
struct gw_prioqueue {
Mutex *mutex;
- void **tab;
+ struct element **tab;
size_t size;
long len;
long producers;
+ long long seq;
pthread_cond_t nonempty;
int (*cmp)(const void*, const void *);
};
@@ -97,7 +103,7 @@
static void make_bigger(gw_prioqueue_t *queue, long items)
{
size_t size = queue->size;
- size_t new_size = sizeof(void*) * (queue->len + items);
+ size_t new_size = sizeof(*queue->tab) * (queue->len + items);
if (size >= new_size)
return;
@@ -107,6 +113,23 @@
}
+static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
+{
+ int rc;
+
+ rc = cmp(a->item, b->item); /* the caller-supplied ordering is consulted first */
+ if (rc == 0) {
+ /* cmp() reports a tie: the element with the lower seq (queued earlier) ranks higher */
+ if (a->seq < b->seq)
+ rc = 1;
+ else if (a->seq > b->seq)
+ rc = -1;
+ }
+
+ return rc; /* stays 0 only when cmp() ties and both seq values are equal */
+}
+
+
/**
* Heapize up
* @queue - our prioqueue
@@ -114,8 +137,8 @@
*/
static void upheap(gw_prioqueue_t *queue, register long index)
{
- void *v = queue->tab[index];
- while (queue->tab[index / 2] != NULL && queue->cmp(queue->tab[index / 2], v) <= 0) {
+ struct element *v = queue->tab[index];
+ while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
queue->tab[index] = queue->tab[index / 2];
index /= 2;
}
@@ -130,16 +153,16 @@
*/
static void downheap(gw_prioqueue_t *queue, register long index)
{
- void *v = queue->tab[index];
+ struct element *v = queue->tab[index];
register long j;
while (index <= queue->len / 2) {
j = 2 * index;
/* take the biggest child item */
- if (j < queue->len && queue->cmp(queue->tab[j], queue->tab[j + 1]) < 0)
+ if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
j++;
/* break if our item bigger */
- if (queue->cmp(v, queue->tab[j]) >= 0)
+ if (compare(v, queue->tab[j], queue->cmp) >= 0)
break;
queue->tab[index] = queue->tab[j];
index = j;
@@ -161,11 +184,14 @@
ret->tab = NULL;
ret->size = 0;
ret->len = 0;
+ ret->seq = 0;
ret->cmp = cmp;
/* put NULL item at pos 0 that is our stop marker */
make_bigger(ret, 1);
- ret->tab[0] = NULL;
+ ret->tab[0] = gw_malloc(sizeof(**ret->tab));
+ ret->tab[0]->item = NULL;
+ ret->tab[0]->seq = ret->seq++;
ret->len++;
return ret;
@@ -174,13 +200,15 @@
void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*))
{
+ long i;
+
if (queue == NULL)
return;
- if (item_destroy != NULL) {
- void *item;
- while((item = gw_prioqueue_remove(queue)) != NULL)
- item_destroy(item);
+ for (i = 0; i < queue->len; i++) {
+ if (item_destroy != NULL && queue->tab[i]->item != NULL)
+ item_destroy(queue->tab[i]->item);
+ gw_free(queue->tab[i]);
}
mutex_destroy(queue->mutex);
pthread_cond_destroy(&queue->nonempty);
@@ -211,7 +239,9 @@
queue_lock(queue);
make_bigger(queue, 1);
- queue->tab[queue->len] = item;
+ queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
+ queue->tab[queue->len]->item = item;
+ queue->tab[queue->len]->seq = queue->seq++;
upheap(queue, queue->len);
queue->len++;
pthread_cond_signal(&queue->nonempty);
@@ -227,7 +257,7 @@
queue_lock(queue);
for (i = 1; i < queue->len; i++)
- fn(queue->tab[i], i - 1);
+ fn(queue->tab[i]->item, i - 1);
queue_unlock(queue);
}
@@ -243,7 +273,8 @@
queue_unlock(queue);
return NULL;
}
- ret = queue->tab[1];
+ ret = queue->tab[1]->item;
+ gw_free(queue->tab[1]);
queue->tab[1] = queue->tab[--queue->len];
downheap(queue, 1);
queue_unlock(queue);
@@ -260,7 +291,7 @@
queue_lock(queue);
if (queue->len > 1)
- ret = queue->tab[1];
+ ret = queue->tab[1]->item;
else
ret = NULL;
queue_unlock(queue);
@@ -282,7 +313,8 @@
queue->mutex->owner = gwthread_self();
}
if (queue->len > 1) {
- ret = queue->tab[1];
+ ret = queue->tab[1]->item;
+ gw_free(queue->tab[1]);
queue->tab[1] = queue->tab[--queue->len];
downheap(queue, 1);
} else {