Changeset: f4cebd87aa01 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f4cebd87aa01
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:
Fix data races in dataflow.
diffs (145 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -92,7 +92,10 @@ static struct worker {
MT_Sema s;
} workers[THREADS];
static Queue *todo = 0; /* pending instructions */
-static int volatile exiting = 0;
+#ifdef ATOMIC_LOCK
+static MT_Lock exitingLock MT_LOCK_INITIALIZER("exitingLock");
+#endif
+static volatile ATOMIC_TYPE exiting = 0;
/*
* Calculate the size of the dataflow dependency graph.
@@ -216,7 +219,7 @@ q_dequeue(Queue *q, Client cntxt)
assert(q);
MT_sema_down(&q->s, "q_dequeue");
- if (exiting)
+ if (ATOMIC_GET(exiting, exitingLock, "q_dequeue"))
return NULL;
MT_lock_set(&q->l, "q_dequeue");
if (cntxt) {
@@ -295,18 +298,24 @@ DFLOWworker(void *T)
Thread thr;
str error = 0;
int i,last;
+ Client cntxt;
thr = THRnew("DFLOWworker");
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
- if (t->cntxt) {
+ MT_lock_set(&mal_contextLock, "DFLOWworker");
+ cntxt = t->cntxt;
+ MT_lock_unset(&mal_contextLock, "DFLOWworker");
+ if (cntxt) {
/* wait until we are allowed to start working */
MT_sema_down(&t->s, "DFLOWworker");
}
while (1) {
if (fnxt == 0) {
- Client cntxt = t->cntxt;
+ MT_lock_set(&mal_contextLock, "DFLOWworker");
+ cntxt = t->cntxt;
+ MT_lock_unset(&mal_contextLock, "DFLOWworker");
fe = q_dequeue(todo, cntxt);
if (fe == NULL) {
if (cntxt) {
@@ -324,7 +333,7 @@ DFLOWworker(void *T)
}
} else
fe = fnxt;
- if (exiting) {
+ if (ATOMIC_GET(exiting, exitingLock, "DFLOWworker")) {
break;
}
fnxt = 0;
@@ -415,7 +424,9 @@ DFLOWworker(void *T)
GDKfree(GDKerrbuf);
GDKsetbuf(0);
THRdel(thr);
+ MT_lock_set(&mal_contextLock, "DFLOWworker");
t->flag = EXITED;
+ MT_lock_unset(&mal_contextLock, "DFLOWworker");
}
/*
@@ -460,6 +471,9 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return -1;
}
+#ifdef NEED_MT_LOCK_INIT
+ ATOMIC_INIT(exitingLock, "exitingLock");
+#endif
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return 0;
}
@@ -652,7 +666,7 @@ DFLOWscheduler(DataFlow flow, struct wor
while (actions != tasks ) {
f = q_dequeue(flow->done, NULL);
- if (exiting)
+ if (ATOMIC_GET(exiting, exitingLock, "DFLOWscheduler"))
break;
if (f == NULL)
throw(MAL, "dataflow", "DFLOWscheduler():
q_dequeue(flow->done) returned NULL");
@@ -681,7 +695,9 @@ DFLOWscheduler(DataFlow flow, struct wor
}
/* release the worker from its specific task (turn it into a
* generic worker) */
+ MT_lock_set(&mal_contextLock, "DFLOWscheduler");
w->cntxt = NULL;
+ MT_lock_unset(&mal_contextLock, "DFLOWscheduler");
/* wrap up errors */
assert(flow->done->last == 0);
if (flow->error ) {
@@ -747,12 +763,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m
* until all work is done */
MT_lock_set(&mal_contextLock, "runMALdataflow");
/* join with already exited threads */
- for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
- if (workers[i].flag == EXITED) {
- todo->exitedcount--;
- workers[i].flag = IDLE;
- workers[i].cntxt = NULL;
- MT_join_thread(workers[i].id);
+ while (todo->exitedcount > 0) {
+ for (i = 0; i < THREADS; i++) {
+ if (workers[i].flag == EXITED) {
+ todo->exitedcount--;
+ workers[i].flag = IDLE;
+ workers[i].cntxt = NULL;
+ MT_lock_unset(&mal_contextLock,
"runMALdataflow");
+ MT_join_thread(workers[i].id);
+ MT_lock_set(&mal_contextLock, "runMALdataflow");
+ break;
+ }
}
}
for (i = 0; i < THREADS; i++) {
@@ -867,14 +888,19 @@ stopMALdataflow(void)
{
int i;
- exiting = 1;
+ ATOMIC_SET(exiting, 1, exitingLock, "q_dequeue");
if (todo) {
for (i = 0; i < THREADS; i++)
MT_sema_up(&todo->s, "stopMALdataflow");
+ MT_lock_set(&mal_contextLock, "stopMALdataflow");
for (i = 0; i < THREADS; i++) {
- if (workers[i].flag != IDLE)
+ if (workers[i].flag != IDLE) {
+ MT_lock_unset(&mal_contextLock,
"stopMALdataflow");
MT_join_thread(workers[i].id);
+ MT_lock_set(&mal_contextLock,
"stopMALdataflow");
+ }
workers[i].flag = IDLE;
}
+ MT_lock_unset(&mal_contextLock, "stopMALdataflow");
}
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list