Changeset: 5c6037eed2a5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5c6037eed2a5
Modified Files:
monetdb5/mal/mal_interpreter.mx
Branch: default
Log Message:
Revamp dataflow scheduler
The old scheduler was based on linear scans to determine eligible
instructions. The new one uses a prepared dataflow graph to
quickly identify eligible instructions after each instruction.
todo: avoid expensive memory footprint claims
diffs (truncated from 699 to 300 lines):
diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -572,9 +572,6 @@
@-
The flow graphs should be organized such that parallel threads can
access it mostly without expensive locking.
-Furthermore, per flow block we keep an administration when
-variables are blocked or not. An instruction can fire if all
-its input variables are not blocked.
@c
#define DFLOWpending 0 /* runnable */
#define DFLOWrunning 1 /* currently in progress */
@@ -590,22 +587,36 @@
} queue;
+@-
+The dataflow dependency is administered in a graph list structure.
+For each instruction we keep the list of instructions that
+should be checked for eligibility once we are finished with it.
+@c
typedef struct {
MT_Id tid;
int id;
queue *todo; /* pending actions for this client */
lng clk;
+ struct DataFlow *flow;
} FlowTask;
-typedef struct DataFlow {
+typedef struct FLOWSTATUS {
Client cntxt; /* for debugging and client resolution */
MalBlkPtr mb; /* carry the context */
MalStkPtr stk;
+ int pc; /* pc in underlying malblock */
+ int blocks; /* awaiting for variables */
+ sht state; /* of execution */
+ sht cost;
+ lng claim; /* memory foot print */
+ str error;
+} *FlowStatus, FlowStatusRec;
+
+typedef struct DataFlow {
int start, stop; /* guarded block under consideration*/
- char *status; /* statements can be blocked on other
statements */
- char *blocked; /* blocked, should be created first */
- int *assign; /* first assignment of variable */
- int *inuse; /* inuse in parallel threads reference count */
+ FlowStatus status; /* status of each instruction */
+ int *nodes; /* dependency graph nodes */
+ int *edges; /* dependency graph */
queue *done; /* work finished */
queue *todo; /* pending actions for this client */
int nway; /* number of workers */
@@ -613,13 +624,18 @@
struct DataFlow *free; /* free list */
} *DataFlow, DataFlowRec;
-typedef struct {
- int pc; /* pc in underlying malblock */
- sht status;
- sht cost;
- str error;
- DataFlow flow;
-} *FlowStep, FlowStepRec;
+@-
+Calculate the size of the dataflow dependency graph.
+@c
+static int
+DFLOWgraphSize(MalBlkPtr mb, int start, int stop){
+ int cnt = 0;
+ int i;
+
+ for(i= start; i < stop; i++)
+ cnt += getInstrPtr(mb,i)->argc;
+ return cnt;
+}
@-
Running all eligible instructions in parallel creates
@@ -628,7 +644,7 @@
postponed if the claim for memory exceeds a threshold
In general such contentions will be hard to predict,
because they depend on the algorithm, the input sizes,
-and the output produced.
+concurrent use of the same variables, and the output produced.
The heuristic is based on calculating the storage footprint
of the operands and assuming it preferrably should fit in memory.
@@ -974,9 +990,9 @@
The order of the instructions should be retained as long as possible.
@c
static str
-DFLOWstep(FlowTask *t, FlowStep fs)
+DFLOWstep(FlowTask *t, FlowStatus fs)
{
- DataFlow flow = fs->flow;
+ DataFlow flow = t->flow;
int stkpc = fs->pc;
ValPtr lhs,rhs,v;
@@ -986,13 +1002,13 @@
#if FAST
int stamp = -1;
#endif
- bat *backup= (bat*) alloca(flow->mb->maxarg * sizeof(bat));
- str *sbackup= (str*) alloca(flow->mb->maxarg * sizeof(str));
- int *garbage= (int*) alloca(flow->mb->maxarg * sizeof(int));
- Client cntxt = flow->cntxt;
- MalBlkPtr mb = flow->mb;
- MalStkPtr stk = flow->stk;
- int startpc = flow->start;
+ bat *backup= (bat*) alloca(fs->mb->maxarg * sizeof(bat));
+ str *sbackup= (str*) alloca(fs->mb->maxarg * sizeof(str));
+ int *garbage= (int*) alloca(fs->mb->maxarg * sizeof(int));
+ Client cntxt = fs->cntxt;
+ MalBlkPtr mb = fs->mb;
+ MalStkPtr stk = fs->stk;
+ int startpc = fs->pc;
InstrPtr pci;
lng oldtimer=0;
struct Mallinfo oldMemory;
@@ -1014,7 +1030,7 @@
if (stk == NULL || stkpc < 0)
throw(MAL, "mal.interpreter", MAL_STACK_FAIL);
- pci = getInstrPtr(flow->mb, stkpc);
+ pci = getInstrPtr(fs->mb, stkpc);
#ifdef DEBUG_FLOW
printf("#EXECUTE THREAD %d \n", tid);
printInstruction(GDKstdout, flow->mb, 0, pci, LIST_MAL_STMT |
LIST_MAPI);
@@ -1031,7 +1047,7 @@
if (stk->cmd == 'x' || cntxt->mode == FINISHING) {
/* need a way to skip */
stkpc = mb->stop;
- fs->status = -1;
+ fs->state = -1;
return ret;
}
if (oldtimer ) {
@@ -1150,16 +1166,14 @@
static void
runDFLOWworker(void *t)
{
- FlowStep fs,oldfs =0;
+ FlowStatus fs,oldfs =0;
FlowTask *task = (FlowTask*) t;
str err;
Thread thr;
thr = THRnew(MT_getpid(), "DFLOWworker");
while(task) {
- fs = (FlowStep)q_dequeue(task->todo);
- PARDEBUG
- mnstr_printf(GDKstdout,"#execute pc=%d thread =%d
\n",fs->pc, task->id);
+ fs = (FlowStatus)q_dequeue(task->todo);
if ( fs == oldfs){
q_requeue(task->todo,fs);
MT_sleep_ms(10);
@@ -1172,182 +1186,125 @@
printInstruction(GDKstdout, fs->flow->mb, 0,
getInstrPtr(fs->flow->mb,fs->pc), LIST_MAL_STMT | LIST_MAPI);
#endif
err = fs->pc >0 ? DFLOWstep(task, fs):
createException(MAL,"interpreter","flow step failed");
+ PARDEBUG
+ mnstr_printf(GDKstdout,"#execute pc=%d thr= %d finished
err =%s \n",fs->pc, task->id, err?err:"");
/* restore the instruction and wait in specific cases*/
if ( err != MAL_SUCCEED && strstr(err,"DFLOWadmission") != NULL
&& strstr(err,"failed") != NULL){
FREE_EXCEPTION(err);
fs->pc = ABS(fs->pc);
- fs->status = DFLOWrunning;
+ fs->state = DFLOWrunning;
q_requeue(task->todo,fs);
continue;
}
fs->error = err;
- q_enqueue(fs->flow->done, fs);
+ q_enqueue(task->flow->done, fs);
}
THRdel(thr);
}
@-
The dataflow block assumes a linear MAL program.
-The dataflow administration is based on three properties.
-For each variable we keep a flag telling if it is blocked,
-i.e. not available, because it has not been assigned a value yet.
-Blocked[a]== 0 means that the variable can be used.
-
-The property inus[a] tells how many instructions still
-want to use the variable before it reaches the next assignment.
-Each instruction is counted once, even if the same variable is used multiple
times.
-
-The eoscope statement should be treated with care, because it
-can only be executed when the inuse[a] drops to 1.
-The instruction is enqued last.
-
-Further complications are the definitions of variables outside
-the dataflow block. They appear as input variables without assignment.
-They are considered unblocked.
-
-After an instruction has been executed, we should decrease
-the inuse[a] property for all input arguments. Furthermore, all
-target variables become nonblocked and can be used to enqueue instructions.
-
+The dataflow administration is based on administration of
+how many variables are still missing before it can be executed.
+For each instruction we keep a list of instructions whose
+blocking counter should be decremented upon finishined it.
@c
static void
-DFLOWinit(DataFlow flow, FlowStep fs)
+DFLOWinit(DataFlow flow, Client cntxt, MalBlkPtr mb, MalStkPtr stk, int size)
{
- int i, n;
+ int pc, i, j, k, n, etop = 0;
+ int *assign;
+ InstrPtr p;
PARDEBUG
printf("Initialize dflow block\n");
- for (n=0, i = flow->start; i<flow->stop; i++, n++) {
- InstrPtr p = getInstrPtr(flow->mb, i);
- int j, a;
-
- PARDEBUG
- printInstruction(GDKstdout, flow->mb, 0, p,
LIST_MAL_STMT | LIST_MAPI);
+ assign = (int*) GDKzalloc(mb->vtop * sizeof(int));
+ etop = flow->stop - flow->start;
+ for (n=0, pc = flow->start; pc<flow->stop; pc++, n++) {
+ p = getInstrPtr(mb, pc);
/* initial state, ie everything can run */
- fs[n].pc = i;
- fs[n].status = DFLOWpending;
- fs[n].flow = flow;
- fs[n].cost = -1;
- fs[n].error = NULL;
+ flow->status[n].cntxt = cntxt;
+ flow->status[n].mb = mb;
+ flow->status[n].stk = stk;
+ flow->status[n].pc = pc;
+ flow->status[n].state = DFLOWpending;
+ flow->status[n].cost = -1;
+ flow->status[n].error = NULL;
- for (j=0; j<p->argc; j++){
- a = getArg(p, j);
+ /* administer flow dependencies */
+ for (j= p->retc; j<p->argc; j++) {
+ if ( !isVarConstant(mb,getArg(p,j)) && ( k =
assign[getArg(p,j)]) ) {
+ /* add edge to the target instruction for
wakeup call */
+ k -= flow->start;
+ if ( flow->nodes[k] ){
+ /* add wakeup to tail of list */
+ for ( i= k; flow->edges[i] > 0; i =
flow->edges[i])
+ ;
+ flow->nodes[etop] = n;
+ flow->edges[etop] = -1;
+ flow->edges[i] = etop;
+ etop++;
+ (void) size;
+ assert(etop < size);
+ } else {
+ flow->nodes[k] = n;
+ flow->edges[k] = -1;
+ }
+
+ flow->status[n].blocks++;
+ }
+ /* be careful, watch out for garbage collection
interference */
+ k= getEndOfLife(mb,getArg(p,j));
+ if ( k != pc && k< flow->stop && k > flow->start){
+ /* add edge to the target instruction for
wakeup call */
+ PARDEBUG
+ mnstr_printf(GDKstdout,"forward %d ->
%d\n",n,k);
+ k -= flow->start;
+ if ( flow->nodes[n] ){
+ /* add wakeup to tail of list */
+ for ( i= n; flow->edges[i] > 0; i =
flow->edges[i])
+ ;
+ flow->nodes[etop] = k;
+ flow->edges[etop] = -1;
+ flow->edges[i] = etop;
+ etop++;
+ (void) size;
+ assert(etop < size);
+ } else {
+ flow->nodes[n] = k;
+ flow->edges[n] = -1;
+ }
+ flow->status[k].blocks++;
+ }
+ }
- if (j<p->retc && flow->assign[a] != 0)
- assert(0);
-
- if (j<p->retc && flow->assign[a] == 0 && flow->inuse[a]
== 0) {
- flow->assign[a] = i;
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list