Changeset: 101eb82a8105 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=101eb82a8105
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Re-enable hot-potato scheduling.
Each worker can safely inspect the remaining flow graph
to find instructions that are blocked only by the variable
it has just produced. All such instructions are scheduled for
execution, and the worker continues directly with one of them.


diffs (124 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -228,8 +228,8 @@ static void
 DFLOWworker(void *t)
 {
        DataFlow flow;
-       FlowEvent fe = 0;
-       int id = (MT_Id *) t - workers;
+       FlowEvent fe = 0, fnxt = 0;
+       int id = (MT_Id *) t - workers, last = 0;
        Thread thr;
        str error = 0;
 
@@ -242,7 +242,9 @@ DFLOWworker(void *t)
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
        while (1) {
-               fe = q_dequeue(todo);
+               if (fnxt == 0)
+                       fe = q_dequeue(todo);
+               else fe = fnxt;
                assert(fe);
                flow = fe->flow;
 
@@ -267,6 +269,11 @@ DFLOWworker(void *t)
                        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, 
fe->pc + 1, flow->stk, 0, 0);
                        PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= 
%d claim= " LLFMT "," LLFMT " %s\n",
                                                                  fe->pc, id, 
fe->argclaim, fe->hotclaim, error ? error : "");
+#ifdef USE_MAL_ADMISSION
+                       /* release the memory claim */
+                       MALadmission(-fe->argclaim, -fe->hotclaim);
+#endif
+
                        fe->state = DFLOWwrapup;
                        if (error) {
                                MT_lock_set(&flow->flowlock, "runMALdataflow");
@@ -291,15 +298,11 @@ DFLOWworker(void *t)
 
                PARDEBUG mnstr_printf(GDKstdout, "#execute pc= %d wrk= %d 
finished %s\n", fe->pc, id, flow->error ? flow->error : "");
 
-#ifdef USE_MAL_ADMISSION
-               /* release the memory claim */
-               MALadmission(-fe->argclaim, -fe->hotclaim);
-#endif
-
                /* see if you can find an eligible instruction that uses the
-                * result just produced. Then we can continue with it right
-                * away.  We are just looking forward for the last block, which 
means we
-                * are safe from concurrent actions.
+                * result just produced. Then we can continue with it right 
away.
+                * We are just looking forward for the last block, which means 
we
+                * are safe from concurrent actions. No other thread can steal 
it,
+                * because we hold the logical lock.
                 * All eligible instructions are queued
                 */
 #ifdef USE_MAL_ADMISSION
@@ -308,6 +311,19 @@ DFLOWworker(void *t)
                for (i = 0; i < p->retc; i++)
                        fe->hotclaim += getMemoryClaim(flow->mb, flow->stk, 
fe->pc, i, FALSE);
 #endif
+        fnxt = 0;
+               for (last = fe->pc - flow->start; last >= 0 && (i = 
flow->nodes[last]) > 0; last = flow->edges[last])
+                       if (flow->status[i].state == DFLOWpending &&
+                               flow->status[i].blocks == 1) {
+                               flow->status[i].state = DFLOWrunning;
+                               flow->status[i].blocks = 0;
+                               flow->status[i].hotclaim = fe->hotclaim;
+                               flow->status[i].argclaim += fe->hotclaim;
+                               if (fnxt)
+                                       q_enqueue(todo, fnxt);
+                               fnxt = flow->status + i;
+                       }
+
                q_enqueue(flow->done, fe);
                MALresourceFairness(flow->cntxt, flow->mb, usec);
        }
@@ -463,7 +479,7 @@ static void showFlowEvent(DataFlow flow,
 static str
 DFLOWscheduler(DataFlow flow)
 {
-       int queued = 0, last;
+       int last;
        int i, pc = 0;
 #ifdef USE_MAL_ADMISSION
        int j;
@@ -481,7 +497,6 @@ DFLOWscheduler(DataFlow flow)
        if (fe[0].flow->cntxt->flags & timerFlag)
                fe[0].flow->cntxt->timer = GDKusec();
 
-       /* enter all dependencies before releasing the queue  */
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
@@ -491,14 +506,12 @@ DFLOWscheduler(DataFlow flow)
 #endif
                        q_enqueue(todo, flow->status + i);
                        flow->status[i].state = DFLOWrunning;
-                       queued++;
-                       PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim=" LLFMT " queue %d\n", flow->status[i].pc, flow->status[i].argclaim, 
queued);
+                       PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
 
        PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow 
block\n", actions);
 
        while (actions > 0 ) {
-               PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued 
%d\n", queued);
                f = q_dequeue(flow->done);
                actions--;
 
@@ -517,12 +530,11 @@ DFLOWscheduler(DataFlow flow)
                        if (flow->status[i].state == DFLOWpending) {
                                flow->status[i].argclaim += f->hotclaim;
                                if (flow->status[i].blocks == 1 ) {
-                                       queued++;
                                        q_enqueue(todo, flow->status + i);
                                        flow->status[i].state = DFLOWrunning;
                                        flow->status[i].blocks--;
                                        PARDEBUG
-                                       mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim= " LLFMT " queued= %d\n", flow->status[i].pc, flow->status[i].argclaim, 
queued);
+                                       mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                                } else {
                                        flow->status[i].blocks--;
                                }
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to