Changeset: 4150ed50638e for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4150ed50638e
Modified Files:
        MonetDB5/src/optimizer/opt_tarantula.mx
Branch: default
Log Message:

TPCH explain for tarantula
All 22 tpch queries can be turned into tarantula plans
without crashing the system. Their output correctness
still needs to be determined. Extended the documentation to
highlight the challenges.


diffs (truncated from 416 to 300 lines):

diff -r 5c5a96805c35 -r 4150ed50638e MonetDB5/src/optimizer/opt_tarantula.mx
--- a/MonetDB5/src/optimizer/opt_tarantula.mx   Tue Aug 31 16:37:03 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_tarantula.mx   Wed Sep 01 09:25:27 2010 +0200
@@ -32,22 +32,40 @@
 The Tarantula optimizer, like the Octopus optimizer, use the
 output of the mitosis+mergetable optimizer and produces the
 actual plans for parallel execution.
-They both break a single, large fact-table into pieces
-based on the head and propagate the effect through the plan.
+The tarantula untangles the query plan into a controlling head
+function and a series of subplans, one for each leg to execute concurrently.
 
-The tarantula untangles the resulting plan into a controlling head
-function and a series of plans, one for each leg to execute concurrently.
+The targets for breaking the plan are the blocking operations,
+in particular mat.pack(). The flow graphs leading to
+the pack arguments are extracted from the query plan and
+each subgraph is cast into an independent plan. 
+Since the query plan is a DAG, it is perfectly possible that
+a portion being extracted is shared amongst all legs.
+The naive extraction then leads to a re-calculation of 
+shared intermediates in each leg.
+
+The subplan produces the argument to the blocking operator, whose
+result will be assembled in the head. It is also perfectly possible
+that variables assigned a value are used later on in the query graph.
+These variables are identified and one leg becomes responsible for
+returning them to the head as well, to be used later on.
+
+The original pack operation is replaced by a call to a function
+to orchestrate the distributed processing and return the final
+result. Then the next pack operation is searched and its
+subgraph is derived. Again, it may share portions produced
+in the first pack subgraph.
+
+A potentially better scheme would be to detect each such case and
+turn it into a splitting point as well. This can be detected by
+looking for the last assignment and multiple use cases. [VARIANT TODO]
+
 The allocation of a subplan to leg depends on a bidding scheme. 
 Bidding can not depend on BAT arguments, because that would cause 
 significant communication overhead. Scalar values could be used and
 would function well in terms of using the recycler to get involved into
 precise bidding. 
 
-The Tarantula optimizer differs from the Octopus optimizer in performing
-a recursive divide and conquer method to deal with blocking operations.
-Furthermore, we assume that the persistent store is shared amongst
-the node, which means that fragments of the tables can be directly accessed.
-
 A snippet of an tarantula plan with two legs is shown.
 The main part of the query becomes a three step procedure of
 1) remote registration of subplans, 2) obtaining bids and schedule design
@@ -193,7 +211,7 @@
 } Peer;
 
 #include "opt_mitosis.h"
-#define MINLEGSIZE 5   /* number of MAL instructions to consider for a leg */
+#define MINLEGSIZE 0   /* number of MAL instructions to consider for a leg */
 #define MAXSHARE 64            /* number of input output arguments to consider 
*/
 #define MAXSITES MAXSLICES   /* should become dynamic at some point */
 Peer peers[MAXSITES];    /* registry of peer servers */
@@ -393,7 +411,7 @@
 Likewise, look for variables that are produced by other legs and will become 
input parameters.
 @c
 static MalBlkPtr 
-TARmakeLeg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx, int 
leg, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE], InstrPtr 
*list)
+TARmakeLeg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int last, int 
limit, int idx, int leg, int input[MAXSLICES][MAXSHARE], int 
output[MAXSLICES][MAXSHARE], InstrPtr *list,int *map)
 {
        MalBlkPtr tm = NULL;
        InstrPtr p = NULL, sig;
@@ -404,7 +422,7 @@
        assert(old[pc]->fcnname == packRef);
 
        OPTDEBUGtarantula 
-               mnstr_printf(cntxt->fdout,"#create leg for %d %d %d\n",pc,idx 
-old[pc]->retc, getArg(old[pc],idx));
+               mnstr_printf(cntxt->fdout,"#create leg %d for %d %d 
%d\n",leg,pc,idx -old[pc]->retc, getArg(old[pc],idx));
 @-
 The leg should have enough instructions to warrant a distributed execution. 
This should involve
 a careful analysis of the instructions assembled. For the time being, we only 
allow for a leg 
@@ -422,13 +440,15 @@
                return 0;
 
        OPTDEBUGtarantula{
-               mnstr_printf(cntxt->fdout,"#input ");
+               mnstr_printf(cntxt->fdout,"#input leg %d ",leg);
                for(i=0; input[leg][i]; i++)
-                       mnstr_printf(cntxt->fdout,"%d, ", input[leg][i]);
+                       mnstr_printf(cntxt->fdout,"%d(%d), ", input[leg][i], 
map[input[leg][i]]);
                mnstr_printf(cntxt->fdout,"\n#output ");
                for(i=0; output[leg][i]; i++)
-                       mnstr_printf(cntxt->fdout,"%d, ", output[leg][i]);
+                       mnstr_printf(cntxt->fdout,"%d(%d), ", output[leg][i], 
map[output[leg][i]]);
                mnstr_printf(cntxt->fdout,"\n");
+               for(i=top -1;i>=0; i--)
+               printInstruction(cntxt->fdout,mb,0,list[i], LIST_MAL_STMT);
        }
        alias= (int*) GDKzalloc(2 * mb->vtop * sizeof(int));
 
@@ -445,23 +465,26 @@
 
        /* add the return variables */
        for ( i = 1; output[leg][i]>0; i++){
-               alias[output[leg][i]] = cloneVariable(tm,mb,output[leg][i]);
-               sig = pushReturn(tm, sig, alias[output[leg][i]]);
+               alias[map[output[leg][i]]] = 
cloneVariable(tm,mb,map[output[leg][i]]);
+               sig = pushReturn(tm, sig, alias[map[output[leg][i]]]);
                setVarUDFtype(tm,getArg(sig,i));
+               OPTDEBUGtarantula
+                       mnstr_printf(cntxt->fdout,"#map %d 
->%d\n",output[leg][i], map[output[leg][i]]);
        }
        /* add the arguments from the query template */
        for ( i = 0; input[leg][i]> 0; i++){
-               alias[input[leg][i]] = cloneVariable(tm,mb,input[leg][i]);
-               sig = pushArgument(tm, sig, alias[input[leg][i]]);
+               alias[map[input[leg][i]]] = 
cloneVariable(tm,mb,map[input[leg][i]]);
+               sig = pushArgument(tm, sig, alias[map[input[leg][i]]]);
        }
 
        /* include the necessary functions */
        for (top--; top >= 0; top--){
                p = copyInstruction(list[top]);
                for (i= 0; i< p->argc; i++){
-                       int a= getArg(p,i);
-                       if (alias[a]==0) 
+                       int a= map[getArg(p,i)];
+                       if (alias[a]==0) {
                                alias[a] = cloneVariable(tm,mb,a);
+                       }
                        getArg(p,i) = alias[a];
                }
                pushInstruction(tm, p);
@@ -475,8 +498,12 @@
        for ( i = 0; i < sig->retc; i++)
                p = pushReturn(tm,p, getArg(sig,i));
        for ( i = 0; output[leg][i]>0; i++)
-               p = pushArgument(tm,p, alias[output[leg][i]]);
+               p = pushArgument(tm,p, alias[map[output[leg][i]]]);
        pushEndInstruction(tm);
+       for (i = last + 1; i < limit; i++)
+               if (old[i] && old[i]->token != REMsymbol)
+                       newStmt(tm, getModuleId(old[i]), getFunctionId(old[i]));
+
        clrDeclarations(tm);
        chkProgram(cntxt->nspace,tm);
        if ( tm->errors )
@@ -615,7 +642,7 @@
        /* conn := tarantula.connect(node); */
        q = newStmt(tm, tarantulaRef,connectRef);
        conn= getArg(q,0);
-       q = pushArgument(tm, q, getArg(sig,1));
+       q = pushArgument(tm, q, getArg(sig,sig->retc));
 
        /* get addition arguments needed in a leg */
        /* k:= remote.put(conn,kvar) */
@@ -686,12 +713,12 @@
 Moreover, all legs may share variables.
 @c
 static void
-TARmakeRun(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int nodes, int 
legs, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE])
+TARmakeRun(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int limit, int 
nodes, int legs, int input[MAXSLICES][MAXSHARE], int 
output[MAXSLICES][MAXSHARE])
 {
        Symbol s;
        MalBlkPtr tm;
        char buf[BUFSIZ], fcn[BUFSIZ];
-       InstrPtr sig, r, p,q;
+       InstrPtr ret,sig, r, p,q;
        int j,k,l,x=0;
        int *lmap;
 
@@ -702,10 +729,14 @@
 
        setVarType(tm,getArg(tm->stmt[0],0), getVarType(mb,getArg(old[pc],0)));
        setVarUDFtype(tm, getArg(tm->stmt[0],0));
+
+       ret= newInstruction(tm,ASSIGNsymbol);
+       ret->barrier = RETURNsymbol;
+       getArg(ret,0)= getArg(tm->stmt[0],0);
 @-
 Build a consolidated map for all input/output variables
 @c
-       lmap = (int*) GDKzalloc(mb->vtop);
+       lmap = (int*) GDKzalloc(2 * limit * sizeof(int));
        
        /* include the remaining return variables */
        /* is relies on the assumption that all output variables are disjoint */
@@ -714,6 +745,7 @@
                if (lmap[output[l][j]] == 0) {
                        lmap[output[l][j]] = cloneVariable(tm, mb, 
output[l][j]);
                        tm->stmt[0]= pushReturn(tm, tm->stmt[0], 
lmap[output[l][j]]);
+                       ret= pushReturn(tm,ret, lmap[output[l][j]]);
                }
        }
 
@@ -783,11 +815,7 @@
                getArg(q,0)= x;
        }
 
-       r= newInstruction(tm,ASSIGNsymbol);
-       r->barrier = RETURNsymbol;
-       getArg(r,0)= getArg(sig,0);
-       /* r = pushArgument(tm,r,k);*/
-       pushInstruction(tm,r);
+       pushInstruction(tm,ret);
 
        pushEndInstruction(tm);
        clrDeclarations(tm);
@@ -833,7 +861,7 @@
        InstrPtr q, p, pp, sig, *old;
        int last=0, i, j, k, l, limit, actions=0, block = 0, fnd;
        int leg=0, ta =0, vtop=0;
-       MalBlkPtr legs[MAXSLICES];
+       MalBlkPtr tm;
        char fcn[BUFSIZ];
        int *map, *used, top;
        int itop[MAXSLICES], input[MAXSLICES][MAXSHARE];
@@ -841,6 +869,7 @@
        InstrPtr *list;
        int *needed;
        char *done;
+       Lifespan span;
 
        if( cntxt == 0){
                /* confuscate, delay for later activation */
@@ -861,6 +890,7 @@
 The optimizer works by looking only to the mat.pack statement.
 @c
        (void) fixModule(cntxt->nspace,tarantulaRef);
+       span= newLifespan(mb);
 
        limit = mb->stop;
        old = mb->stmt;
@@ -870,10 +900,6 @@
                return 0;
        pushInstruction(mb, old[0]);
 
-       memset((char*) itop, 0, sizeof(int)* MAXSLICES);
-       memset((char*) input, 0, sizeof(int)* MAXSLICES * MAXSHARE);
-       memset((char*) otop, 0, sizeof(int)* MAXSLICES);
-       memset((char*) output, 0, sizeof(int)* MAXSLICES * MAXSHARE);
 
        map= (int*) GDKzalloc(2 * vtop * sizeof(int));
        used= (int*) GDKzalloc(2 * vtop * sizeof(int));
@@ -904,7 +930,11 @@
 for this pack function. Some variables may have to be re-used
 in subsequent calls.
 @c
-                       for (leg =0, ta = p->retc; ta < p->argc; ta++,leg++) {
+                       memset((char*) itop, 0, sizeof(int)* MAXSLICES);
+                       memset((char*) input, 0, sizeof(int)* MAXSLICES * 
MAXSHARE);
+                       memset((char*) otop, 0, sizeof(int)* MAXSLICES);
+                       memset((char*) output, 0, sizeof(int)* MAXSLICES * 
MAXSHARE);
+                       for (leg =0, ta = p->retc; leg < MAXSLICES &&  ta < 
p->argc; ta++,leg++) {
                                list = (InstrPtr*) GDKzalloc(sizeof(InstrPtr) * 
mb->ssize);
                                needed= (int*) GDKzalloc(mb->vtop*2 * 
sizeof(int));
                                
@@ -912,17 +942,17 @@
                                assert(needed);
                                top = 0;
 
-                               needed[getArg(p,ta)] = 1;
+                               needed[map[getArg(p,ta)]] = 1;
                                output[leg][otop[leg]++]= getArg(p,ta);
 
                                /* find variables used outside leg scope */
                                /* find variables defined before by legs */
                                for (l = i-1; l > 0; l--){
                                        pp = old[l];
-                                       /* find variables needed later and not 
mapped already */
+                                       /* find variables needed and not mapped 
already */
                                        fnd = 0;
                                        for (j = 0; j < pp->retc; j++)
-                                               fnd += (needed[getArg(pp,j)] && 
map[getArg(pp,j)] == getArg(pp,j)) || getArg(pp,j) < old[0]->argc;
+                                               fnd += needed[getArg(pp,j)] && 
map[getArg(pp,j)] == getArg(pp,j);
 
                                        /* blocks are copied as is */
                                        switch( pp->barrier ){
@@ -932,50 +962,83 @@
                                        }
                                        if ( block ){
                                                for (j = 0; j < pp->argc; j++)
-                                                       needed[getArg(pp,j)]= 1;
+                                                       needed[getArg(pp,j)] = 
1;
                                                fnd = pp->retc;
                                        }
 
                                        if ( fnd) { /* instruction has result 
variables needed */
-                                               for (j = pp->retc; j < 
pp->argc; j++){
-                                                       if ( map[getArg(pp,j)] 
!= getArg(pp,j) || ( needed[getArg(pp,j)]== 0  && getArg(pp,j) < old[0]->argc ) 
)
-                                                               
input[leg][itop[leg]++] = getArg(pp,j);
-                                                       needed[getArg(pp,j)]= 1;
-                                               }
+                                               for (j = pp->retc; j < 
pp->argc; j++)
+                                                       needed[getArg(pp,j)] = 
!isVarConstant(mb,getArg(pp,j));
                                                list[top++] = pp;
                                        }
                                }
                                /* for all variables assigned, check if they 
are needed outside */
                                /* each variable is produced only once */
-                               for ( l=0; l< top; l++){
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to