Changeset: 5c6037eed2a5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5c6037eed2a5
Modified Files:
        monetdb5/mal/mal_interpreter.mx
Branch: default
Log Message:

Revamp dataflow scheduler
The old scheduler was based on linear scans to determine eligible
instructions. The new one uses a prepared dataflow graph to
quickly identify eligible instructions after each instruction.

todo: avoid expensive memory footprint claims


diffs (truncated from 699 to 300 lines):

diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -572,9 +572,6 @@
 @-
 The flow graphs should be organized such that parallel threads can
 access it mostly without expensive locking.
-Furthermore, per flow block we keep an administration when
-variables are blocked or not. An instruction can fire if all
-its input variables are not blocked.
 @c
 #define DFLOWpending 0         /* runnable */
 #define DFLOWrunning 1         /* currently in progress */
@@ -590,22 +587,36 @@
 } queue;
 
 
+@-
+The dataflow dependency is administered in a graph list structure. 
+For each instruction we keep the list of instructions that
+should be checked for eligibility once we are finished with it.
+@c 
 typedef struct {
        MT_Id tid;
        int id;
        queue *todo;            /* pending actions for this client */
        lng clk;
+       struct DataFlow *flow;
 } FlowTask;
 
-typedef struct DataFlow {
+typedef struct FLOWSTATUS {
        Client cntxt;           /* for debugging and client resolution */
        MalBlkPtr mb;           /* carry the context */
        MalStkPtr stk;
+       int pc;                 /* pc in underlying malblock */
+       int blocks;     /* awaiting for variables */
+       sht state;              /* of execution */
+       sht cost;
+       lng claim;              /* memory foot print */
+       str error;
+} *FlowStatus, FlowStatusRec;
+
+typedef struct DataFlow {
        int start, stop;        /* guarded block under consideration*/
-       char *status;           /* statements can be blocked on other 
statements */
-       char *blocked;          /* blocked, should be created first */
-       int *assign;            /* first assignment of variable */
-       int *inuse;             /* inuse in parallel threads reference count */
+       FlowStatus status;              /* status of each instruction */
+       int *nodes;                     /* dependency graph nodes */
+       int *edges;                     /* dependency graph */
        queue *done;            /* work finished */
        queue *todo;            /* pending actions for this client */
        int    nway;            /* number of workers */
@@ -613,13 +624,18 @@
        struct DataFlow *free;  /* free list */
 } *DataFlow, DataFlowRec;
 
-typedef struct {
-       int pc; /* pc in underlying malblock */
-       sht status;
-       sht cost;
-       str error;
-       DataFlow flow;
-} *FlowStep, FlowStepRec;
+@-
+Calculate the size of the dataflow dependency graph.
+@c
+static int
+DFLOWgraphSize(MalBlkPtr mb, int start, int stop){
+       int cnt = 0;
+       int i;
+       
+       for(i= start; i < stop; i++)
+               cnt += getInstrPtr(mb,i)->argc;
+       return cnt;
+}
 
 @-
 Running all eligible instructions in parallel creates
@@ -628,7 +644,7 @@
 postponed if the claim for memory exceeds a threshold
 In general such contentions will be hard to predict,
 because they depend on the algorithm, the input sizes,
-and the output produced.
+concurrent use of the same variables, and the output produced.
 
 The heuristic is based on calculating the storage footprint
 of the operands and assuming it preferrably should fit in memory.
@@ -974,9 +990,9 @@
 The order of the instructions should be retained as long as possible.
 @c
 static str
-DFLOWstep(FlowTask *t, FlowStep fs)
+DFLOWstep(FlowTask *t, FlowStatus fs)
 {
-       DataFlow flow = fs->flow;
+       DataFlow flow = t->flow;
        int stkpc = fs->pc;
 
        ValPtr lhs,rhs,v;
@@ -986,13 +1002,13 @@
 #if FAST
        int stamp = -1;
 #endif
-       bat *backup= (bat*) alloca(flow->mb->maxarg * sizeof(bat));
-       str *sbackup= (str*) alloca(flow->mb->maxarg * sizeof(str));
-       int *garbage= (int*) alloca(flow->mb->maxarg * sizeof(int));
-       Client cntxt = flow->cntxt;
-       MalBlkPtr mb = flow->mb;
-       MalStkPtr stk = flow->stk;
-       int startpc = flow->start;
+       bat *backup= (bat*) alloca(fs->mb->maxarg * sizeof(bat));
+       str *sbackup= (str*) alloca(fs->mb->maxarg * sizeof(str));
+       int *garbage= (int*) alloca(fs->mb->maxarg * sizeof(int));
+       Client cntxt = fs->cntxt;
+       MalBlkPtr mb = fs->mb;
+       MalStkPtr stk = fs->stk;
+       int startpc = fs->pc;
        InstrPtr pci;
        lng oldtimer=0;
        struct Mallinfo oldMemory;
@@ -1014,7 +1030,7 @@
        if (stk == NULL || stkpc < 0)
                throw(MAL, "mal.interpreter", MAL_STACK_FAIL);
 
-       pci = getInstrPtr(flow->mb, stkpc);
+       pci = getInstrPtr(fs->mb, stkpc);
 #ifdef DEBUG_FLOW
        printf("#EXECUTE THREAD %d \n", tid);
        printInstruction(GDKstdout, flow->mb, 0, pci, LIST_MAL_STMT | 
LIST_MAPI);
@@ -1031,7 +1047,7 @@
                if (stk->cmd == 'x' || cntxt->mode == FINISHING) {
                        /* need a way to skip */
                        stkpc = mb->stop;
-                       fs->status = -1;
+                       fs->state = -1;
                        return ret;
                }
                if (oldtimer ) {
@@ -1150,16 +1166,14 @@
 static void
 runDFLOWworker(void *t)
 {
-       FlowStep fs,oldfs =0;
+       FlowStatus fs,oldfs =0;
        FlowTask *task = (FlowTask*) t;
        str err;
        Thread thr;
 
        thr = THRnew(MT_getpid(), "DFLOWworker");
        while(task) {
-               fs = (FlowStep)q_dequeue(task->todo);
-               PARDEBUG
-                       mnstr_printf(GDKstdout,"#execute pc=%d thread =%d 
\n",fs->pc, task->id);
+               fs = (FlowStatus)q_dequeue(task->todo);
                if ( fs == oldfs){
                        q_requeue(task->todo,fs);
                        MT_sleep_ms(10);
@@ -1172,182 +1186,125 @@
                        printInstruction(GDKstdout, fs->flow->mb, 0, 
getInstrPtr(fs->flow->mb,fs->pc), LIST_MAL_STMT | LIST_MAPI);
 #endif
                err = fs->pc >0 ? DFLOWstep(task, fs): 
createException(MAL,"interpreter","flow step failed");
+               PARDEBUG
+                       mnstr_printf(GDKstdout,"#execute pc=%d thr= %d finished 
err =%s \n",fs->pc, task->id, err?err:"");
                /* restore the instruction and wait in specific cases*/
                if ( err != MAL_SUCCEED && strstr(err,"DFLOWadmission") != NULL 
&& strstr(err,"failed") != NULL){
                        FREE_EXCEPTION(err);
                        fs->pc = ABS(fs->pc);
-                       fs->status = DFLOWrunning;
+                       fs->state = DFLOWrunning;
                        q_requeue(task->todo,fs);
                        continue;
                }
                fs->error = err;
-               q_enqueue(fs->flow->done, fs);
+               q_enqueue(task->flow->done, fs);
        }
        THRdel(thr);
 }
 
 @-
 The dataflow block assumes a linear MAL program.
-The dataflow administration is based on three properties.
-For each variable we keep a flag telling if it is blocked,
-i.e. not available, because it has not been assigned a value yet.
-Blocked[a]== 0 means that the variable can be used.
-
-The property inus[a] tells how many instructions still
-want to use the variable before it reaches the next assignment.
-Each instruction is counted once, even if the same variable is used multiple 
times.
-
-The eoscope statement should be treated with care, because it
-can only be executed when the inuse[a] drops  to 1.
-The instruction is enqued last.
-
-Further complications are the definitions of variables outside
-the dataflow block. They appear as input variables without assignment.
-They are considered unblocked.
-
-After an instruction has been executed, we should decrease
-the inuse[a] property for all input arguments. Furthermore, all
-target variables become nonblocked and can be used to enqueue instructions.
-
+The dataflow administration is based on administration of
+how many variables are still missing before it can be executed.
+For each instruction we keep a list of instructions whose
+blocking counter should be decremented upon finishined it.
 @c
 static void
-DFLOWinit(DataFlow flow, FlowStep fs)
+DFLOWinit(DataFlow flow, Client cntxt, MalBlkPtr mb, MalStkPtr stk, int size)
 {
-       int i, n;
+       int pc, i, j, k, n, etop = 0;
+       int *assign;
+       InstrPtr p;
 
        PARDEBUG
                printf("Initialize dflow block\n");
-       for (n=0, i = flow->start; i<flow->stop; i++, n++) {
-               InstrPtr p = getInstrPtr(flow->mb, i);
-               int j, a;
-
-               PARDEBUG
-                       printInstruction(GDKstdout, flow->mb, 0, p, 
LIST_MAL_STMT | LIST_MAPI);
+       assign = (int*) GDKzalloc(mb->vtop * sizeof(int));
+       etop = flow->stop - flow->start;
+       for (n=0, pc = flow->start; pc<flow->stop; pc++, n++) {
+               p = getInstrPtr(mb, pc);
 
                /* initial state, ie everything can run */
-               fs[n].pc = i;
-               fs[n].status = DFLOWpending;
-               fs[n].flow = flow;
-               fs[n].cost = -1;
-               fs[n].error = NULL;
+               flow->status[n].cntxt = cntxt;
+               flow->status[n].mb = mb;
+               flow->status[n].stk = stk;
+               flow->status[n].pc = pc;
+               flow->status[n].state = DFLOWpending;
+               flow->status[n].cost = -1;
+               flow->status[n].error = NULL;
 
-               for (j=0; j<p->argc; j++){
-                       a = getArg(p, j);
+               /* administer flow dependencies */
+               for (j= p->retc; j<p->argc; j++)  {
+                       if ( !isVarConstant(mb,getArg(p,j)) && ( k = 
assign[getArg(p,j)]) ) {
+                               /* add edge to the target instruction for 
wakeup call */
+                               k -= flow->start;
+                               if ( flow->nodes[k] ){
+                                       /* add wakeup to tail of list */
+                                       for ( i= k; flow->edges[i] > 0; i = 
flow->edges[i])
+                                               ;
+                                       flow->nodes[etop] = n;
+                                       flow->edges[etop] = -1;
+                                       flow->edges[i]  = etop;
+                                       etop++;
+                                       (void) size;
+                                       assert(etop < size);
+                               } else {
+                                       flow->nodes[k] = n;
+                                       flow->edges[k] = -1;
+                               }
+                               
+                               flow->status[n].blocks++;
+                       }
+                       /* be careful, watch out for garbage collection 
interference */
+                       k= getEndOfLife(mb,getArg(p,j));
+                       if ( k != pc && k< flow->stop && k > flow->start){
+                               /* add edge to the target instruction for 
wakeup call */
+                               PARDEBUG
+                                       mnstr_printf(GDKstdout,"forward %d -> 
%d\n",n,k);
+                               k -= flow->start;
+                               if ( flow->nodes[n] ){
+                                       /* add wakeup to tail of list */
+                                       for ( i= n; flow->edges[i] > 0; i = 
flow->edges[i])
+                                               ;
+                                       flow->nodes[etop] = k;
+                                       flow->edges[etop] = -1;
+                                       flow->edges[i]  = etop;
+                                       etop++;
+                                       (void) size;
+                                       assert(etop < size);
+                               } else {
+                                       flow->nodes[n] = k;
+                                       flow->edges[n] = -1;
+                               }
+                               flow->status[k].blocks++;
+                       }
+               }
 
-                       if (j<p->retc && flow->assign[a] != 0)
-                               assert(0);
-
-                       if (j<p->retc && flow->assign[a] == 0 && flow->inuse[a] 
== 0) {
-                               flow->assign[a] = i;
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to