Changeset: 7c1131b331bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7c1131b331bc
Modified Files:
MonetDB5/src/optimizer/opt_mitosis.mx
MonetDB5/src/optimizer/opt_tarantula.mx
Branch: default
Log Message:
Fixing the current status
Still struggling with proper propagation of the arguments to
all functions involved.
diffs (truncated from 513 to 300 lines):
diff -r c9012ea1987d -r 7c1131b331bc MonetDB5/src/optimizer/opt_mitosis.mx
--- a/MonetDB5/src/optimizer/opt_mitosis.mx Mon Aug 23 22:32:19 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_mitosis.mx Mon Aug 30 23:41:31 2010 +0200
@@ -59,6 +59,7 @@
#include "opt_prelude.h"
#include "opt_support.h"
+#define MAXSLICES 256 /* to be refined */
@:exportOptimizer(mitosis)@
#define OPTDEBUGmitosis if ( optDebug & ((lng)1 <<DEBUG_OPT_MITOSIS) )
#endif
@@ -147,8 +148,8 @@
pieces = (int) (rowcnt /r+1);
if (pieces < GDKnr_threads )
pieces = GDKnr_threads;
- if (pieces > 256)
- pieces = 256; /* cut off potential plan explosion */
+ if (pieces > MAXSLICES)
+ pieces = MAXSLICES; /* cut off potential plan explosion
*/
if ( (size_t) rowcnt < (size_t) pieces || pieces <=1)
return 0;
diff -r c9012ea1987d -r 7c1131b331bc MonetDB5/src/optimizer/opt_tarantula.mx
--- a/MonetDB5/src/optimizer/opt_tarantula.mx Mon Aug 23 22:32:19 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_tarantula.mx Mon Aug 30 23:41:31 2010 +0200
@@ -192,13 +192,14 @@
int inuse;
} Peer;
-opt_export int TARgetPeer(str uri);
-
+#include "opt_mitosis.h"
#define MINLEGSIZE 5 /* number of MAL instructions to consider for a leg */
-#define MAXSITES 2048 /* should become dynamic at some point */
+#define MAXSHARE 64 /* number of input output arguments to consider
*/
+#define MAXSITES MAXSLICES /* should become dynamic at some point */
Peer peers[MAXSITES]; /* registry of peer servers */
int TARnrpeers=0;
bte tarantulaLocal=0;
+opt_export int TARgetPeer(str uri);
@:exportOptimizer(tarantula)@
@@ -214,6 +215,7 @@
#include <mapilib/Mapi.h>
#include "remote.h"
#include "mal_sabaoth.h"
+
@-
The algorithm consists of several steps. The first one
replaces the original query and creates the leg functions.
@@ -385,13 +387,13 @@
return MAL_SUCCEED;
}
@-
-The TARmakeleg walks through the MAL block and extracts the dependent
structure for
+The TARmakeLeg walks through the MAL block and extracts the dependent
structure for
execution. Note that information van be recomputed in all legs. Possibly doing
duplicate work.
Keep track of the variables that will be used outside the leg. They have to be
exported.
Likewise, look for variables that are produced by other legs and will become
input parameters.
@c
static MalBlkPtr
-TARmakeleg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx, int
*input, int *output, InstrPtr *list)
+TARmakeLeg(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx, int
leg, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE], InstrPtr
*list)
{
MalBlkPtr tm = NULL;
InstrPtr p = NULL, sig;
@@ -421,11 +423,11 @@
OPTDEBUGtarantula{
mnstr_printf(cntxt->fdout,"#input ");
- for(i=0; input[i]; i++)
- mnstr_printf(cntxt->fdout,"%d, ", input[i]);
+ for(i=0; input[leg][i]; i++)
+ mnstr_printf(cntxt->fdout,"%d, ", input[leg][i]);
mnstr_printf(cntxt->fdout,"\n#output ");
- for(i=0; output[i]; i++)
- mnstr_printf(cntxt->fdout,"%d, ", output[i]);
+ for(i=0; output[leg][i]; i++)
+ mnstr_printf(cntxt->fdout,"%d, ", output[leg][i]);
mnstr_printf(cntxt->fdout,"\n");
}
alias= (int*) GDKzalloc(2 * mb->vtop * sizeof(int));
@@ -439,18 +441,18 @@
sig= getInstrPtr(tm,0);
setVarType(tm,getArg(sig,0), getArgType(mb,old[pc],idx));
setVarUDFtype(tm,getArg(sig,0));
- alias[output[0]] = getArg(sig,0);
+ alias[output[leg][0]] = getArg(sig,0);
/* add the return variables */
- for ( i = 1; output[i]>0; i++){
- alias[output[i]] = cloneVariable(tm,mb,output[i]);
- sig = pushReturn(tm, sig, alias[output[i]]);
+ for ( i = 1; output[leg][i]>0; i++){
+ alias[output[leg][i]] = cloneVariable(tm,mb,output[leg][i]);
+ sig = pushReturn(tm, sig, alias[output[leg][i]]);
setVarUDFtype(tm,getArg(sig,i));
}
/* add the arguments from the query template */
- for ( i = 0; input[i]> 0; i++){
- alias[input[i]] = cloneVariable(tm,mb,input[i]);
- sig = pushArgument(tm, sig, alias[input[i]]);
+ for ( i = 0; input[leg][i]> 0; i++){
+ alias[input[leg][i]] = cloneVariable(tm,mb,input[leg][i]);
+ sig = pushArgument(tm, sig, alias[input[leg][i]]);
}
/* include the necessary functions */
@@ -472,8 +474,8 @@
p->argc= 0;
for ( i = 0; i < sig->retc; i++)
p = pushReturn(tm,p, getArg(sig,i));
- for ( i = 0; output[i]>0; i++)
- p = pushArgument(tm,p, alias[output[i]]);
+ for ( i = 0; output[leg][i]>0; i++)
+ p = pushArgument(tm,p, alias[output[leg][i]]);
pushEndInstruction(tm);
clrDeclarations(tm);
chkProgram(cntxt->nspace,tm);
@@ -582,35 +584,33 @@
assembled using a pack and returned to the caller.
@c
static void
-TARmakeExecution(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, MalBlkPtr
leg)
+TARmakeStub(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int idx, int
leg, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE])
{
Symbol s;
MalBlkPtr tm;
char fcn[BUFSIZ];
- InstrPtr lsig,sig, r, q;
+ InstrPtr sig, r, q;
int conn,j,l;
int arg[1024];
/* generate stubb code for the remote execution */
- snprintf(fcn,BUFSIZ,"rmt_%s_%d", getFunctionId(getInstrPtr(mb,0)),
getArg(old[pc],0));
+ snprintf(fcn,BUFSIZ,"rmt_%s_%d_%d", getFunctionId(getInstrPtr(mb,0)),
getArg(old[pc],0),leg);
s= newFunction(tarantulaRef, putName(fcn,strlen(fcn)), FUNCTIONsymbol);
insertSymbol(findModule(cntxt->nspace,tarantulaRef),s);
tm= s->def;
sig = getInstrPtr(tm,0);
/* add the return values */
- lsig= getInstrPtr(leg,0);
- setVarType(tm, getArg(sig,0), getVarType(leg,getArg(lsig,0)));
- for ( j=1;j < lsig->retc; j++)
- sig= pushReturn(tm, sig, cloneVariable(tm, leg,
getArg(lsig,j)));
+ setVarType(tm, getArg(sig,0), getVarType(mb,getArg(old[pc],idx)));
+ setVarUDFtype(tm, getArg(sig,0));
+ for ( j=1; output[leg][j]; j++)
+ sig= pushReturn(tm, sig, cloneVariable(tm, mb, output[leg][j]));
/* get the input arguments */
sig = pushArgument(tm,sig,newVariable(tm,GDKstrdup("node"),TYPE_int));
- sig = pushArgument(tm,sig,newVariable(tm,GDKstrdup("fcn"),TYPE_str));
/* copy the query arguments */
- for ( j=lsig->retc; j < lsig->argc; j++)
- sig= pushArgument(tm, sig, cloneVariable(tm, leg,
getArg(lsig,j)));
-
+ for ( j=0; input[leg][j]; j++)
+ sig= pushArgument(tm, sig, cloneVariable(tm, mb,
input[leg][j]));
/* conn := tarantula.connect(node); */
q = newStmt(tm, tarantulaRef,connectRef);
@@ -620,7 +620,7 @@
/* get addition arguments needed in a leg */
/* k:= remote.put(conn,kvar) */
for (j= 0; j < sig->argc; j++)
- if ( j != sig->retc && j != sig->retc+1 ){
+ if ( j != sig->retc ){
q= newFcnCall(tm,remoteRef,putRef);
setVarType(tm, getArg(q,0), TYPE_str);
setVarUDFtype(tm, getArg(q,0));
@@ -630,15 +630,16 @@
}
/* (k1,...kn):= remote.exec(conn,tarantula,qry,version....) */
+ snprintf(fcn,BUFSIZ,"%s_%d_%d", getFunctionId(getInstrPtr(mb,0)),
getArg(old[pc],0),leg);
q= newFcnCall(tm,remoteRef,execRef);
q->retc= q->argc= 0;
for (j=0; j < sig->retc; j++)
q = pushReturn(tm,q,arg[j]);
q= pushArgument(tm,q,conn);
q= pushStr(tm,q,tarantulaRef);
- q= pushArgument(tm,q,getArg(sig,sig->retc+1));
+ q= pushStr(tm,q,putName(fcn,strlen(fcn)));
/* deal with all arguments ! */
- for (j=sig->retc+2; j < sig->argc; j++)
+ for (j=sig->retc+1; j < sig->argc; j++)
q = pushArgument(tm,q,arg[j]);
@@ -682,38 +683,56 @@
@-
The legs of the tarantula can be executed in parallel.
Watch out, the arguments should occupy the head of the stack.
+Moreover, all legs may share variables.
@c
static void
-TARmakeRun(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int nodes, int
legs, int *input, int *output)
+TARmakeRun(Client cntxt, MalBlkPtr mb, InstrPtr *old, int pc, int nodes, int
legs, int input[MAXSLICES][MAXSHARE], int output[MAXSLICES][MAXSHARE])
{
Symbol s;
MalBlkPtr tm;
char buf[BUFSIZ], fcn[BUFSIZ];
InstrPtr sig, r, p,q;
int j,k,l,x=0;
+ int *lmap;
snprintf(fcn,BUFSIZ,"exe_%s_%d", getFunctionId(getInstrPtr(mb,0)),
getArg(old[pc],0));
s= newFunction(tarantulaRef, putName(fcn,strlen(fcn)), FUNCTIONsymbol);
insertSymbol(findModule(cntxt->nspace,tarantulaRef),s);
tm= s->def;
- sig = getInstrPtr(tm,0);
- setVarType(tm,getArg(sig,0), getVarType(mb,getArg(old[pc],0)));
-
+ setVarType(tm,getArg(tm->stmt[0],0), getVarType(mb,getArg(old[pc],0)));
+ setVarUDFtype(tm, getArg(tm->stmt[0],0));
+...@-
+Build a consolidated map for all input/output variables
+...@c
+ lmap = (int*) GDKzalloc(mb->vtop);
+
/* include the remaining return variables */
- for ( j=1;output[j]; j++)
- sig= pushReturn(tm, sig, cloneVariable(tm, mb, output[j]));
+ /* is relies on the assumption that all output variables are disjoint */
+ for ( l=0; l<legs; l++){
+ for ( j=1;output[l][j]; j++)
+ if (lmap[output[l][j]] == 0) {
+ lmap[output[l][j]] = cloneVariable(tm, mb,
output[l][j]);
+ tm->stmt[0]= pushReturn(tm, tm->stmt[0],
lmap[output[l][j]]);
+ }
+ }
/* add the execution nodes */
for( k=0 ; k<nodes; k++){
snprintf(buf,BUFSIZ,"node%d",k++);
- sig= pushArgument(tm, sig, newVariable(tm, GDKstrdup(buf),
TYPE_int));
+ tm->stmt[0]= pushArgument(tm, tm->stmt[0], newVariable(tm,
GDKstrdup(buf), TYPE_int));
}
/* input arguments */
- for ( j=0; input[j]; j++)
- sig = pushArgument(tm, sig, cloneVariable(tm,mb,input[j]));
+ for ( l=0; l<legs; l++){
+ for ( j=0; input[l][j]; j++)
+ if ( lmap[input[l][j]] == 0){
+ lmap[input[l][j]] = cloneVariable(tm, mb, input[l][j]);
+ tm->stmt[0] = pushArgument(tm, tm->stmt[0],
lmap[input[l][j]]);
+ }
+ }
+ sig= tm->stmt[0];
- /* initialize the return variables */
+ /* initialize all other return variables */
r= newAssignment(tm);
getArg(r,0)= getArg(sig,0);
pushNil(tm,r,getArgType(tm,sig,0));
@@ -730,21 +749,29 @@
setVarType(tm,x,TYPE_int);
}
- snprintf(fcn,BUFSIZ,"rmt_%s_%d", getFunctionId(getInstrPtr(mb,0)),
getArg(old[pc],0));
r= newInstruction(tm,ASSIGNsymbol);
setModuleId(r,matRef);
setFunctionId(r,packRef);
for( l=k=0; l<legs; l++, k=(k+1)%nodes){
- p = newStmt(tm,tarantulaRef,GDKstrdup(fcn));
+ snprintf(fcn,BUFSIZ,"rmt_%s_%d_%d",
getFunctionId(getInstrPtr(mb,0)), getArg(old[pc],0),l);
+ p = newInstruction(tm,ASSIGNsymbol);
+ setModuleId(p,tarantulaRef);
+ setFunctionId(p,putName(fcn,strlen(fcn)));
+ pushReturn(tm, p, cloneVariable(tm,mb,
getArg(old[pc],k+old[pc]->retc)));
+ setVarUDFtype(tm,getArg(p,0));
+
+ /* store the remaining output */
+ for ( j=1; output[l][j]; j++)
+ p = pushReturn(tm,p, lmap[output[l][j]]);
+
+ /* add the destination node and function*/
p = pushArgument(tm,p, getArg(sig, k + sig->retc));
-
snprintf(buf,BUFSIZ,"%s_%d_%d",getFunctionId(getInstrPtr(mb,0)),
getArg(old[pc],0), l);
- p = pushStr(tm, p, buf);
- setVarType(tm,getArg(p,0),
getArgType(mb,old[pc],k+old[pc]->retc));
- setVarUDFtype(tm,getArg(p,0));
- /* add the arguments from the query template */
- for( j =sig->retc+nodes; j<sig->argc; j++)
- p = pushArgument(tm, p, getArg(sig,j));
+
+ /* add the input arguments */
+ for ( j=0; input[l][j]; j++)
+ p = pushArgument(tm,p, lmap[input[l][j]]);
r= pushArgument(tm,r, getArg(p,0));
+ pushInstruction(tm,p);
}
k =getArg(r,0)= getArg(sig,0);
@@ -767,7 +794,7 @@
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list