Changeset: a626b67c7c58 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/a626b67c7c58
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Keep the number of spare threads in check.
Use --set dataflow_max_free=N to set the maximum number of free threads
(default: gdk_nr_threads, with a minimum of 4).
diffs (86 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -80,7 +80,7 @@ typedef struct DATAFLOW {
static struct worker {
MT_Id id;
- enum {IDLE, WAITING, RUNNING, FREE } flag;
+ enum {IDLE, WAITING, RUNNING, FREE, EXITED } flag;
ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
char *errbuf; /* GDKerrbuf so that we can allocate before
fork */
MT_Sema s;
@@ -89,8 +89,11 @@ static struct worker {
} workers[THREADS];
/* heads of two mutually exclusive linked lists, both using the .next
* field in the worker struct */
+static int exited_workers = -1; /* to be joined threads */
static int idle_workers = -1; /* idle workers (no thread associated) */
static int free_workers = -1; /* free workers (thread doing nothing) */
+static int free_count = 0; /* number of free threads */
+static int free_max = 0; /* max number of spare free threads */
static Queue *todo = 0; /* pending instructions */
@@ -104,6 +107,7 @@ mal_dataflow_reset(void)
stopMALdataflow();
memset((char*) workers, 0, sizeof(workers));
idle_workers = -1;
+ exited_workers = -1;
if( todo) {
GDKfree(todo->data);
MT_lock_destroy(&todo->l);
@@ -449,6 +453,14 @@ DFLOWworker(void *T)
MT_lock_unset(&dataflowLock);
break;
}
+ if (free_count >= free_max) {
+ t->flag = EXITED;
+ t->next = exited_workers;
+ exited_workers = t->self;
+ MT_lock_unset(&dataflowLock);
+ break;
+ }
+ free_count++;
t->flag = FREE;
assert(free_workers != t->self);
t->next = free_workers;
@@ -459,7 +471,6 @@ DFLOWworker(void *T)
break;
assert(t->flag == WAITING);
}
-
GDKfree(GDKerrbuf);
GDKsetbuf(0);
}
@@ -486,6 +497,7 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock);
return 0;
}
+ free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 :
GDKnr_threads);
todo = q_create(2048, "todo");
if (todo == NULL) {
MT_lock_unset(&dataflowLock);
@@ -806,9 +818,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m
* tasks for the current client to compensate for our waiting
* until all work is done */
MT_lock_set(&dataflowLock);
+ /* join with already exited threads */
+ while ((i = exited_workers) >= 0) {
+ assert(workers[i].flag == EXITED);
+ exited_workers = workers[i].next;
+ workers[i].flag = IDLE;
+ MT_lock_unset(&dataflowLock);
+ MT_join_thread(workers[i].id);
+ MT_lock_set(&dataflowLock);
+ workers[i].next = idle_workers;
+ idle_workers = i;
+ }
assert(cntxt != NULL);
if ((i = free_workers) >= 0) {
assert(workers[i].flag == FREE);
+ assert(free_count > 0);
+ free_count--;
free_workers = workers[i].next;
workers[i].flag = WAITING;
ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list