Hi all,

The attached patch adds a sequence number to each gw-prioqueue element so that
messages which compare equal keep their insertion order. It should fix #436.

Any objections?

-- 
Thanks,
Alex
=== gwlib/gw-prioqueue.c
==================================================================
--- gwlib/gw-prioqueue.c	(revision 892)
+++ gwlib/gw-prioqueue.c	(local)
@@ -55,11 +55,11 @@
  */ 
 
 /*
- * gw-prioqueue.c - generic priority queue.
+ * gw-prioqueue.c - generic priority queue with guaranteed order.
  *
  * Algorithm ala Robert Sedgewick.
  *
- * Alexander Malysh <olek2002 at hotmail.com>, 2004
+ * Alexander Malysh <amalysh at kannel.org>, 2004, 2008
  */
 
 #include "gw-config.h"
@@ -71,12 +71,18 @@
 #include "gw-prioqueue.h"
 
 
+struct element { /* one heap slot: a queued item plus its insertion sequence number */
+    void *item; /* pointer handed in by the caller; NULL only in the stop marker at tab[0] */
+    long long seq; /* copied from the queue's running seq counter when the slot is allocated */
+};
+
 struct gw_prioqueue {
     Mutex *mutex;
-    void **tab;
+    struct element **tab;
     size_t size;
     long len;
     long producers;
+    long long seq;
     pthread_cond_t nonempty;
     int (*cmp)(const void*, const void *);
 };
@@ -97,7 +103,7 @@
 static void make_bigger(gw_prioqueue_t *queue, long items)
 {
     size_t size = queue->size;
-    size_t new_size = sizeof(void*) * (queue->len + items);
+    size_t new_size = sizeof(*queue->tab) * (queue->len + items);
     
     if (size >= new_size)
         return;
@@ -107,6 +113,23 @@
 }
 
 
+static int compare(struct element *a, struct element *b, int(*cmp)(const void*, const void *))
+{
+    int rc; /* ordering of a relative to b: cmp() verdict first, seq as tie-breaker */
+
+    rc = cmp(a->item, b->item); /* user-supplied comparator sees only the items */
+    if (rc == 0) {
+        /* items compare equal: rank the element with the lower (earlier) seq as the greater one */
+        if (a->seq < b->seq)
+            rc = 1; /* a was sequenced first, so a outranks b */
+        else if (a->seq > b->seq)
+            rc = -1; /* b was sequenced first, so b outranks a */
+    }
+
+    return rc; /* 0 only when the items compare equal and both seq values match */
+}
+
+
 /**
  * Heapize up
  * @queue - our prioqueue
@@ -114,8 +137,8 @@
  */
 static void upheap(gw_prioqueue_t *queue, register long index)
 {
-    void *v = queue->tab[index];
-    while (queue->tab[index / 2] != NULL && queue->cmp(queue->tab[index / 2], v) <= 0) {
+    struct element *v = queue->tab[index];
+    while (queue->tab[index / 2]->item != NULL && compare(queue->tab[index / 2], v, queue->cmp) < 0) {
         queue->tab[index] = queue->tab[index / 2];
         index /= 2;
     }
@@ -130,16 +153,16 @@
  */
 static void downheap(gw_prioqueue_t *queue, register long index)
 {
-    void *v = queue->tab[index];
+    struct element *v = queue->tab[index];
     register long j;
     
     while (index <= queue->len / 2) {
         j = 2 * index;
         /* take the biggest child item */
-        if (j < queue->len && queue->cmp(queue->tab[j], queue->tab[j + 1]) < 0)
+        if (j < queue->len && compare(queue->tab[j], queue->tab[j + 1], queue->cmp) < 0)
             j++;
         /* break if our item bigger */
-        if (queue->cmp(v, queue->tab[j]) >= 0)
+        if (compare(v, queue->tab[j], queue->cmp) >= 0)
             break;
         queue->tab[index] = queue->tab[j];
         index = j;
@@ -161,11 +184,14 @@
     ret->tab = NULL;
     ret->size = 0;
     ret->len = 0;
+    ret->seq = 0;
     ret->cmp = cmp;
     
     /* put NULL item at pos 0 that is our stop marker */
     make_bigger(ret, 1);
-    ret->tab[0] = NULL;
+    ret->tab[0] = gw_malloc(sizeof(**ret->tab));
+    ret->tab[0]->item = NULL;
+    ret->tab[0]->seq = ret->seq++;
     ret->len++;
     
     return ret;
@@ -174,13 +200,15 @@
 
 void gw_prioqueue_destroy(gw_prioqueue_t *queue, void(*item_destroy)(void*))
 {
+    long i;
+
     if (queue == NULL)
         return;
     
-    if (item_destroy != NULL) {
-        void *item;
-        while((item = gw_prioqueue_remove(queue)) != NULL)
-            item_destroy(item);
+    for (i = 0; i < queue->len; i++) {
+        if (item_destroy != NULL && queue->tab[i]->item != NULL)
+            item_destroy(queue->tab[i]->item);
+        gw_free(queue->tab[i]);
     }
     mutex_destroy(queue->mutex);
     pthread_cond_destroy(&queue->nonempty);
@@ -211,7 +239,9 @@
     
     queue_lock(queue);
     make_bigger(queue, 1);
-    queue->tab[queue->len] = item;
+    queue->tab[queue->len] = gw_malloc(sizeof(**queue->tab));
+    queue->tab[queue->len]->item = item;
+    queue->tab[queue->len]->seq = queue->seq++;
     upheap(queue, queue->len);
     queue->len++;
     pthread_cond_signal(&queue->nonempty);
@@ -227,7 +257,7 @@
     
     queue_lock(queue);
     for (i = 1; i < queue->len; i++)
-        fn(queue->tab[i], i - 1);
+        fn(queue->tab[i]->item, i - 1);
     queue_unlock(queue);
 }
 
@@ -243,7 +273,8 @@
         queue_unlock(queue);
         return NULL;
     }
-    ret = queue->tab[1];
+    ret = queue->tab[1]->item;
+    gw_free(queue->tab[1]);
     queue->tab[1] = queue->tab[--queue->len];
     downheap(queue, 1);
     queue_unlock(queue);
@@ -260,7 +291,7 @@
     
     queue_lock(queue);
     if (queue->len > 1)
-        ret = queue->tab[1];
+        ret = queue->tab[1]->item;
     else
         ret = NULL;
     queue_unlock(queue);
@@ -282,7 +313,8 @@
         queue->mutex->owner = gwthread_self();
     }
     if (queue->len > 1) {
-        ret = queue->tab[1];
+        ret = queue->tab[1]->item;
+        gw_free(queue->tab[1]);
         queue->tab[1] = queue->tab[--queue->len];
         downheap(queue, 1);
     } else {

Reply via email to