Changeset: 6e2a4528ff70 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6e2a4528ff70
Modified Files:
        
Branch: default
Log Message:

merge


diffs (truncated from 506 to 300 lines):

diff --git a/NT/winconfig.py b/NT/winconfig.py
--- a/NT/winconfig.py
+++ b/NT/winconfig.py
@@ -41,8 +41,6 @@ subs = [("@exec_prefix@", r'%prefix%'),
         ("@pkgincludedir@", r'%prefix%\include\@PACKAGE@'),
         ("@DIRSEP@", '\\'),
         ("@CROSS_COMPILING_FALSE@", ''),
-        ("@HAVE_CLIENTS_FALSE@", '#'),
-        ("@HAVE_MONETDB_FALSE@", '#'),
         ("@NATIVE_WIN32_FALSE@", '#'),
         ("@NOT_WIN32_FALSE@", ''),
         ("@PATHSEP@", ';')]
diff --git a/debian/monetdb5-sql.init.d b/debian/monetdb5-sql.init.d
--- a/debian/monetdb5-sql.init.d
+++ b/debian/monetdb5-sql.init.d
@@ -19,7 +19,7 @@ test -x $DAEMON || exit 0
 umask 022
 
 LOGDIR=/var/log/monetdb
-PIDFILE=/var/run/monetdb/$NAME.pid
+PIDFILE=/var/run/monetdb/merovingian.pid
 
 # Include monetdb5-sql defaults if available
 if [ -f /etc/default/monetdb5-sql ] ; then
diff --git a/monetdb5/optimizer/opt_dataflow.mx 
b/monetdb5/optimizer/opt_dataflow.mx
--- a/monetdb5/optimizer/opt_dataflow.mx
+++ b/monetdb5/optimizer/opt_dataflow.mx
@@ -130,10 +130,10 @@ simpleFlow(InstrPtr *old, int start, int
                        if( getArg(p,0) == getArg(q,j))
                                simple= TRUE;
                if( !simple)
-                       simple = getModuleId(p) == calcRef || getModuleId(p) == 
mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef;
+                       simple = getModuleId(q) == calcRef || getModuleId(q) == 
mtimeRef || getModuleId(q) == strRef || getModuleId(q)== mmathRef;
+               else p = q;
                if( !simple)
                        return 0;
-               p = q;
        }
        return 1;
 }
diff --git a/monetdb5/optimizer/opt_partition.mx 
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -213,6 +213,116 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
        return cmb;
 }
 
+static int
+OPTinitcode(Client cntxt, MalBlkPtr mb)
+{
+    InstrPtr p;
+    str s;
+    str l = NULL;
+
+    (void) cntxt;
+
+    /* _x := remote.connect(uri,"monetdb","monetdb","msql"); */
+    p = newStmt(mb, remoteRef,connectRef);
+    s = GDKgetenv("merovingian_uri");
+    if (s == NULL)  /* aparently not under Merovingian control, fall back to 
local only */
+               s= "dummyconnection";
+        /* SABAOTHgetLocalConnection(&l);*/
+    p= pushStr(mb,p, s == NULL ? l : s);
+    p= pushStr(mb,p,"monetdb");
+    p= pushStr(mb,p,"monetdb");
+    p= pushStr(mb,p,"msql");
+    if (l)
+        GDKfree(l);
+    return getArg(p,0);
+}
+
+static MalBlkPtr
+OPTpartitionStub(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb)
+{
+       MalBlkPtr smb = 0;
+       Symbol s;
+       InstrPtr sig, q, ret;
+       int j,k,conn, *arg;
+       char nme[BUFSIZ];
+
+       /* define the sub query stub for remote processing */
+       snprintf(nme,BUFSIZ,"%s_stub",getFunctionId( getInstrPtr(mb,0)));
+       s = newFunction(userRef, putName(nme, strlen(nme)),FUNCTIONsymbol);
+       if ( s == NULL)
+               return 0;
+       freeMalBlk(s->def);
+       s->def = copyMalBlk(pmb);       /* get variables */
+       smb = s->def;
+       if ( newMalBlkStmt(smb,smb->ssize) < 0 )
+               return 0;
+       pushInstruction(smb, copyInstruction(pmb->stmt[0]));
+       getFunctionId( getInstrPtr(smb,0)) = putName(nme,strlen(nme));
+       insertSymbol(cntxt->nspace,s);
+
+       conn = OPTinitcode(cntxt,smb);
+       sig = getInstrPtr(smb,0);
+       arg = (int*) GDKzalloc(sizeof(int) * sig->argc);
+       /* k:= remote.put(conn,kvar) */
+       for (j= sig->retc; j < sig->argc; j++) {
+               q= newFcnCall(smb,remoteRef,putRef);
+               setVarType(smb, getArg(q,0), TYPE_str);
+               setVarUDFtype(smb, getArg(q,0));
+               q= pushArgument(smb,q,conn);
+               q= pushArgument(smb,q,getArg(sig,j));
+               arg[j]= getArg(q,0);
+       }
+
+       /* (k1,...kn):= remote.exec(conn,slicing,qry,version....) */
+       snprintf(nme,BUFSIZ,"%s_plan",getFunctionId( getInstrPtr(mb,0)));
+       q = newInstruction(smb,ASSIGNsymbol);
+       getModuleId(q) = remoteRef;
+       getFunctionId(q) = execRef;
+       q->retc=  q->argc= 0;
+       for (j=0; j < sig->retc; j++){
+               arg[j]= newTmpVariable(smb,TYPE_str);
+               q = pushReturn(smb,q,arg[j]);
+       }
+       q= pushArgument(smb,q,conn);
+       q= pushStr(smb,q,userRef);
+       q= pushStr(smb,q,putName(nme,strlen(nme)));
+       /* deal with all arguments ! */
+       for (j=sig->retc; j < sig->argc; j++)
+               q = pushArgument(smb,q,arg[j]);
+       pushInstruction(smb,q);
+
+       /* return exec_qry; */
+       ret = newInstruction(smb, ASSIGNsymbol);
+       ret->barrier= RETURNsymbol;
+       ret->argc = ret->retc = 0;
+       /* l:=remote.get(conn,k) */
+       for ( j=0; j< sig->retc; j++){
+               q= newFcnCall(smb,remoteRef,getRef);
+               q= pushArgument(smb,q,conn);
+               q= pushArgument(smb,q,arg[j]);
+               k= getArg(q,0);
+               setVarType(smb,k, getArgType(smb,sig,j));
+               setVarUDFtype(smb, k);
+               ret = pushArgument(smb,ret,k);
+               ret = pushReturn(smb,ret,getArg(sig,j));
+       }
+
+    newCatchStmt(smb, "ANYexception");
+    q = newStmt(smb, remoteRef, disconnectRef);
+    pushArgument(smb, q, conn);
+    newRaiseStmt(smb, "ANYexception");   /* pass to caller */
+    newExitStmt(smb, "ANYexception");
+
+    q = newStmt(smb, remoteRef, disconnectRef);
+    pushArgument(smb, q, conn);
+       if ( sig->retc)
+               pushInstruction(smb,ret);
+    pushEndInstruction(smb);
+
+       GDKfree(arg);
+       return smb;
+}
+
 /* prepare access to partitions by injection of the materialize instructions */
 static int
 OPTpreparePartition(MalBlkPtr nmb, InstrPtr p, Slices *slices, int pc)
@@ -313,22 +423,28 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
 #define BLOCKED 1
 #define REQUIRED 2
 #define SUPPORTIVE 3
+#define EXPORTED 4
+#define KEEPLOCAL 5
+
+#ifdef _DEBUG_OPT_PARTITION_ 
+static char *status[6]= {"", "blocked  ", "required ", "supportive ", 
"exported ", "keeplocal "};
+#endif
 
 static int 
 OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
 {
-       char *plan,*vars;
+       int *plan,*vars;
        int i, j, k, limit, last;
        InstrPtr ret, call, p, *old;
        Symbol s;
-       MalBlkPtr nmb, cmb;
+       MalBlkPtr nmb, cmb, smb;
        str msg;
        char nme[BUFSIZ];
 
-       plan = GDKzalloc(mb->ssize);
+       plan = GDKzalloc(mb->ssize * sizeof(int));
        if( plan == 0)
                return 0;
-       vars = GDKzalloc(mb->vsize);
+       vars = GDKzalloc(mb->vsize * sizeof(int));
        if( vars == 0){
                GDKfree(plan);
                return 0;
@@ -361,7 +477,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
 
        /* Phase 1: determine all variables/instructions indirectly dependent 
on a fragmented column */
        last = limit;
-       for ( i = 0; i < limit ; i++) {
+       for ( i = 1; i < limit ; i++) {
                p = old[i];
                if ( p->token == ENDsymbol || i > last) {
                        plan[i] = REQUIRED;
@@ -370,59 +486,55 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
                if ( getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef 
|| getFunctionId(p) == bindidxRef) &&
                        strcmp(slices->schema, getVarConstant(mb, 
getArg(p,2)).val.sval) == 0 &&
                        strcmp(slices->table, getVarConstant(mb, 
getArg(p,3)).val.sval) == 0 ) {
-                       vars[getArg(p,0)] = REQUIRED;
                        plan[i] = REQUIRED;
-               } else
-               /* all arguments should be free to use in distributed setting */
-               for( j = p->retc; j < p->argc; j++)
-               if (vars[getArg(p,j)] == BLOCKED) 
-                       plan[i] = BLOCKED;
+               } 
 
                /* blocking instructions are those that require data exchange 
or total view */
                if (    getModuleId(p) == algebraRef && getFunctionId(p) == 
joinRef ) {
-                       if (vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)] 
== REQUIRED)  {
-                               /* not possible to delegate */
-                               plan[i] = BLOCKED;
-                       } else {
-                               /* other variable is supportive */
-                               if (vars[getArg(p,1)] != REQUIRED) 
-                                       vars[getArg(p,1)] = SUPPORTIVE; 
-                               if (vars[getArg(p,2)] != REQUIRED) 
-                                       vars[getArg(p,2)] = SUPPORTIVE; 
+                       /* be aware that supportive subqueries may produce 
pivot sets */
+                       /* this means we have to enforce the following */
+                       if ( vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)] 
!= REQUIRED  )
                                plan[i] = SUPPORTIVE;
-                       }
                } else
                if (    (getModuleId(p) == groupRef && (getFunctionId(p) == 
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) )  ||
                                getModuleId(p) == pqueueRef || getModuleId(p) 
== aggrRef || getModuleId(p) == ioRef ||
-                               (getModuleId(p) == sqlRef && (getFunctionId(p) 
== resultSetRef || getFunctionId(p) == putName("exportValue",11) )) ||
-                               (getModuleId(p) == algebraRef 
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef)) )  {
-                       /* add the targets of its argument to the output */
+                               (getModuleId(p) == algebraRef 
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef 
||getFunctionId(p) ==markHRef)) )  {
+                       /* this is blocking for partitioned columns, this can 
be achieved by propagating the REQUIRED property over variables */
+                       for( j = p->retc; j < p->argc; j++)
+                       if (vars[getArg(p,j)] == REQUIRED )
+                               break;
+                       if ( j != p->argc)
+                               plan[i] = BLOCKED;
+               } else
+               if (    (getModuleId(p) == sqlRef && (getFunctionId(p) == 
resultSetRef || getFunctionId(p) == putName("exportValue",11) ) )  ||
+                               getModuleId(p) == ioRef )
                        plan[i] = BLOCKED;
-               }
 
                if( plan[i] == BLOCKED){
                        for ( j= 0; j< p->retc; j++)
                                vars[getArg(p,j)] = BLOCKED;
                } else {
-                       for( j = 0; j < p->argc; j++)
-                       if (vars[getArg(p,j)] == REQUIRED ) 
+                       /* one blocking argument blocks the instruction */
+                       for( j = p->retc; j < p->argc; j++)
+                       if (vars[getArg(p,j)] == BLOCKED ) 
                                break;
-                       if ( j != p->argc)
+                       if ( j != p->argc )
+                               plan[i]= BLOCKED;
+
+                       /* one required then instruction is needed */
+                       k = 0;
+                       for( j = p->retc; j < p->argc; j++)
+                       if (vars[getArg(p,j)] == REQUIRED )
+                               k++;
+                       if (k && k == p->argc-p->retc && plan[i] != BLOCKED )
                                plan[i]= REQUIRED;
+                       else
+                       if ( k  && plan[i] != BLOCKED )
+                               plan[i] = REQUIRED;
 
-                       for( j = 0; j < p->argc; j++)
-                       if (vars[getArg(p,j)] == SUPPORTIVE ) 
-                               break;
-                       if ( j != p->argc && plan[i] != REQUIRED)
-                               plan[i]= SUPPORTIVE;
-
-                       if ( plan[i] == REQUIRED)
-                               for ( j= 0; j< p->argc; j++)
-                                       vars[getArg(p,j)] = REQUIRED;
-                       if ( plan[i] == SUPPORTIVE)
-                               for ( j= 0; j< p->argc; j++)
-                               if ( vars[getArg(p,j)] == 0)
-                                       vars[getArg(p,j)] = SUPPORTIVE;
+                       for ( j= 0; j< p->retc; j++)
+                       if ( vars[getArg(p,j)] == 0)
+                               vars[getArg(p,j)] = plan[i];
                }
        }
 
@@ -430,103 +542,118 @@ OPTplanFragment(Client cntxt, MalBlkPtr 
        mnstr_printf(cntxt->fdout,"\n#phase 1\n");
        for( i= 0; i< limit; i++)
        if (plan[i] ) {
-               switch (plan[i]) {
-               case BLOCKED:
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to