Changeset: f4cebd87aa01 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f4cebd87aa01
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:

Fix data races in dataflow.


diffs (145 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -92,7 +92,10 @@ static struct worker {
        MT_Sema s;
 } workers[THREADS];
 static Queue *todo = 0;        /* pending instructions */
-static int volatile exiting = 0;
+#ifdef ATOMIC_LOCK
+static MT_Lock exitingLock MT_LOCK_INITIALIZER("exitingLock");
+#endif
+static volatile ATOMIC_TYPE exiting = 0;
 
 /*
  * Calculate the size of the dataflow dependency graph.
@@ -216,7 +219,7 @@ q_dequeue(Queue *q, Client cntxt)
 
        assert(q);
        MT_sema_down(&q->s, "q_dequeue");
-       if (exiting)
+       if (ATOMIC_GET(exiting, exitingLock, "q_dequeue"))
                return NULL;
        MT_lock_set(&q->l, "q_dequeue");
        if (cntxt) {
@@ -295,18 +298,24 @@ DFLOWworker(void *T)
        Thread thr;
        str error = 0;
        int i,last;
+       Client cntxt;
 
        thr = THRnew("DFLOWworker");
 
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
-       if (t->cntxt) {
+       MT_lock_set(&mal_contextLock, "DFLOWworker");
+       cntxt = t->cntxt;
+       MT_lock_unset(&mal_contextLock, "DFLOWworker");
+       if (cntxt) {
                /* wait until we are allowed to start working */
                MT_sema_down(&t->s, "DFLOWworker");
        }
        while (1) {
                if (fnxt == 0) {
-                       Client cntxt = t->cntxt;
+                       MT_lock_set(&mal_contextLock, "DFLOWworker");
+                       cntxt = t->cntxt;
+                       MT_lock_unset(&mal_contextLock, "DFLOWworker");
                        fe = q_dequeue(todo, cntxt);
                        if (fe == NULL) {
                                if (cntxt) {
@@ -324,7 +333,7 @@ DFLOWworker(void *T)
                        }
                } else
                        fe = fnxt;
-               if (exiting) {
+               if (ATOMIC_GET(exiting, exitingLock, "DFLOWworker")) {
                        break;
                }
                fnxt = 0;
@@ -415,7 +424,9 @@ DFLOWworker(void *T)
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
        THRdel(thr);
+       MT_lock_set(&mal_contextLock, "DFLOWworker");
        t->flag = EXITED;
+       MT_lock_unset(&mal_contextLock, "DFLOWworker");
 }
 
 /*
@@ -460,6 +471,9 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return -1;
        }
+#ifdef NEED_MT_LOCK_INIT
+       ATOMIC_INIT(exitingLock, "exitingLock");
+#endif
        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
        return 0;
 }
@@ -652,7 +666,7 @@ DFLOWscheduler(DataFlow flow, struct wor
 
        while (actions != tasks ) {
                f = q_dequeue(flow->done, NULL);
-               if (exiting)
+               if (ATOMIC_GET(exiting, exitingLock, "DFLOWscheduler"))
                        break;
                if (f == NULL)
                        throw(MAL, "dataflow", "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");
@@ -681,7 +695,9 @@ DFLOWscheduler(DataFlow flow, struct wor
        }
        /* release the worker from its specific task (turn it into a
         * generic worker) */
+       MT_lock_set(&mal_contextLock, "DFLOWscheduler");
        w->cntxt = NULL;
+       MT_lock_unset(&mal_contextLock, "DFLOWscheduler");
        /* wrap up errors */
        assert(flow->done->last == 0);
        if (flow->error ) {
@@ -747,12 +763,17 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         * until all work is done */
        MT_lock_set(&mal_contextLock, "runMALdataflow");
        /* join with already exited threads */
-       for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
-               if (workers[i].flag == EXITED) {
-                       todo->exitedcount--;
-                       workers[i].flag = IDLE;
-                       workers[i].cntxt = NULL;
-                       MT_join_thread(workers[i].id);
+       while (todo->exitedcount > 0) {
+               for (i = 0; i < THREADS; i++) {
+                       if (workers[i].flag == EXITED) {
+                               todo->exitedcount--;
+                               workers[i].flag = IDLE;
+                               workers[i].cntxt = NULL;
+                               MT_lock_unset(&mal_contextLock, "runMALdataflow");
+                               MT_join_thread(workers[i].id);
+                               MT_lock_set(&mal_contextLock, "runMALdataflow");
+                               break;
+                       }
                }
        }
        for (i = 0; i < THREADS; i++) {
@@ -867,14 +888,19 @@ stopMALdataflow(void)
 {
        int i;
 
-       exiting = 1;
+       ATOMIC_SET(exiting, 1, exitingLock, "q_dequeue");
        if (todo) {
                for (i = 0; i < THREADS; i++)
                        MT_sema_up(&todo->s, "stopMALdataflow");
+               MT_lock_set(&mal_contextLock, "stopMALdataflow");
                for (i = 0; i < THREADS; i++) {
-                       if (workers[i].flag != IDLE)
+                       if (workers[i].flag != IDLE) {
+                               MT_lock_unset(&mal_contextLock, "stopMALdataflow");
                                MT_join_thread(workers[i].id);
+                               MT_lock_set(&mal_contextLock, "stopMALdataflow");
+                       }
                        workers[i].flag = IDLE;
                }
+               MT_lock_unset(&mal_contextLock, "stopMALdataflow");
        }
 }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to