Changeset: f249986489b5 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f249986489b5
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/modules/mal/language.c
        monetdb5/scheduler/run_octopus.c
        monetdb5/scheduler/srvpool.c
Branch: default
Log Message:

Use a limited number of MAL workers
There is a fixed number of MAL interpreters to handle all parallel blocks.
The total number of threads is GDKnr_threads + clients.

Some design decisions.
Resource fairness is factored out so that it can be improved further under
multi-query load.
An error is recognized but does not stop the dataflow interpretation. All
instructions are still looked at, but not executed. This simplifies error
handling.


diffs (truncated from 1300 to 300 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -36,59 +36,53 @@
  * access it mostly without expensive locking.
  */
 #include "mal_dataflow.h"
+
 #define DFLOWpending 0         /* runnable */
 #define DFLOWrunning 1         /* currently in progress */
 #define DFLOWwrapup  2         /* done! */
 #define DFLOWretry   3         /* reschedule */
+#define DFLOWskipped 4         /* due to errors */
+
+/* The per instruction status of execution */
+typedef struct FLOWEVENT {
+       struct DATAFLOW *flow;/* execution context */
+       int pc;         /* pc in underlying malblock */
+       int blocks;     /* awaiting for variables */
+       sht state;      /* of execution */
+       lng clk;
+       sht cost;
+       lng hotclaim;   /* memory foot print of result variables */
+       lng argclaim;   /* memory foot print of arguments */
+} *FlowEvent, FlowEventRec;
 
 typedef struct queue {
        int size;       /* size of queue */
        int last;       /* last element in the queue */
-       void **data;
+       FlowEvent *data;
        MT_Lock l;      /* its a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
 } queue;
 
-
 /*
  * The dataflow dependency is administered in a graph list structure.
  * For each instruction we keep the list of instructions that
  * should be checked for eligibility once we are finished with it.
  */
-typedef struct {
-       MT_Id tid;
-       int id;
-       queue *todo;            /* pending actions for this client */
-       lng clk;
-       struct DataFlow *flow;
-} FlowTask;
-
-typedef struct FLOWSTATUS {
+typedef struct DATAFLOW {
        Client cntxt;   /* for debugging and client resolution */
        MalBlkPtr mb;   /* carry the context */
        MalStkPtr stk;
-       int pc;         /* pc in underlying malblock */
-       int blocks;     /* awaiting for variables */
-       sht state;      /* of execution */
-       sht cost;
-       lng hotclaim;   /* memory foot print of result variables */
-       lng argclaim;   /* memory foot print of arguments */
-       str error;
-} *FlowStatus, FlowStatusRec;
+       int start, stop;    /* guarded block under consideration*/
+       FlowEvent status;   /* status of each instruction */
+       str error;          /* error encountered */
+       int *nodes;         /* dependency graph nodes */
+       int *edges;         /* dependency graph */
+       MT_Lock flowlock;   /* lock to protect the above */
+       queue *done;        /* instructions handled */
+} *DataFlow, DataFlowRec;
 
-typedef struct DataFlow {
-    int start, stop;    /* guarded block under consideration*/
-    FlowStatus status;  /* status of each instruction */
-    int *nodes;         /* dependency graph nodes */
-    int *edges;         /* dependency graph */
-    queue *done;        /* work finished */
-    queue *todo;        /* pending actions for this client */
-    int    nway;        /* number of workers */
-    FlowTask *worker;   /* worker threads for the client */
-    struct DataFlow *free; /* free list */
-    int terminate;      /* set if we need to terminate */
-    MT_Lock termlock;   /* lock to protect the above */
-} *DataFlow, DataFlowRec;
+static MT_Id workers[THREADS];
+static queue *todo = 0;        /* pending instructions */
 
 /* does not seem to have a major impact */
 lng memorypool = 0;      /* memory claimed by concurrent threads */
@@ -147,10 +141,11 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
  * Views are consider cheap and ignored
  */
 lng
-getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag)
+getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, int pc, int i, int flag)
 {
        lng total = 0, vol = 0;
        BAT *b;
+       InstrPtr pci = getInstrPtr(mb,pc);
 
        (void)mb;
        if (stk->stk[getArg(pci, i)].vtype == TYPE_bat) {
@@ -250,26 +245,21 @@ q_create(int sz)
        return q;
 }
 
-static void
-q_destroy(queue *q)
-{
-       GDKfree(q->data);
-       GDKfree(q);
-}
 
 /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue 
is possible */
 /* we might actually sort it for better scheduling behavior */
 static void
-q_enqueue_(queue *q, FlowStatus d)
+q_enqueue_(queue *q, FlowEvent d)
 {
+       assert(d);
        if (q->last == q->size) {
                q->size <<= 1;
                q->data = GDKrealloc(q->data, sizeof(void*) * q->size);
        }
-       q->data[q->last++] = (void*)d;
+       q->data[q->last++] = d;
 }
 static void
-q_enqueue(queue *q, FlowStatus d)
+q_enqueue(queue *q, FlowEvent d)
 {
        MT_lock_set(&q->l, "q_enqueue");
        q_enqueue_(q, d);
@@ -284,9 +274,11 @@ q_enqueue(queue *q, FlowStatus d)
  */
 
 static void
-q_requeue_(queue *q, void *d)
+q_requeue_(queue *q, FlowEvent d)
 {
        int i;
+
+       assert(d);
        if (q->last == q->size) {
                /* enlarge buffer */
                q->size <<= 1;
@@ -298,8 +290,9 @@ q_requeue_(queue *q, void *d)
        q->last++;
 }
 static void
-q_requeue(queue *q, void *d)
+q_requeue(queue *q, FlowEvent d)
 {
+       assert(d);
        MT_lock_set(&q->l, "q_requeue");
        q_requeue_(q, d);
        MT_lock_unset(&q->l, "q_requeue");
@@ -312,6 +305,7 @@ q_dequeue(queue *q)
        void *r = NULL;
 
        MT_sema_down(&q->s, "q_dequeue");
+       assert(q->last);
        MT_lock_set(&q->l, "q_dequeue");
        if (q->last > 0)
                /* LIFO favors garbage collection */
@@ -329,27 +323,10 @@ q_dequeue(queue *q)
         */
 
        MT_lock_unset(&q->l, "q_dequeue");
+       assert(r);
        return r;
 }
 
-/* it makes sense to give priority to those
- * instructions that were first in the plan
- */
-static void
-queue_sort(queue *q)
-{
-       int i, j;
-       void *f;
-
-       for (i = 0; i < q->last; i++)
-               for (j = i + 1; j < q->last; j++)
-                       if (((FlowStatus)q->data[i])->pc < 
((FlowStatus)q->data[j])->pc) {
-                               f = q->data[i];
-                               q->data[i] = q->data[j];
-                               q->data[j] = f;
-                       }
-}
-
 /*
  * We simply move an instruction into the front of the queue.
  * Beware, we assume that variables are assigned a value once, otherwise
@@ -367,404 +344,6 @@ queue_sort(queue *q)
  * because in the kernel we don't know what routines are available
  * with this property. Nor do we maintain such properties.
  */
-static str
-DFLOWstep(FlowTask *t, FlowStatus fs)
-{
-       DataFlow flow = t->flow;
-       int stkpc = fs->pc;
-       int stamp = -1;
-
-       ValPtr lhs, rhs, v;
-       int i, k;
-       int exceptionVar = -1;
-       str ret = MAL_SUCCEED;
-       ValRecord backups[16];
-       ValPtr backup;
-       int garbages[16], *garbage;
-       Client cntxt = fs->cntxt;
-       MalBlkPtr mb = fs->mb;
-       MalStkPtr stk = fs->stk;
-       int startpc = fs->pc;
-       InstrPtr pci;
-       MT_Lock *lock = &flow->done->l;
-       int tid = t->id, prevpc = 0;
-       RuntimeProfileRecord runtimeProfile;
-
-       runtimeProfileInit(mb, &runtimeProfile, cntxt->flags & memoryFlag);
-       if (stk == NULL || stkpc < 0)
-               throw(MAL, "mal.interpreter", MAL_STACK_FAIL);
-
-       /* prepare extended backup and garbage structures */
-       if (mb->maxarg > 16) {
-               backup = GDKzalloc(mb->maxarg * sizeof(ValRecord));
-               garbage = (int *) GDKzalloc(mb->maxarg * sizeof(int));
-       } else {
-               backup = backups;
-               garbage = garbages;
-       }
-
-       pci = getInstrPtr(fs->mb, stkpc);
-#ifdef DEBUG_FLOW
-       printf("#EXECUTE THREAD %d \n", tid);
-       printInstruction(GDKstdout, flow->mb, 0, pci, LIST_MAL_STMT | 
LIST_MAPI);
-#endif
-       if (stk->cmd || mb->trap) {
-               if (cntxt->flags & bbpFlag)
-                       BBPTraceCall(cntxt, mb, stk, prevpc);
-               prevpc = stkpc;
-               mdbStep(cntxt, mb, stk, getPC(mb, pci));
-               if (stk->cmd == 'x' || cntxt->mode == FINISHING) {
-                       /* need a way to skip */
-                       stkpc = mb->stop;
-                       fs->state = -1;
-                       if (backup != backups)
-                               GDKfree(backup);
-                       if (garbage != garbages)
-                               GDKfree(garbage);
-                       return ret;
-               }
-       }
-
-       /*runtimeProfileBegin(cntxt, mb, stk, stkpc, &runtimeProfile, 0);*/
-       if (pci->recycle > 0)
-               t->clk = GDKusec();
-       if (!RECYCLEentry(cntxt, mb, stk, pci)) {
-               runtimeProfileBegin(cntxt, mb, stk, stkpc, &runtimeProfile, 1);
-               /*
-                * Before we execute an instruction the variables to be garbage
-                * collected are identified. In the post-execution phase they
-                * are removed.
-                */
-               if (garbageControl(pci)) {
-                       for (i = 0; i < pci->argc; i++) {
-                               int a = getArg(pci, i);
-
-                               backup[i].vtype = 0;
-                               backup[i].len = 0;
-                               backup[i].val.pval = 0;
-                               garbage[i] = -1;
-                               if (stk->stk[a].vtype == TYPE_bat && 
getEndOfLife(mb, a) == stkpc && isNotUsedIn(pci, i + 1, a))
-                                       garbage[i] = a;
-
-                               if (i < pci->retc && stk->stk[a].vtype == 
TYPE_bat) {
-                                       backup[i] = stk->stk[a];
-                                       stamp = BBPcurstamp();
-                               } else if (i < pci->retc &&
-                                                  0 < stk->stk[a].vtype &&
-                                                  stk->stk[a].vtype < TYPE_any 
&&
-                                                  
ATOMextern(stk->stk[a].vtype)) {
-                                       backup[i] = stk->stk[a];
-                               }
-                       }
-               }
-               /*
-                * The number of instructions allowed is severely limited.  We
-                * don't allow sequential flow control here, which is enforced
-                * by the dataflow optimizer;
-                */
-               switch (pci->token) {
-               case ASSIGNsymbol:
-                       for (k = 0, i = pci->retc; k < pci->retc && i < 
pci->argc; i++, k++) {
-                               lhs = &stk->stk[pci->argv[k]];
-                               rhs = &stk->stk[pci->argv[i]];
-                               VALcopy(lhs, rhs);
-                               if (lhs->vtype == TYPE_bat && lhs->val.bval)
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to