Changeset: 6384e8cbc8a3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/6384e8cbc8a3
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Use linked lists instead of running through all entries.
diffs (211 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -20,7 +20,7 @@
* are available or resources become scarce.
*
* The flow graphs is organized such that parallel threads can
- * access it mostly without expensive locking and dependent
+ * access it mostly without expensive locking and dependent
* variables are easy to find..
*/
#include "monetdb_config.h"
@@ -84,7 +84,12 @@ static struct worker {
ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
char *errbuf; /* GDKerrbuf so that we can allocate before
fork */
MT_Sema s;
+ int next;
} workers[THREADS];
+/* heads of two mutually exclusive linked lists, both using the .next
+ * field in the worker struct */
+static int exited_workers = -1; /* to be joined threads */
+static int idle_workers = -1; /* idle workers (no thread associated) */
static Queue *todo = 0; /* pending instructions */
@@ -97,6 +102,8 @@ mal_dataflow_reset(void)
{
stopMALdataflow();
memset((char*) workers, 0, sizeof(workers));
+ exited_workers = -1;
+ idle_workers = -1;
if( todo) {
GDKfree(todo->data);
MT_lock_destroy(&todo->l);
@@ -437,6 +444,8 @@ DFLOWworker(void *T)
GDKsetbuf(0);
MT_lock_set(&dataflowLock);
t->flag = EXITED;
+ t->next = exited_workers;
+ exited_workers = (int) (t - workers);
MT_lock_unset(&dataflowLock);
}
@@ -465,11 +474,14 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock);
return -1;
}
+ assert(idle_workers == -1);
for (i = 0; i < THREADS; i++) {
char name[MT_NAME_LEN];
snprintf(name, sizeof(name), "DFLOWsema%d", i);
MT_sema_init(&workers[i].s, 0, name);
workers[i].flag = IDLE;
+ workers[i].next = idle_workers;
+ idle_workers = i;
if (first) /* only initialize once
*/
ATOMIC_PTR_INIT(&workers[i].cntxt, NULL);
}
@@ -478,12 +490,15 @@ DFLOWinitialize(void)
if (limit > THREADS)
limit = THREADS;
MT_lock_set(&dataflowLock);
- for (i = 0; i < limit; i++) {
+ while (limit > 0) {
+ limit--;
+ i = idle_workers;
workers[i].errbuf = GDKmalloc(GDKMAXERRLEN);
if (workers[i].errbuf == NULL) {
TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer
for worker");
continue;
}
+ idle_workers = workers[i].next;
workers[i].flag = RUNNING;
ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
char name[MT_NAME_LEN];
@@ -492,6 +507,8 @@ DFLOWinitialize(void)
GDKfree(workers[i].errbuf);
workers[i].errbuf = NULL;
workers[i].flag = IDLE;
+ workers[i].next = idle_workers;
+ idle_workers = i;
} else
created++;
}
@@ -770,64 +787,60 @@ runMALdataflow(Client cntxt, MalBlkPtr m
* until all work is done */
MT_lock_set(&dataflowLock);
/* join with already exited threads */
- {
- int joined;
- do {
- joined = 0;
- for (i = 0; i < THREADS; i++) {
- if (workers[i].flag == EXITED) {
- workers[i].flag = JOINING;
- ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
- joined = 1;
- MT_lock_unset(&dataflowLock);
- MT_join_thread(workers[i].id);
- MT_lock_set(&dataflowLock);
- workers[i].flag = IDLE;
+ while (exited_workers >= 0) {
+ i = exited_workers;
+ exited_workers = workers[i].next;
+ assert(workers[i].flag == EXITED);
+ workers[i].flag = JOINING;
+ MT_lock_unset(&dataflowLock);
+ MT_join_thread(workers[i].id);
+ MT_lock_set(&dataflowLock);
+ workers[i].flag = IDLE;
+ workers[i].next = idle_workers;
+ idle_workers = i;
+ }
+ if ((i = idle_workers) >= 0) {
+ assert(workers[i].flag == IDLE);
+ /* only create specific worker if we are not doing a
+ * recursive call */
+ if (stk->calldepth > 1) {
+ int j;
+ MT_Id pid = MT_getpid();
+
+ /* doing a recursive call: copy specificity from
+ * current worker to new worker */
+ ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
+ for (j = 0; j < THREADS; j++) {
+ if (workers[j].flag == RUNNING && workers[j].id
== pid) {
+ ATOMIC_PTR_SET(&workers[i].cntxt,
+
ATOMIC_PTR_GET(&workers[j].cntxt));
+ break;
}
}
- } while (joined);
- }
- for (i = 0; i < THREADS; i++) {
- if (workers[i].flag == IDLE) {
- /* only create specific worker if we are not doing a
- * recursive call */
- if (stk->calldepth > 1) {
- int j;
- MT_Id pid = MT_getpid();
-
- /* doing a recursive call: copy specificity from
- * current worker to new worker */
- ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
- for (j = 0; j < THREADS; j++) {
- if (workers[j].flag == RUNNING &&
workers[j].id == pid) {
-
ATOMIC_PTR_SET(&workers[i].cntxt,
-
ATOMIC_PTR_GET(&workers[j].cntxt));
- break;
- }
- }
- } else {
- /* not doing a recursive call: create specific
worker */
- ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
- }
- workers[i].flag = RUNNING;
- char name[MT_NAME_LEN];
- snprintf(name, sizeof(name), "DFLOWworker%d", i);
- if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) ==
NULL ||
- (workers[i].id = THRcreate(DFLOWworker, (void
*) &workers[i],
-
MT_THR_JOINABLE, name)) == 0) {
- /* cannot start new thread, run serially */
- *ret = TRUE;
- GDKfree(workers[i].errbuf);
- workers[i].errbuf = NULL;
- workers[i].flag = IDLE;
- MT_lock_unset(&dataflowLock);
- return MAL_SUCCEED;
- }
- break;
+ } else {
+ /* not doing a recursive call: create specific worker */
+ ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
+ }
+ idle_workers = workers[i].next;
+ workers[i].flag = RUNNING;
+ char name[MT_NAME_LEN];
+ snprintf(name, sizeof(name), "DFLOWworker%d", i);
+ if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) == NULL ||
+ (workers[i].id = THRcreate(DFLOWworker, (void *)
&workers[i],
+
MT_THR_JOINABLE, name)) == 0) {
+ /* cannot start new thread, run serially */
+ *ret = TRUE;
+ GDKfree(workers[i].errbuf);
+ workers[i].errbuf = NULL;
+ workers[i].flag = IDLE;
+ workers[i].next = idle_workers;
+ idle_workers = i;
+ MT_lock_unset(&dataflowLock);
+ return MAL_SUCCEED;
}
}
MT_lock_unset(&dataflowLock);
- if (i == THREADS) {
+ if (i < 0) {
/* no empty thread slots found, run serially */
*ret = TRUE;
return MAL_SUCCEED;
@@ -927,7 +940,11 @@ stopMALdataflow(void)
MT_join_thread(workers[i].id);
MT_lock_set(&dataflowLock);
}
- workers[i].flag = IDLE;
+ if (workers[i].flag != IDLE) {
+ workers[i].flag = IDLE;
+ workers[i].next = idle_workers;
+ idle_workers = i;
+ }
MT_sema_destroy(&workers[i].s);
}
MT_lock_unset(&dataflowLock);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list