Changeset: 7994be7caa4b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7994be7caa4b
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:
Use private dataflow lock, and break thread join loop when no threads found.
diffs (142 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -97,6 +97,8 @@ static MT_Lock exitingLock MT_LOCK_INITI
#endif
static volatile ATOMIC_TYPE exiting = 0;
+static MT_Lock dataflowLock MT_LOCK_INITIALIZER("dataflowLock");
+
/*
* Calculate the size of the dataflow dependency graph.
*/
@@ -242,9 +244,9 @@ q_dequeue(Queue *q, Client cntxt)
if (q->exitcount > 0) {
q->exitcount--;
MT_lock_unset(&q->l, "q_dequeue");
- MT_lock_set(&mal_contextLock, "q_dequeue");
+ MT_lock_set(&dataflowLock, "q_dequeue");
q->exitedcount++;
- MT_lock_unset(&mal_contextLock, "q_dequeue");
+ MT_lock_unset(&dataflowLock, "q_dequeue");
return NULL;
}
assert(q->last > 0);
@@ -304,18 +306,18 @@ DFLOWworker(void *T)
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
- MT_lock_set(&mal_contextLock, "DFLOWworker");
+ MT_lock_set(&dataflowLock, "DFLOWworker");
cntxt = t->cntxt;
- MT_lock_unset(&mal_contextLock, "DFLOWworker");
+ MT_lock_unset(&dataflowLock, "DFLOWworker");
if (cntxt) {
/* wait until we are allowed to start working */
MT_sema_down(&t->s, "DFLOWworker");
}
while (1) {
if (fnxt == 0) {
- MT_lock_set(&mal_contextLock, "DFLOWworker");
+ MT_lock_set(&dataflowLock, "DFLOWworker");
cntxt = t->cntxt;
- MT_lock_unset(&mal_contextLock, "DFLOWworker");
+ MT_lock_unset(&dataflowLock, "DFLOWworker");
fe = q_dequeue(todo, cntxt);
if (fe == NULL) {
if (cntxt) {
@@ -424,9 +426,9 @@ DFLOWworker(void *T)
GDKfree(GDKerrbuf);
GDKsetbuf(0);
THRdel(thr);
- MT_lock_set(&mal_contextLock, "DFLOWworker");
+ MT_lock_set(&dataflowLock, "DFLOWworker");
t->flag = EXITED;
- MT_lock_unset(&mal_contextLock, "DFLOWworker");
+ MT_lock_unset(&dataflowLock, "DFLOWworker");
}
/*
@@ -473,6 +475,7 @@ DFLOWinitialize(void)
}
#ifdef NEED_MT_LOCK_INIT
ATOMIC_INIT(exitingLock, "exitingLock");
+ MT_lock_init(&dataflowLock, "dataflowLock");
#endif
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return 0;
@@ -695,9 +698,9 @@ DFLOWscheduler(DataFlow flow, struct wor
}
/* release the worker from its specific task (turn it into a
* generic worker) */
- MT_lock_set(&mal_contextLock, "DFLOWscheduler");
+ MT_lock_set(&dataflowLock, "DFLOWscheduler");
w->cntxt = NULL;
- MT_lock_unset(&mal_contextLock, "DFLOWscheduler");
+ MT_lock_unset(&dataflowLock, "DFLOWscheduler");
/* wrap up errors */
assert(flow->done->last == 0);
if (flow->error ) {
@@ -761,7 +764,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
/* in addition, create one more worker that will only execute
* tasks for the current client to compensate for our waiting
* until all work is done */
- MT_lock_set(&mal_contextLock, "runMALdataflow");
+ MT_lock_set(&dataflowLock, "runMALdataflow");
/* join with already exited threads */
while (todo->exitedcount > 0) {
for (i = 0; i < THREADS; i++) {
@@ -769,12 +772,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m
todo->exitedcount--;
workers[i].flag = IDLE;
workers[i].cntxt = NULL;
- MT_lock_unset(&mal_contextLock,
"runMALdataflow");
+ MT_lock_unset(&dataflowLock, "runMALdataflow");
MT_join_thread(workers[i].id);
- MT_lock_set(&mal_contextLock, "runMALdataflow");
+ MT_lock_set(&dataflowLock, "runMALdataflow");
break;
}
}
+ if (i == THREADS)
+ break;
}
for (i = 0; i < THREADS; i++) {
if (workers[i].flag == IDLE) {
@@ -800,14 +805,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m
if (MT_create_thread(&workers[i].id, DFLOWworker, (void
*) &workers[i], MT_THR_JOINABLE) < 0) {
/* cannot start new thread, run serially */
*ret = TRUE;
- MT_lock_unset(&mal_contextLock,
"runMALdataflow");
+ MT_lock_unset(&dataflowLock, "runMALdataflow");
return MAL_SUCCEED;
}
workers[i].flag = RUNNING;
break;
}
}
- MT_lock_unset(&mal_contextLock, "runMALdataflow");
+ MT_lock_unset(&dataflowLock, "runMALdataflow");
if (i == THREADS) {
/* no empty thread slots found, run serially */
*ret = TRUE;
@@ -892,15 +897,15 @@ stopMALdataflow(void)
if (todo) {
for (i = 0; i < THREADS; i++)
MT_sema_up(&todo->s, "stopMALdataflow");
- MT_lock_set(&mal_contextLock, "stopMALdataflow");
+ MT_lock_set(&dataflowLock, "stopMALdataflow");
for (i = 0; i < THREADS; i++) {
if (workers[i].flag != IDLE) {
- MT_lock_unset(&mal_contextLock,
"stopMALdataflow");
+ MT_lock_unset(&dataflowLock, "stopMALdataflow");
MT_join_thread(workers[i].id);
- MT_lock_set(&mal_contextLock,
"stopMALdataflow");
+ MT_lock_set(&dataflowLock, "stopMALdataflow");
}
workers[i].flag = IDLE;
}
- MT_lock_unset(&mal_contextLock, "stopMALdataflow");
+ MT_lock_unset(&dataflowLock, "stopMALdataflow");
}
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list