Changeset: 6e2a4528ff70 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6e2a4528ff70
Modified Files:
Branch: default
Log Message:
merge
diffs (truncated from 506 to 300 lines):
diff --git a/NT/winconfig.py b/NT/winconfig.py
--- a/NT/winconfig.py
+++ b/NT/winconfig.py
@@ -41,8 +41,6 @@ subs = [("@exec_prefix@", r'%prefix%'),
("@pkgincludedir@", r'%prefix%\include\@PACKAGE@'),
("@DIRSEP@", '\\'),
("@CROSS_COMPILING_FALSE@", ''),
- ("@HAVE_CLIENTS_FALSE@", '#'),
- ("@HAVE_MONETDB_FALSE@", '#'),
("@NATIVE_WIN32_FALSE@", '#'),
("@NOT_WIN32_FALSE@", ''),
("@PATHSEP@", ';')]
diff --git a/debian/monetdb5-sql.init.d b/debian/monetdb5-sql.init.d
--- a/debian/monetdb5-sql.init.d
+++ b/debian/monetdb5-sql.init.d
@@ -19,7 +19,7 @@ test -x $DAEMON || exit 0
umask 022
LOGDIR=/var/log/monetdb
-PIDFILE=/var/run/monetdb/$NAME.pid
+PIDFILE=/var/run/monetdb/merovingian.pid
# Include monetdb5-sql defaults if available
if [ -f /etc/default/monetdb5-sql ] ; then
diff --git a/monetdb5/optimizer/opt_dataflow.mx
b/monetdb5/optimizer/opt_dataflow.mx
--- a/monetdb5/optimizer/opt_dataflow.mx
+++ b/monetdb5/optimizer/opt_dataflow.mx
@@ -130,10 +130,10 @@ simpleFlow(InstrPtr *old, int start, int
if( getArg(p,0) == getArg(q,j))
simple= TRUE;
if( !simple)
- simple = getModuleId(p) == calcRef || getModuleId(p) ==
mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef;
+ simple = getModuleId(q) == calcRef || getModuleId(q) ==
mtimeRef || getModuleId(q) == strRef || getModuleId(q)== mmathRef;
+ else p = q;
if( !simple)
return 0;
- p = q;
}
return 1;
}
diff --git a/monetdb5/optimizer/opt_partition.mx
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -213,6 +213,116 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
return cmb;
}
+static int
+OPTinitcode(Client cntxt, MalBlkPtr mb)
+{ /* Append a remote.connect statement to mb and return getArg(p,0) of
+ * that statement; OPTpartitionStub stores this value as conn and passes
+ * it to every later remote.* call.
+ * The first string argument is looked up with GDKgetenv under the key
+ * "merovingian_uri"; the other three string arguments are fixed literals.
+ * cntxt is accepted but not used. */
+ InstrPtr p;
+ str s;
+ str l = NULL; /* never assigned: its only producer is commented out below */
+
+ (void) cntxt; /* silence the unused-parameter warning */
+
+ /* _x := remote.connect(uri,"monetdb","monetdb","msql"); */
+ p = newStmt(mb, remoteRef,connectRef);
+ s = GDKgetenv("merovingian_uri");
+ if (s == NULL) /* apparently not under Merovingian control, fall back to
+ * local only */
+ s= "dummyconnection";
+ /* SABAOTHgetLocalConnection(&l);*/
+ p= pushStr(mb,p, s == NULL ? l : s); /* s was replaced above when NULL, so the l branch is not selected here */
+ p= pushStr(mb,p,"monetdb");
+ p= pushStr(mb,p,"monetdb");
+ p= pushStr(mb,p,"msql");
+ if (l) /* l is still NULL in this version, so this free is not reached */
+ GDKfree(l);
+ return getArg(p,0); /* presumably the variable holding the connection handle, per the _x := ... sketch above */
+}
+
+static MalBlkPtr
+OPTpartitionStub(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb)
+{ /* Build a function named "<name>_stub" in the user module, where <name>
+ * is the function id of instruction 0 of mb.  The stub body is generated
+ * in these steps:
+ *   1. a remote.connect statement (via OPTinitcode),
+ *   2. one remote.put per input argument of the signature,
+ *   3. one remote.exec naming "<name>_plan",
+ *   4. one remote.get per return value, collected in a RETURN instruction,
+ *   5. a catch block for "ANYexception" that disconnects and re-raises,
+ *   6. a final remote.disconnect, the RETURN instruction and the end
+ *      instruction.
+ * Returns the block of the new function, or 0 when newFunction or
+ * newMalBlkStmt fails. */
+ MalBlkPtr smb = 0;
+ Symbol s;
+ InstrPtr sig, q, ret;
+ int j,k,conn, *arg; /* arg[] maps each signature position to its remote-side variable */
+ char nme[BUFSIZ];
+
+ /* define the sub query stub for remote processing */
+ snprintf(nme,BUFSIZ,"%s_stub",getFunctionId( getInstrPtr(mb,0)));
+ s = newFunction(userRef, putName(nme, strlen(nme)),FUNCTIONsymbol);
+ if ( s == NULL)
+ return 0;
+ freeMalBlk(s->def); /* drop the block that came with the new symbol; it is replaced on the next line */
+ s->def = copyMalBlk(pmb); /* get variables */
+ smb = s->def; /* NOTE(review): the copyMalBlk result is used without a NULL check - confirm it cannot fail */
+ if ( newMalBlkStmt(smb,smb->ssize) < 0 ) /* NOTE(review): s is not yet inserted and is not released on this path - confirm whether it leaks */
+ return 0;
+ pushInstruction(smb, copyInstruction(pmb->stmt[0])); /* reuse the signature of pmb ... */
+ getFunctionId( getInstrPtr(smb,0)) = putName(nme,strlen(nme)); /* ... under the stub name */
+ insertSymbol(cntxt->nspace,s);
+
+ conn = OPTinitcode(cntxt,smb);
+ sig = getInstrPtr(smb,0);
+ arg = (int*) GDKzalloc(sizeof(int) * sig->argc); /* NOTE(review): result is written through below without a NULL check */
+ /* k:= remote.put(conn,kvar)
+ * one call per input argument; the result variable is typed TYPE_str and
+ * remembered in arg[j] for the exec call further down */
+ for (j= sig->retc; j < sig->argc; j++) {
+ q= newFcnCall(smb,remoteRef,putRef);
+ setVarType(smb, getArg(q,0), TYPE_str);
+ setVarUDFtype(smb, getArg(q,0));
+ q= pushArgument(smb,q,conn);
+ q= pushArgument(smb,q,getArg(sig,j));
+ arg[j]= getArg(q,0);
+ }
+
+ /* (k1,...kn):= remote.exec(conn,slicing,qry,version....)
+ * as built here the arguments are: conn, userRef, the "<name>_plan" name,
+ * followed by the handles stored by the put loop above */
+ snprintf(nme,BUFSIZ,"%s_plan",getFunctionId( getInstrPtr(mb,0)));
+ q = newInstruction(smb,ASSIGNsymbol);
+ getModuleId(q) = remoteRef;
+ getFunctionId(q) = execRef;
+ q->retc= q->argc= 0; /* start from an empty argument list before pushing returns */
+ for (j=0; j < sig->retc; j++){
+ arg[j]= newTmpVariable(smb,TYPE_str); /* slots 0..retc-1 of arg[] hold the exec result variables */
+ q = pushReturn(smb,q,arg[j]);
+ }
+ q= pushArgument(smb,q,conn);
+ q= pushStr(smb,q,userRef);
+ q= pushStr(smb,q,putName(nme,strlen(nme)));
+ /* deal with all arguments ! */
+ for (j=sig->retc; j < sig->argc; j++)
+ q = pushArgument(smb,q,arg[j]);
+ pushInstruction(smb,q);
+
+ /* return exec_qry;
+ * the instruction is only assembled here; it is pushed into smb near the
+ * end of this function */
+ ret = newInstruction(smb, ASSIGNsymbol);
+ ret->barrier= RETURNsymbol;
+ ret->argc = ret->retc = 0;
+ /* l:=remote.get(conn,k)
+ * each fetched variable gets the type of the matching signature return */
+ for ( j=0; j< sig->retc; j++){
+ q= newFcnCall(smb,remoteRef,getRef);
+ q= pushArgument(smb,q,conn);
+ q= pushArgument(smb,q,arg[j]);
+ k= getArg(q,0);
+ setVarType(smb,k, getArgType(smb,sig,j));
+ setVarUDFtype(smb, k);
+ ret = pushArgument(smb,ret,k);
+ ret = pushReturn(smb,ret,getArg(sig,j));
+ }
+
+ newCatchStmt(smb, "ANYexception"); /* error path: disconnect, then re-raise */
+ q = newStmt(smb, remoteRef, disconnectRef);
+ pushArgument(smb, q, conn);
+ newRaiseStmt(smb, "ANYexception"); /* pass to caller */
+ newExitStmt(smb, "ANYexception");
+
+ q = newStmt(smb, remoteRef, disconnectRef); /* normal path: disconnect before returning */
+ pushArgument(smb, q, conn);
+ if ( sig->retc) /* NOTE(review): when retc is 0, ret is neither pushed nor released here - confirm ownership */
+ pushInstruction(smb,ret);
+ pushEndInstruction(smb);
+
+ GDKfree(arg);
+ return smb;
+}
+
/* prepare access to partitions by injection of the materialize instructions */
static int
OPTpreparePartition(MalBlkPtr nmb, InstrPtr p, Slices *slices, int pc)
@@ -313,22 +423,28 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
#define BLOCKED 1
#define REQUIRED 2
#define SUPPORTIVE 3
+#define EXPORTED 4
+#define KEEPLOCAL 5
+
+#ifdef _DEBUG_OPT_PARTITION_
+static char *status[6]= {"", "blocked ", "required ", "supportive ",
"exported ", "keeplocal "};
+#endif
static int
OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
{
- char *plan,*vars;
+ int *plan,*vars;
int i, j, k, limit, last;
InstrPtr ret, call, p, *old;
Symbol s;
- MalBlkPtr nmb, cmb;
+ MalBlkPtr nmb, cmb, smb;
str msg;
char nme[BUFSIZ];
- plan = GDKzalloc(mb->ssize);
+ plan = GDKzalloc(mb->ssize * sizeof(int));
if( plan == 0)
return 0;
- vars = GDKzalloc(mb->vsize);
+ vars = GDKzalloc(mb->vsize * sizeof(int));
if( vars == 0){
GDKfree(plan);
return 0;
@@ -361,7 +477,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr
/* Phase 1: determine all variables/instructions indirectly dependent
on a fragmented column */
last = limit;
- for ( i = 0; i < limit ; i++) {
+ for ( i = 1; i < limit ; i++) {
p = old[i];
if ( p->token == ENDsymbol || i > last) {
plan[i] = REQUIRED;
@@ -370,59 +486,55 @@ OPTplanFragment(Client cntxt, MalBlkPtr
if ( getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef
|| getFunctionId(p) == bindidxRef) &&
strcmp(slices->schema, getVarConstant(mb,
getArg(p,2)).val.sval) == 0 &&
strcmp(slices->table, getVarConstant(mb,
getArg(p,3)).val.sval) == 0 ) {
- vars[getArg(p,0)] = REQUIRED;
plan[i] = REQUIRED;
- } else
- /* all arguments should be free to use in distributed setting */
- for( j = p->retc; j < p->argc; j++)
- if (vars[getArg(p,j)] == BLOCKED)
- plan[i] = BLOCKED;
+ }
/* blocking instructions are those that require data exchange
or total view */
if ( getModuleId(p) == algebraRef && getFunctionId(p) ==
joinRef ) {
- if (vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)]
== REQUIRED) {
- /* not possible to delegate */
- plan[i] = BLOCKED;
- } else {
- /* other variable is supportive */
- if (vars[getArg(p,1)] != REQUIRED)
- vars[getArg(p,1)] = SUPPORTIVE;
- if (vars[getArg(p,2)] != REQUIRED)
- vars[getArg(p,2)] = SUPPORTIVE;
+ /* be aware that supportive subqueries may produce
pivot sets */
+ /* this means we have to enforce the following */
+ if ( vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)]
!= REQUIRED )
plan[i] = SUPPORTIVE;
- }
} else
if ( (getModuleId(p) == groupRef && (getFunctionId(p) ==
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) ) ||
getModuleId(p) == pqueueRef || getModuleId(p)
== aggrRef || getModuleId(p) == ioRef ||
- (getModuleId(p) == sqlRef && (getFunctionId(p)
== resultSetRef || getFunctionId(p) == putName("exportValue",11) )) ||
- (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef)) ) {
- /* add the targets of its argument to the output */
+ (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef
||getFunctionId(p) ==markHRef)) ) {
+ /* this is blocking for partitioned columns, this can
be achieved by propagating the REQUIRED property over variables */
+ for( j = p->retc; j < p->argc; j++)
+ if (vars[getArg(p,j)] == REQUIRED )
+ break;
+ if ( j != p->argc)
+ plan[i] = BLOCKED;
+ } else
+ if ( (getModuleId(p) == sqlRef && (getFunctionId(p) ==
resultSetRef || getFunctionId(p) == putName("exportValue",11) ) ) ||
+ getModuleId(p) == ioRef )
plan[i] = BLOCKED;
- }
if( plan[i] == BLOCKED){
for ( j= 0; j< p->retc; j++)
vars[getArg(p,j)] = BLOCKED;
} else {
- for( j = 0; j < p->argc; j++)
- if (vars[getArg(p,j)] == REQUIRED )
+ /* one blocking argument blocks the instruction */
+ for( j = p->retc; j < p->argc; j++)
+ if (vars[getArg(p,j)] == BLOCKED )
break;
- if ( j != p->argc)
+ if ( j != p->argc )
+ plan[i]= BLOCKED;
+
+ /* one required then instruction is needed */
+ k = 0;
+ for( j = p->retc; j < p->argc; j++)
+ if (vars[getArg(p,j)] == REQUIRED )
+ k++;
+ if (k && k == p->argc-p->retc && plan[i] != BLOCKED )
plan[i]= REQUIRED;
+ else
+ if ( k && plan[i] != BLOCKED )
+ plan[i] = REQUIRED;
- for( j = 0; j < p->argc; j++)
- if (vars[getArg(p,j)] == SUPPORTIVE )
- break;
- if ( j != p->argc && plan[i] != REQUIRED)
- plan[i]= SUPPORTIVE;
-
- if ( plan[i] == REQUIRED)
- for ( j= 0; j< p->argc; j++)
- vars[getArg(p,j)] = REQUIRED;
- if ( plan[i] == SUPPORTIVE)
- for ( j= 0; j< p->argc; j++)
- if ( vars[getArg(p,j)] == 0)
- vars[getArg(p,j)] = SUPPORTIVE;
+ for ( j= 0; j< p->retc; j++)
+ if ( vars[getArg(p,j)] == 0)
+ vars[getArg(p,j)] = plan[i];
}
}
@@ -430,103 +542,118 @@ OPTplanFragment(Client cntxt, MalBlkPtr
mnstr_printf(cntxt->fdout,"\n#phase 1\n");
for( i= 0; i< limit; i++)
if (plan[i] ) {
- switch (plan[i]) {
- case BLOCKED:
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list