Changeset: a71401bfa82e for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a71401bfa82e
Modified Files:
Branch: default
Log Message:
merge
diffs (truncated from 416 to 300 lines):
diff -r 9883d716b3d4 -r a71401bfa82e MonetDB5/src/optimizer/opt_tarantula.mx
--- a/MonetDB5/src/optimizer/opt_tarantula.mx Wed Sep 01 09:50:16 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_tarantula.mx Wed Sep 01 09:51:55 2010 +0200
@@ -32,22 +32,40 @@
The Tarantula optimizer, like the Octopus optimizer, uses the
output of the mitosis+mergetable optimizer and produces the
actual plans for parallel execution.
-They both break a single, large fact-table into pieces
-based on the head and propagate the effect through the plan.
+The tarantula untangles the query plan into a controlling head
+function and a series of subplans, one for each leg to execute concurrently.
-The tarantula untangles the resulting plan into a controlling head
-function and a series of plans, one for each leg to execute concurrently.
+The targets for breaking the plan are the blocking operations,
+in particular mat.pack(). The flow graphs leading to
+the pack arguments are extracted from the query plan and
+each subgraph is cast into an independent plan.
+Since the query plan is a DAG, it is perfectly possible that
+a portion being extracted is shared amongst all legs.
+The naive extraction then leads to a re-calculation of
+shared intermediates in each leg.
+
+The subplan produces the argument to the blocking operator, whose
+result will be assembled in the head. It is also perfectly possible
+that variables assigned a value are used later on in the query graph.
+These variables are identified and one leg becomes responsible for
+returning them to the head as well, to be used later on.
+
+The original pack operation is replaced by a call to a function
+to orchestrate the distributed processing and return the final
+result. Then the next pack operation is searched and its
+subgraph is derived. Again, it may share portions produced
+in the first pack subgraph.
+
+A potentially more optimal scheme would be to detect each such case and
+turn it into a splitting point as well. This can be detected by
+looking for the last assignment and multiple use cases. [VARIANT TODO]
+
The allocation of a subplan to leg depends on a bidding scheme.
Bidding can not depend on BAT arguments, because that would cause
significant communication overhead. Scalar values could be used and
would function well in terms of using the recycler to get involved into
precise bidding.
-The Tarantula optimizer differs from the Octopus optimizer in performing
-a recursive divide and conquer method to deal with blocking operations.
-Furthermore, we assume that the persistent store is shared amongst
-the node, which means that fragments of the tables can be directly accessed.
-
A snippet of a tarantula plan with two legs is shown.
The main part of the query becomes a three step procedure of
1) remote registration of subplans, 2) obtaining bids and schedule design
@@ -193,7 +211,7 @@
} Peer;
#include "opt_mitosis.h"
-#define MINLEGSIZE 5 /* number of MAL instructions to consider for a leg */
+#define MINLEGSIZE 0 /* number of MAL instructions to consider for a leg */
#define MAXSHARE 64 /* number of input output arguments to consider
*/
#define MAXSITES MAXSLICES /* should become dynamic at some point */
Peer peers[MAXSITES]; /* registry of peer servers */
@@ -393,7 +411,7 @@
Likewise, look for variables that are produced by other legs and will become
input parameters.
@c
static MalBlkPtr
-TARmakeLeg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx, int
leg, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE], InstrPtr
*list)
+TARmakeLeg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int last, int
limit, int idx, int leg, int input[MAXSLICES][MAXSHARE], int
output[MAXSLICES][MAXSHARE], InstrPtr *list,int *map)
{
MalBlkPtr tm = NULL;
InstrPtr p = NULL, sig;
@@ -404,7 +422,7 @@
assert(old[pc]->fcnname == packRef);
OPTDEBUGtarantula
- mnstr_printf(cntxt->fdout,"#create leg for %d %d %d\n",pc,idx
-old[pc]->retc, getArg(old[pc],idx));
+ mnstr_printf(cntxt->fdout,"#create leg %d for %d %d
%d\n",leg,pc,idx -old[pc]->retc, getArg(old[pc],idx));
@-
The leg should have enough instructions to warrant a distributed execution.
This should involve
a careful analysis of the instructions assembled. For the time being, we only
allow for a leg
@@ -422,13 +440,15 @@
return 0;
OPTDEBUGtarantula{
- mnstr_printf(cntxt->fdout,"#input ");
+ mnstr_printf(cntxt->fdout,"#input leg %d ",leg);
for(i=0; input[leg][i]; i++)
- mnstr_printf(cntxt->fdout,"%d, ", input[leg][i]);
+ mnstr_printf(cntxt->fdout,"%d(%d), ", input[leg][i],
map[input[leg][i]]);
mnstr_printf(cntxt->fdout,"\n#output ");
for(i=0; output[leg][i]; i++)
- mnstr_printf(cntxt->fdout,"%d, ", output[leg][i]);
+ mnstr_printf(cntxt->fdout,"%d(%d), ", output[leg][i],
map[output[leg][i]]);
mnstr_printf(cntxt->fdout,"\n");
+ for(i=top -1;i>=0; i--)
+ printInstruction(cntxt->fdout,mb,0,list[i], LIST_MAL_STMT);
}
alias= (int*) GDKzalloc(2 * mb->vtop * sizeof(int));
@@ -445,23 +465,26 @@
/* add the return variables */
for ( i = 1; output[leg][i]>0; i++){
- alias[output[leg][i]] = cloneVariable(tm,mb,output[leg][i]);
- sig = pushReturn(tm, sig, alias[output[leg][i]]);
+ alias[map[output[leg][i]]] =
cloneVariable(tm,mb,map[output[leg][i]]);
+ sig = pushReturn(tm, sig, alias[map[output[leg][i]]]);
setVarUDFtype(tm,getArg(sig,i));
+ OPTDEBUGtarantula
+ mnstr_printf(cntxt->fdout,"#map %d
->%d\n",output[leg][i], map[output[leg][i]]);
}
/* add the arguments from the query template */
for ( i = 0; input[leg][i]> 0; i++){
- alias[input[leg][i]] = cloneVariable(tm,mb,input[leg][i]);
- sig = pushArgument(tm, sig, alias[input[leg][i]]);
+ alias[map[input[leg][i]]] =
cloneVariable(tm,mb,map[input[leg][i]]);
+ sig = pushArgument(tm, sig, alias[map[input[leg][i]]]);
}
/* include the necessary functions */
for (top--; top >= 0; top--){
p = copyInstruction(list[top]);
for (i= 0; i< p->argc; i++){
- int a= getArg(p,i);
- if (alias[a]==0)
+ int a= map[getArg(p,i)];
+ if (alias[a]==0) {
alias[a] = cloneVariable(tm,mb,a);
+ }
getArg(p,i) = alias[a];
}
pushInstruction(tm, p);
@@ -475,8 +498,12 @@
for ( i = 0; i < sig->retc; i++)
p = pushReturn(tm,p, getArg(sig,i));
for ( i = 0; output[leg][i]>0; i++)
- p = pushArgument(tm,p, alias[output[leg][i]]);
+ p = pushArgument(tm,p, alias[map[output[leg][i]]]);
pushEndInstruction(tm);
+ for (i = last + 1; i < limit; i++)
+ if (old[i] && old[i]->token != REMsymbol)
+ newStmt(tm, getModuleId(old[i]), getFunctionId(old[i]));
+
clrDeclarations(tm);
chkProgram(cntxt->nspace,tm);
if ( tm->errors )
@@ -615,7 +642,7 @@
/* conn := tarantula.connect(node); */
q = newStmt(tm, tarantulaRef,connectRef);
conn= getArg(q,0);
- q = pushArgument(tm, q, getArg(sig,1));
+ q = pushArgument(tm, q, getArg(sig,sig->retc));
/* get addition arguments needed in a leg */
/* k:= remote.put(conn,kvar) */
@@ -686,12 +713,12 @@
Moreover, all legs may share variables.
@c
static void
-TARmakeRun(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int nodes, int
legs, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE])
+TARmakeRun(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int limit, int
nodes, int legs, int input[MAXSLICES][MAXSHARE], int
output[MAXSLICES][MAXSHARE])
{
Symbol s;
MalBlkPtr tm;
char buf[BUFSIZ], fcn[BUFSIZ];
- InstrPtr sig, r, p,q;
+ InstrPtr ret,sig, r, p,q;
int j,k,l,x=0;
int *lmap;
@@ -702,10 +729,14 @@
setVarType(tm,getArg(tm->stmt[0],0), getVarType(mb,getArg(old[pc],0)));
setVarUDFtype(tm, getArg(tm->stmt[0],0));
+
+ ret= newInstruction(tm,ASSIGNsymbol);
+ ret->barrier = RETURNsymbol;
+ getArg(ret,0)= getArg(tm->stmt[0],0);
@-
Build a consolidated map for all input/output variables
@c
- lmap = (int*) GDKzalloc(mb->vtop);
+ lmap = (int*) GDKzalloc(2 * limit * sizeof(int));
/* include the remaining return variables */
/* is relies on the assumption that all output variables are disjoint */
@@ -714,6 +745,7 @@
if (lmap[output[l][j]] == 0) {
lmap[output[l][j]] = cloneVariable(tm, mb,
output[l][j]);
tm->stmt[0]= pushReturn(tm, tm->stmt[0],
lmap[output[l][j]]);
+ ret= pushReturn(tm,ret, lmap[output[l][j]]);
}
}
@@ -783,11 +815,7 @@
getArg(q,0)= x;
}
- r= newInstruction(tm,ASSIGNsymbol);
- r->barrier = RETURNsymbol;
- getArg(r,0)= getArg(sig,0);
- /* r = pushArgument(tm,r,k);*/
- pushInstruction(tm,r);
+ pushInstruction(tm,ret);
pushEndInstruction(tm);
clrDeclarations(tm);
@@ -833,7 +861,7 @@
InstrPtr q, p, pp, sig, *old;
int last=0, i, j, k, l, limit, actions=0, block = 0, fnd;
int leg=0, ta =0, vtop=0;
- MalBlkPtr legs[MAXSLICES];
+ MalBlkPtr tm;
char fcn[BUFSIZ];
int *map, *used, top;
int itop[MAXSLICES], input[MAXSLICES][MAXSHARE];
@@ -841,6 +869,7 @@
InstrPtr *list;
int *needed;
char *done;
+ Lifespan span;
if( cntxt == 0){
/* confuscate, delay for later activation */
@@ -861,6 +890,7 @@
The optimizer works by looking only to the mat.pack statement.
@c
(void) fixModule(cntxt->nspace,tarantulaRef);
+ span= newLifespan(mb);
limit = mb->stop;
old = mb->stmt;
@@ -870,10 +900,6 @@
return 0;
pushInstruction(mb, old[0]);
- memset((char*) itop, 0, sizeof(int)* MAXSLICES);
- memset((char*) input, 0, sizeof(int)* MAXSLICES * MAXSHARE);
- memset((char*) otop, 0, sizeof(int)* MAXSLICES);
- memset((char*) output, 0, sizeof(int)* MAXSLICES * MAXSHARE);
map= (int*) GDKzalloc(2 * vtop * sizeof(int));
used= (int*) GDKzalloc(2 * vtop * sizeof(int));
@@ -904,7 +930,11 @@
for this pack function. Some variables may have to be re-used
in subsequent calls.
@c
- for (leg =0, ta = p->retc; ta < p->argc; ta++,leg++) {
+ memset((char*) itop, 0, sizeof(int)* MAXSLICES);
+ memset((char*) input, 0, sizeof(int)* MAXSLICES *
MAXSHARE);
+ memset((char*) otop, 0, sizeof(int)* MAXSLICES);
+ memset((char*) output, 0, sizeof(int)* MAXSLICES *
MAXSHARE);
+ for (leg =0, ta = p->retc; leg < MAXSLICES && ta <
p->argc; ta++,leg++) {
list = (InstrPtr*) GDKzalloc(sizeof(InstrPtr) *
mb->ssize);
needed= (int*) GDKzalloc(mb->vtop*2 *
sizeof(int));
@@ -912,17 +942,17 @@
assert(needed);
top = 0;
- needed[getArg(p,ta)] = 1;
+ needed[map[getArg(p,ta)]] = 1;
output[leg][otop[leg]++]= getArg(p,ta);
/* find variables used outside leg scope */
/* find variables defined before by legs */
for (l = i-1; l > 0; l--){
pp = old[l];
- /* find variables needed later and not
mapped already */
+ /* find variables needed and not mapped
already */
fnd = 0;
for (j = 0; j < pp->retc; j++)
- fnd += (needed[getArg(pp,j)] &&
map[getArg(pp,j)] == getArg(pp,j)) || getArg(pp,j) < old[0]->argc;
+ fnd += needed[getArg(pp,j)] &&
map[getArg(pp,j)] == getArg(pp,j);
/* blocks are copied as is */
switch( pp->barrier ){
@@ -932,50 +962,83 @@
}
if ( block ){
for (j = 0; j < pp->argc; j++)
- needed[getArg(pp,j)]= 1;
+ needed[getArg(pp,j)] =
1;
fnd = pp->retc;
}
if ( fnd) { /* instruction has result
variables needed */
- for (j = pp->retc; j <
pp->argc; j++){
- if ( map[getArg(pp,j)]
!= getArg(pp,j) || ( needed[getArg(pp,j)]== 0 && getArg(pp,j) < old[0]->argc )
)
-
input[leg][itop[leg]++] = getArg(pp,j);
- needed[getArg(pp,j)]= 1;
- }
+ for (j = pp->retc; j <
pp->argc; j++)
+ needed[getArg(pp,j)] =
!isVarConstant(mb,getArg(pp,j));
list[top++] = pp;
}
}
/* for all variables assigned, check if they
are needed outside */
/* each variable is produced only once */
- for ( l=0; l< top; l++){
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list