Changeset: c93bf17c3d61 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/c93bf17c3d61
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Sep2022
Log Message:

Reimplemented the dataflow queue.
The new queue uses a linked list of dataflow events instead of an array
of pointers.  The problem with the array was that, in certain
circumstances, it needed to be extended (realloc), and that reallocation
could fail.  Recovering from such a failure is unpleasant.  Because the
list link is stored in each dataflow event itself, the queue operations
(enqueue, requeue, dequeue) no longer allocate memory and therefore
cannot fail.


diffs (215 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -51,13 +51,12 @@ typedef struct FLOWEVENT {
        lng hotclaim;   /* memory foot print of result variables */
        lng argclaim;   /* memory foot print of arguments */
        lng maxclaim;   /* memory foot print of  largest argument, counld be 
used to indicate result size */
+       struct FLOWEVENT *next;         /* linked list for queues */
 } *FlowEvent, FlowEventRec;
 
 typedef struct queue {
-       int size;       /* size of queue */
-       int last;       /* last element in the queue */
        int exitcount;  /* how many threads should exit */
-       FlowEvent *data;
+       FlowEvent first, last;          /* first and last element of the queue 
*/
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
 } Queue;
@@ -111,7 +110,6 @@ mal_dataflow_reset(void)
        idle_workers = -1;
        exited_workers = -1;
        if( todo) {
-               GDKfree(todo->data);
                MT_lock_destroy(&todo->l);
                MT_sema_destroy(&todo->s);
                GDKfree(todo);
@@ -141,21 +139,12 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
  */
 
 static Queue*
-q_create(int sz, const char *name)
+q_create(const char *name)
 {
-       Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
+       Queue *q = GDKzalloc(sizeof(Queue));
 
        if (q == NULL)
                return NULL;
-       *q = (Queue) {
-               .size = ((sz << 1) >> 1), /* we want a multiple of 2 */
-       };
-       q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
-       if (q->data == NULL) {
-               GDKfree(q);
-               return NULL;
-       }
-
        MT_lock_init(&q->l, name);
        MT_sema_init(&q->s, 0, name);
        return q;
@@ -167,31 +156,26 @@ q_destroy(Queue *q)
        assert(q);
        MT_lock_destroy(&q->l);
        MT_sema_destroy(&q->s);
-       GDKfree(q->data);
        GDKfree(q);
 }
 
 /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue 
is possible */
 /* we might actually sort it for better scheduling behavior */
 static void
-q_enqueue_(Queue *q, FlowEvent d)
-{
-       assert(q);
-       assert(d);
-       if (q->last == q->size) {
-               q->size <<= 1;
-               q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * 
q->size);
-               assert(q->data);
-       }
-       q->data[q->last++] = d;
-}
-static void
 q_enqueue(Queue *q, FlowEvent d)
 {
        assert(q);
        assert(d);
        MT_lock_set(&q->l);
-       q_enqueue_(q, d);
+       if (q->first == NULL) {
+               assert(q->last == NULL);
+               q->first = q->last = d;
+       } else {
+               assert(q->last != NULL);
+               q->last->next = d;
+               q->last = d;
+       }
+       d->next = NULL;
        MT_lock_unset(&q->l);
        MT_sema_up(&q->s);
 }
@@ -203,30 +187,20 @@ q_enqueue(Queue *q, FlowEvent d)
  */
 
 static void
-q_requeue_(Queue *q, FlowEvent d)
-{
-       int i;
-
-       assert(q);
-       assert(d);
-       if (q->last == q->size) {
-               /* enlarge buffer */
-               q->size <<= 1;
-               q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * 
q->size);
-               assert(q->data);
-       }
-       for (i = q->last; i > 0; i--)
-               q->data[i] = q->data[i - 1];
-       q->data[0] = d;
-       q->last++;
-}
-static void
 q_requeue(Queue *q, FlowEvent d)
 {
        assert(q);
        assert(d);
        MT_lock_set(&q->l);
-       q_requeue_(q, d);
+       if (q->first == NULL) {
+               assert(q->last == NULL);
+               q->first = q->last = d;
+               d->next = NULL;
+       } else {
+               assert(q->last != NULL);
+               d->next = q->first;
+               q->first = d;
+       }
        MT_lock_unset(&q->l);
        MT_sema_up(&q->s);
 }
@@ -234,46 +208,37 @@ q_requeue(Queue *q, FlowEvent d)
 static FlowEvent
 q_dequeue(Queue *q, Client cntxt)
 {
-       FlowEvent r = NULL, s = NULL;
-       //int i;
-
        assert(q);
        MT_sema_down(&q->s);
        if (ATOMIC_GET(&exiting))
                return NULL;
        MT_lock_set(&q->l);
-       if( cntxt == NULL && q->exitcount > 0){
+       if (cntxt == NULL && q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l);
                return NULL;
        }
-       {
-               int i, minpc;
 
-               minpc = q->last -1;
-               s = q->data[minpc];
-               /* for long "queues", just grab the first eligible entry we 
encounter */
-               if (q->last < 1024) {
-                       for (i = q->last - 1; i >= 0; i--) {
-                               if ( cntxt ==  NULL || q->data[i]->flow->cntxt 
== cntxt) {
-                                       /* for shorter "queues", find the 
oldest eligible entry */
-                                       r = q->data[i];
-                                       if (s && r && s->pc > r->pc) {
-                                               minpc = i;
-                                               s = r;
-                                       }
-                               }
-                       }
-               }
-               if (minpc >= 0) {
-                       r = q->data[minpc];
-                       i = minpc;
-                       q->last--;
-                       memmove(q->data + i, q->data + i + 1, (q->last - i) * 
sizeof(q->data[0]));
+       FlowEvent *dp = &q->first;
+       FlowEvent pd = NULL;
+       /* if cntxt == NULL, return the first event, if cntxt != NULL, find
+        * the first event in the queue with matching cntxt value and return
+        * that */
+       if (cntxt != NULL) {
+               while (*dp && (*dp)->flow->cntxt != cntxt) {
+                       pd = *dp;
+                       dp = &pd->next;
                }
        }
+       FlowEvent d = *dp;
+       if (d) {
+               *dp = d->next;
+               d->next = NULL;
+               if (*dp == NULL)
+                       q->last = pd;
+       }
        MT_lock_unset(&q->l);
-       return r;
+       return d;
 }
 
 /*
@@ -503,7 +468,7 @@ DFLOWinitialize(void)
                return 0;
        }
        free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 : 
GDKnr_threads);
-       todo = q_create(2048, "todo");
+       todo = q_create("todo");
        if (todo == NULL) {
                MT_lock_unset(&dataflowLock);
                MT_lock_unset(&mal_contextLock);
@@ -913,7 +878,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        flow->start = startpc + 1;
        flow->stop = stoppc;
 
-       flow->done = q_create(stoppc- startpc+1, "flow->done");
+       flow->done = q_create("flow->done");
        if (flow->done == NULL) {
                GDKfree(flow);
                throw(MAL, "dataflow", "runMALdataflow(): Failed to create 
flow->done queue");
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to