Changeset: 419a75b94b86 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=419a75b94b86
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Be protected against concurrent access


diffs (115 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -186,9 +186,11 @@ q_dequeue(queue *q)
        MT_sema_down(&q->s, "q_dequeue");
        assert(q->last);
        MT_lock_set(&q->l, "q_dequeue");
-       if (q->last > 0)
+       if (q->last > 0) {
                /* LIFO favors garbage collection */
                r = q->data[--q->last];
+               q->data[q->last] = 0;
+       }
        /* else: terminating */
        /* try out random draw *
        {
@@ -310,6 +312,7 @@ DFLOWworker(void *t)
                for (i = 0; i < p->retc; i++)
                        fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, fe->pc, i, FALSE);
 #endif
+               MT_lock_set(&flow->flowlock, "MALworker");
                for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending &&
                                flow->status[i].blocks == 1) {
@@ -317,10 +320,15 @@ DFLOWworker(void *t)
                                flow->status[i].blocks = 0;
                                flow->status[i].hotclaim = fe->hotclaim;
                                flow->status[i].argclaim += fe->hotclaim;
-                               if (fnxt)
-                                       q_enqueue(todo, fnxt);
+                               if (fnxt) {
+                                       if ( flow->error)
+                                               q_enqueue(flow->done, fnxt);
+                                       else
+                                               q_enqueue(todo, fnxt);
+                               }
                                fnxt = flow->status + i;
                        }
+               MT_lock_unset(&flow->flowlock, "MALworker");
 
                q_enqueue(flow->done, fe);
                MALresourceFairness(flow->cntxt, flow->mb, usec);
@@ -461,6 +469,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
  * They take effect after we have ensured that the basic properties for
  * execution hold.
  */
+/*
 static void showFlowEvent(DataFlow flow, int pc)
 {
        int i;
@@ -473,17 +482,18 @@ static void showFlowEvent(DataFlow flow,
                        printInstruction(GDKstdout, fe[i].flow->mb, 0, getInstrPtr(fe[i].flow->mb, fe[i].pc), LIST_MAL_STMT | LIST_MAPI);
                }
 }
+*/
 
 static str
 DFLOWscheduler(DataFlow flow)
 {
        int last;
-       int i, pc = 0;
+       int i;
 #ifdef USE_MAL_ADMISSION
        int j;
        InstrPtr p;
 #endif
-       int actions = flow->stop - flow->start;
+       int tasks=0, actions = flow->stop - flow->start;
        str ret = MAL_SUCCEED;
        FlowEvent fe, f = 0;
 
@@ -509,9 +519,9 @@ DFLOWscheduler(DataFlow flow)
 
        PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow block\n", actions);
 
-       while (actions > 0 ) {
+       while (actions != tasks ) {
                f = q_dequeue(flow->done);
-               actions--;
+               tasks++;
 
                /*
                 * When an instruction is finished we have to reduce the blocked
@@ -519,26 +529,29 @@ DFLOWscheduler(DataFlow flow)
                 * drops to zero we can scheduler it we do it here instead of the scheduler
                 */
 
+               MT_lock_set(&flow->flowlock, "MALworker");
                for (last = f->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending) {
                                flow->status[i].argclaim += f->hotclaim;
                                if (flow->status[i].blocks == 1 ) {
-                                       q_enqueue(todo, flow->status + i);
                                        flow->status[i].state = DFLOWrunning;
                                        flow->status[i].blocks--;
+                                       q_enqueue(todo, flow->status + i);
                                        PARDEBUG
                                        mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                                } else {
                                        flow->status[i].blocks--;
                                }
                        } 
+               MT_lock_unset(&flow->flowlock, "MALworker");
        }
        /* wrap up errors */
+       assert(todo->last == 0);
+       assert(flow->done->last == 0);
        if (flow->error ) {
                PARDEBUG mnstr_printf(GDKstdout, "#errors encountered %s ", flow->error ? flow->error : "unknown");
                ret = flow->error;
        }
-       PARDEBUG showFlowEvent(flow, pc);
        return ret;
 }
 
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to