Changeset: 6ce816ead8f0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6ce816ead8f0
Modified Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_interpreter.h
        monetdb5/optimizer/opt_commonTerms.c
        monetdb5/optimizer/opt_joinpath.c
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        monetdb5/optimizer/opt_support.c
        monetdb5/optimizer/opt_support.h
Branch: headless
Log Message:

Merge with default
Manually patch the changed scheduler and optimizers.


diffs (truncated from 1538 to 300 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -56,11 +56,6 @@
 static lng memorypool;         /* memory claimed by concurrent threads */
 static lng memoryused;         /* memory used for intermediates */
 static int memoryclaims = 0;   /* number of threads active with expensive 
operations */
-static struct{
-       lng claim;      /* actual claim on memory*/
-       int bid;
-} hotpotatoes[MAXHOT];
-static int hottop = 0;
 
 #define DFLOWpending 0         /* runnable */
 #define DFLOWrunning 1         /* currently in progress */
@@ -81,17 +76,27 @@
        int id;
        queue *todo;            /* pending actions for this client */
        lng clk;
+       struct DataFlow *flow;
 } FlowTask;
 
-typedef struct DataFlow {
+typedef struct FLOWSTATUS {
        Client cntxt;           /* for debugging and client resolution */
        MalBlkPtr mb;           /* carry the context */
        MalStkPtr stk;
+       int pc;                 /* pc in underlying malblock */
+       int blocks;     /* awaiting for variables */
+       sht state;              /* of execution */
+       sht cost;
+       lng hotclaim;   /* memory foot print of result variables */
+       lng argclaim;   /* memory foot print of arguments */
+       str error;
+} *FlowStatus, FlowStatusRec;
+
+typedef struct DataFlow {
        int start, stop;        /* guarded block under consideration*/
-       char *status;           /* statements can be blocked on other 
statements */
-       char *blocked;          /* blocked, should be created first */
-       int *assign;            /* first assignment of variable */
-       int *inuse;             /* inuse in parallel threads reference count */
+       FlowStatus status;              /* status of each instruction */
+       int *nodes;                     /* dependency graph nodes */
+       int *edges;                     /* dependency graph */
        queue *done;            /* work finished */
        queue *todo;            /* pending actions for this client */
        int    nway;            /* number of workers */
@@ -99,17 +104,8 @@
        struct DataFlow *free;  /* free list */
 } *DataFlow, DataFlowRec;
 
-typedef struct {
-       int pc; /* pc in underlying malblock */
-       sht status;
-       sht cost;
-       str error;
-       DataFlow flow;
-} *FlowStep, FlowStepRec;
 
 
-static DataFlow flows = NULL;
-static int workerid = 0;
 /*
  * Running all eligible instructions in parallel creates
  * resource contention. This means we should implement
@@ -144,225 +140,81 @@
  * suspended instructions.
 */
 
+static int
+DFLOWgraphSize(MalBlkPtr mb, int start, int stop){
+    int cnt = 0;
+    int i;
+
+    for(i= start; i < stop; i++)
+        cnt += getInstrPtr(mb,i)->argc;
+    return cnt;
+}
+
 #define heapinfo(X) if((X) && (X)->base) vol = (X)->free; else vol = 0;
 #define hashinfo(X) if((X) && (X)->mask) vol = 
((X)->mask+(X)->lim+1)*sizeof(int) + sizeof(*(X)); else vol = 0;
 
 static lng
-calcclaim(MalStkPtr stk, InstrPtr pci, int i)
-{
-       lng total = 0,vol;
+getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag){
+       lng total=0, vol = 0;
+       oid cnt = 0;
        COL *b;
 
-       assert(i < pci->argc);
+       (void) mb;
        if (stk->stk[getArg(pci,i)].vtype == TYPE_bat){
                b = CBPquickdesc(stk->stk[getArg(pci,i)].val.bval,TRUE);
                if (b==NULL)
-                       return total;
+                       return 0;
+        if ( flag && isVIEW(b)){
+            CBPreleaseref( b);
+            return 0;
+        }
                heapinfo(&b->heap); total += vol;
                heapinfo(b->vheap); total += vol;
                hashinfo(b->hash); total += vol; 
                if ( COLtype(b) == TYPE_oid && !COLordered(b) && !COLdense(b) ){
                        /* assume we may have to do random IO, punish it by 
increasing the claim  with hash elem size*/
-                       total += 4 * COLcount(b) ;
-                       total = total > (lng) (MEMORY_THRESHOLD * 
monet_memory)? (lng) (MEMORY_THRESHOLD * monet_memory) : total;
+                       total += 4 * cnt;
+                       total = total >(lng) (MEMORY_THRESHOLD * monet_memory)? 
(lng) (MEMORY_THRESHOLD * monet_memory):total;
                }
        }
        return total;
 }
 
-static lng
-getHotClaim()
+int
+DFLOWadmission(lng argclaim, lng hotclaim)
 {
-       lng total = 0;
-       int i;
-       for ( i = 0; i < hottop; i++)
-               total += hotpotatoes[i].claim;
-       return total;
-}
-
-/*
- * Calculate the memory need of a single instruction and also determine how 
much of the
- * hot potatoes it will eat.
-*/
-void
-getMemoryClaim(MalStkPtr stk, InstrPtr pci, lng *argclaim, lng *retclaim, lng 
*hotclaim)
-{
-       int i, h = int_nil;
-       lng guess=0, t=0, total = 0;
-       oid cnt = 0;
-
-       total = 0;
-       if (retclaim)
-               *retclaim =0;
-       if (hotclaim)
-               *hotclaim =0;
-       if (argclaim){
-               for(i=  pci->retc; i< pci->argc; i++) {
-                       h = int_nil;
-                       total+= calcclaim(stk,pci,i);
-                       if ( guess == 0)
-                               guess = total -t;
-                       /* claims on hotpotatoes are not counted */
-                       if (hotclaim)
-                       for (t =0; t< hottop; t++)
-                       if (hotpotatoes[t].bid == h)
-                               *hotclaim += hotpotatoes[t].claim;
-               }
-               /* make sure you can afford one hash when you have propCheck 
enabled */
-               if ( GDKdebug & 10 ){
-                       total += cnt * sizeof(oid);
-               }
-               *argclaim = total;
-       }
-       if ( retclaim  && *retclaim == 0)
-               *retclaim = guess * pci->retc;
-#ifdef DEBUG_MEMORY_CLAIM
-       if ( total )
-               mnstr_printf(GDKout,"#DFLOWgetMemoryClaim   pool " LLFMT " 
claims " LLFMT "," LLFMT "," LLFMT"\n", 
memorypool,total,(retclaim?*retclaim:0),(hotclaim?*hotclaim:0));
-#endif
-}
-
-/*
- * After we have executed the instruction, we should release any hotpotatoe 
claim (it is hot only once)
- * and make more precise claims for the return arguments. Moreover, we should 
reduce the hot potatoe set,
- * due to other arguments being loaded.
-*/
-
-void
-updMemoryUsedPart(MalStkPtr stk, InstrPtr pci, int start, int stop, lng 
argclaim)
-{
-       /* remove the result arguments from the hot set */
-       int i,j,h,bid;
-
-       if ( hottop >= MAXHOT || memoryused > (lng) (MEMORY_THRESHOLD * 
monet_memory) ){
-               /* forget everything returning memory to pool */
-               mal_set_lock(mal_contextLock, "DFLOWdelay");
-               for ( i =0; i< MAXHOT; i++){
-                       hotpotatoes[i].claim=0;
-                       hotpotatoes[i].bid=0;
-               }
-               memoryused = 0;
-               hottop = 0;
-               mal_unset_lock(mal_contextLock, "DFLOWdelay");
-#ifdef DEBUG_MEMORY_CLAIM
-               mnstr_printf(GDKout,"#DFLOWhotpotatoes reset\n");
-#endif
-       }
-
-       if (memorypool == 0 ){
-               /* not initialized */
-               return;
-       }
-       mal_set_lock(mal_contextLock, "DFLOWdelay");
-       for ( i = start; i< stop; i++)
-       if (stk->stk[getArg(pci,i)].vtype == TYPE_bat && (bid = 
stk->stk[getArg(pci,i)].val.bval) && bid)
-       {
-               for ( h = j= 0; j< hottop; j++)
-                       if ( hotpotatoes[j].bid != bid)
-                               hotpotatoes[h++]= hotpotatoes[j];
-                       else{
-#ifdef DEBUG_MEMORY_CLAIM
-                               if ( hotpotatoes[j].claim){
-                                       str cv = NULL;
-                                       ATOMformat(TYPE_bat, 
&hotpotatoes[hottop].bid, &cv);
-                                       
mnstr_printf(GDKout,"#DFLOWhotpotatoes[%d] drops  [%s]" LLFMT "\n", j, cv, 
hotpotatoes[j].claim);
-                                       if (cv) GDKfree(cv);
-                               }
-#endif
-                       }
-               hottop = h;
-       }
-       /* input also invalidates part of the hot set */
-       argclaim = argclaim * (memorypool / (memorypool+memoryused));
-       for ( h = j= 0; j< hottop; j++)
-       if ( argclaim > 0)
-                       argclaim -= hotpotatoes[j].claim;
-       else    hotpotatoes[h++]= hotpotatoes[j];
-       hottop = h;
-       mal_unset_lock(mal_contextLock, "DFLOWdelay");
-}
-
-void
-updMemoryUsed(MalStkPtr stk, InstrPtr pci, lng argclaim)
-{
-       lng action=0,total,t;
-       int i,h,bid;
-
-       for ( i= 0; i< pci->retc && hottop < MAXHOT; i++)
-       if (stk->stk[getArg(pci,i)].vtype == TYPE_bat && (bid = 
stk->stk[getArg(pci,i)].val.bval) && bid)
-       {
-               total= calcclaim(stk,pci,i);
-               (void) t;
-               (void) h;
-               if ( total ) {
-                       mal_set_lock(mal_contextLock, "DFLOWdelay");
-                       hotpotatoes[hottop].bid = 
stk->stk[getArg(pci,i)].val.bval;
-                       hotpotatoes[hottop].claim = total;
-#ifdef DEBUG_MEMORY_CLAIM
-                       if ( total ) {
-                               str cv = NULL;
-                               ATOMformat(TYPE_bat, &hotpotatoes[hottop].bid, 
&cv);
-                               mnstr_printf(GDKout,"#DFLOWhotpotatoes[%d] 
claims [%s]" LLFMT "\n", hottop, cv, total);
-                               GDKfree(cv);
-                       }
-#endif
-                       hottop++;
-                       action++;
-                       mal_unset_lock(mal_contextLock, "DFLOWdelay");
-               }
-       }
-       updMemoryUsedPart(stk,pci, pci->retc,pci->argc,argclaim);
-#ifdef DEBUG_MEMORY_CLAIM
-       if ( total && action )
-               mnstr_printf(GDKout,"#DFLOWhotpotatoes pool " LLFMT " used " 
LLFMT "\n", memorypool, memoryused);
-#endif
-}
-
-int
-DFLOWadmission(lng argclaim, lng retclaim, lng hotclaim)
-{
-       lng hot;
        /* optimistically set memory */
-       if ( argclaim + retclaim == 0)
+       if ( argclaim == 0)
                return 0;
+       if( argclaim >= 0)
+               return 0;  /* admission control not used */
 
        mal_set_lock(mal_contextLock, "DFLOWdelay");
        if (memorypool <= 0 && memoryclaims == 0) {
                        memorypool = (lng) (MEMORY_THRESHOLD * monet_memory);
-                       hottop = 0;
        }
 
-       if ( argclaim + retclaim > 0 ) {
-               if (memoryclaims == 0 || memorypool - memoryused > argclaim + 
retclaim -hotclaim){
-                       hot = getHotClaim();
-                       if ( memoryclaims && hotclaim == 0  && argclaim + 
retclaim > memorypool - memoryused - hot) {
-                               /* don't start unless you can eat the hot 
potatoes first */
-#ifdef DEBUG_MEMORY_CLAIM
-                               mnstr_printf(GDKerr,"#Delayed due to hot 
potatoes pool " LLFMT " used " LLFMT " hot " LLFMT "\n", memorypool, 
memoryused,hot);
-#endif
-                               mal_unset_lock(mal_contextLock, "DFLOWdelay");
-                               return -1;
-                       }
-                       memorypool -= (argclaim + retclaim);
+       if ( argclaim > 0 ) {
+               if (memoryclaims == 0 || memorypool - memoryused > argclaim + 
hotclaim){
+                       memorypool -= (argclaim + hotclaim);
                        memoryclaims ++;
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to