Changeset: 3c128e4137aa for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3c128e4137aa
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/optimizer/opt_pipes.c
Branch: default
Log Message:

Fix a concurrency problem in the initial enqueuing of instructions.


diffs (62 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -313,6 +313,7 @@ DFLOWworker(void *t)
                        fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, 
fe->pc, i, FALSE);
 #endif
                MT_lock_set(&flow->flowlock, "MALworker");
+       
                for (last = fe->pc - flow->start; last >= 0 && (i = 
flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending &&
                                flow->status[i].blocks == 1) {
@@ -321,7 +322,7 @@ DFLOWworker(void *t)
                                flow->status[i].hotclaim = fe->hotclaim;
                                flow->status[i].argclaim += fe->hotclaim;
                                if (fnxt) {
-                                       if ( flow->error)
+                                       if (flow->error)
                                                q_enqueue(flow->done, fnxt);
                                        else
                                                q_enqueue(todo, fnxt);
@@ -505,6 +506,7 @@ DFLOWscheduler(DataFlow flow)
        if (fe[0].flow->cntxt->flags & timerFlag)
                fe[0].flow->cntxt->timer = GDKusec();
 
+       MT_lock_set(&flow->flowlock, "MALworker");
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
@@ -516,12 +518,12 @@ DFLOWscheduler(DataFlow flow)
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
+       MT_lock_unset(&flow->flowlock, "MALworker");
 
        PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow 
block\n", actions);
 
        while (actions != tasks ) {
                f = q_dequeue(flow->done);
-               tasks++;
 
                /*
                 * When an instruction is finished we have to reduce the blocked
@@ -530,6 +532,7 @@ DFLOWscheduler(DataFlow flow)
                 */
 
                MT_lock_set(&flow->flowlock, "MALworker");
+               tasks++;
                for (last = f->pc - flow->start; last >= 0 && (i = 
flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending) {
                                flow->status[i].argclaim += f->hotclaim;
diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c
--- a/monetdb5/optimizer/opt_pipes.c
+++ b/monetdb5/optimizer/opt_pipes.c
@@ -79,7 +79,7 @@ struct PIPELINES {
 //     "optimizer.reorder();"
         "optimizer.deadcode();"
         "optimizer.reduce();"
-//      "optimizer.dataflow();"
+        "optimizer.dataflow();"
         "optimizer.history();"
         "optimizer.multiplex();"
         "optimizer.garbageCollector();",
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to