Changeset: c93bf17c3d61 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/c93bf17c3d61
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: Sep2022
Log Message:
Reimplemented the dataflow queue.
The new queue uses a linked list of dataflow events instead of an array
of pointers. The problem with the array was that in certain
circumstances it needed to be extended (realloc), which could fail.
Recovering from that failure is unpleasant. The linked list
implementation cannot fail.
diffs (215 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -51,13 +51,12 @@ typedef struct FLOWEVENT {
lng hotclaim; /* memory foot print of result variables */
lng argclaim; /* memory foot print of arguments */
lng maxclaim; /* memory foot print of largest argument, could be
used to indicate result size */
+ struct FLOWEVENT *next; /* linked list for queues */
} *FlowEvent, FlowEventRec;
typedef struct queue {
- int size; /* size of queue */
- int last; /* last element in the queue */
int exitcount; /* how many threads should exit */
- FlowEvent *data;
+ FlowEvent first, last; /* first and last element of the queue
*/
MT_Lock l; /* it's a shared resource, ie we need locks */
MT_Sema s; /* threads wait on empty queues */
} Queue;
@@ -111,7 +110,6 @@ mal_dataflow_reset(void)
idle_workers = -1;
exited_workers = -1;
if( todo) {
- GDKfree(todo->data);
MT_lock_destroy(&todo->l);
MT_sema_destroy(&todo->s);
GDKfree(todo);
@@ -141,21 +139,12 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
*/
static Queue*
-q_create(int sz, const char *name)
+q_create(const char *name)
{
- Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
+ Queue *q = GDKzalloc(sizeof(Queue));
if (q == NULL)
return NULL;
- *q = (Queue) {
- .size = ((sz << 1) >> 1), /* we want a multiple of 2 */
- };
- q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
- if (q->data == NULL) {
- GDKfree(q);
- return NULL;
- }
-
MT_lock_init(&q->l, name);
MT_sema_init(&q->s, 0, name);
return q;
@@ -167,31 +156,26 @@ q_destroy(Queue *q)
assert(q);
MT_lock_destroy(&q->l);
MT_sema_destroy(&q->s);
- GDKfree(q->data);
GDKfree(q);
}
/* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue
is possible */
/* we might actually sort it for better scheduling behavior */
static void
-q_enqueue_(Queue *q, FlowEvent d)
-{
- assert(q);
- assert(d);
- if (q->last == q->size) {
- q->size <<= 1;
- q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) *
q->size);
- assert(q->data);
- }
- q->data[q->last++] = d;
-}
-static void
q_enqueue(Queue *q, FlowEvent d)
{
assert(q);
assert(d);
MT_lock_set(&q->l);
- q_enqueue_(q, d);
+ if (q->first == NULL) {
+ assert(q->last == NULL);
+ q->first = q->last = d;
+ } else {
+ assert(q->last != NULL);
+ q->last->next = d;
+ q->last = d;
+ }
+ d->next = NULL;
MT_lock_unset(&q->l);
MT_sema_up(&q->s);
}
@@ -203,30 +187,20 @@ q_enqueue(Queue *q, FlowEvent d)
*/
static void
-q_requeue_(Queue *q, FlowEvent d)
-{
- int i;
-
- assert(q);
- assert(d);
- if (q->last == q->size) {
- /* enlarge buffer */
- q->size <<= 1;
- q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) *
q->size);
- assert(q->data);
- }
- for (i = q->last; i > 0; i--)
- q->data[i] = q->data[i - 1];
- q->data[0] = d;
- q->last++;
-}
-static void
q_requeue(Queue *q, FlowEvent d)
{
assert(q);
assert(d);
MT_lock_set(&q->l);
- q_requeue_(q, d);
+ if (q->first == NULL) {
+ assert(q->last == NULL);
+ q->first = q->last = d;
+ d->next = NULL;
+ } else {
+ assert(q->last != NULL);
+ d->next = q->first;
+ q->first = d;
+ }
MT_lock_unset(&q->l);
MT_sema_up(&q->s);
}
@@ -234,46 +208,37 @@ q_requeue(Queue *q, FlowEvent d)
static FlowEvent
q_dequeue(Queue *q, Client cntxt)
{
- FlowEvent r = NULL, s = NULL;
- //int i;
-
assert(q);
MT_sema_down(&q->s);
if (ATOMIC_GET(&exiting))
return NULL;
MT_lock_set(&q->l);
- if( cntxt == NULL && q->exitcount > 0){
+ if (cntxt == NULL && q->exitcount > 0) {
q->exitcount--;
MT_lock_unset(&q->l);
return NULL;
}
- {
- int i, minpc;
- minpc = q->last -1;
- s = q->data[minpc];
- /* for long "queues", just grab the first eligible entry we
encounter */
- if (q->last < 1024) {
- for (i = q->last - 1; i >= 0; i--) {
- if ( cntxt == NULL || q->data[i]->flow->cntxt
== cntxt) {
- /* for shorter "queues", find the
oldest eligible entry */
- r = q->data[i];
- if (s && r && s->pc > r->pc) {
- minpc = i;
- s = r;
- }
- }
- }
- }
- if (minpc >= 0) {
- r = q->data[minpc];
- i = minpc;
- q->last--;
- memmove(q->data + i, q->data + i + 1, (q->last - i) *
sizeof(q->data[0]));
+ FlowEvent *dp = &q->first;
+ FlowEvent pd = NULL;
+ /* if cntxt == NULL, return the first event, if cntxt != NULL, find
+ * the first event in the queue with matching cntxt value and return
+ * that */
+ if (cntxt != NULL) {
+ while (*dp && (*dp)->flow->cntxt != cntxt) {
+ pd = *dp;
+ dp = &pd->next;
}
}
+ FlowEvent d = *dp;
+ if (d) {
+ *dp = d->next;
+ d->next = NULL;
+ if (*dp == NULL)
+ q->last = pd;
+ }
MT_lock_unset(&q->l);
- return r;
+ return d;
}
/*
@@ -503,7 +468,7 @@ DFLOWinitialize(void)
return 0;
}
free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 :
GDKnr_threads);
- todo = q_create(2048, "todo");
+ todo = q_create("todo");
if (todo == NULL) {
MT_lock_unset(&dataflowLock);
MT_lock_unset(&mal_contextLock);
@@ -913,7 +878,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
flow->start = startpc + 1;
flow->stop = stoppc;
- flow->done = q_create(stoppc- startpc+1, "flow->done");
+ flow->done = q_create("flow->done");
if (flow->done == NULL) {
GDKfree(flow);
throw(MAL, "dataflow", "runMALdataflow(): Failed to create
flow->done queue");
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]