Changeset: 02a87691020a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=02a87691020a
Modified Files:
        monetdb5/extras/dvf/dvf.c
        monetdb5/extras/dvf/dvf.mal
        monetdb5/extras/dvf/opt_dvf.mx
        monetdb5/mal/mal_interpreter.mx
        monetdb5/modules/kernel/algebra.mx
        monetdb5/optimizer/opt_mergetable.mx
        sql/backends/monet5/miniseed/miniseed.c
Branch: DVframework
Log Message:

Mergetable optimizer is able to run after modifying MAL plan in the execution 
time.
Fixed several bugs.


diffs (truncated from 456 to 300 lines):

diff --git a/monetdb5/extras/dvf/dvf.c b/monetdb5/extras/dvf/dvf.c
--- a/monetdb5/extras/dvf/dvf.c
+++ b/monetdb5/extras/dvf/dvf.c
@@ -57,14 +57,22 @@ str plan_modifier(Client cntxt, MalBlkPt
        str data_table_identifier = "data";
        str mountRef = putName("mount", 5);
        str miniseedRef = putName("miniseed", 8);
-//     str dvfRef = putName("dvf", 3);
-//     str planmodifierRef = putName("plan_modifier", 13);
+       str dvfRef = putName("dvf", 3);
+       str planmodifierRef = putName("plan_modifier", 13);
        
        InstrPtr *old = NULL, *mounts = NULL, q = NULL, r = NULL, s = NULL, o = 
NULL;
-       int i, limit, slimit, actions = 0;
+       int i, j, k, limit, slimit, actions = 0;
        int num_fl = 0;
        BUN b1 = 0, b2 = 0;
        
+       /* Declarations for copying of vars into stack and making a recursive 
runMALsequence call */
+       str msg = MAL_SUCCEED;
+       MalStkPtr stk_new;
+       ValPtr lhs, rhs;
+       int startpc = 1000000, old_vtop = 0;
+       MalBlkPtr copy_old, copy_mb;
+       bit is_stack_new = FALSE;
+       
        BAT *BAT_fl = NULL; //BAT for file_locations
        
        bit after_first_data_bind = FALSE;
@@ -74,6 +82,13 @@ str plan_modifier(Client cntxt, MalBlkPt
        
        BATiter fli;
        
+       VarRecord low, high;
+       
+       /* prepare to set low and high oids of return vars of mounts */
+       high.value.vtype= low.value.vtype= TYPE_oid;
+       high.value.val.oval = 0;
+       low.value.val.oval = 0;
+       
        /* check for logical error: mb must never be NULL */
        assert (mb != NULL);
        
@@ -101,12 +116,17 @@ str plan_modifier(Client cntxt, MalBlkPt
                throw(MAL, "dvf.plan_modifier", MAL_MALLOC_FAIL);
        
        /* save the old stage of the MAL block */
+       copy_mb = copyMalBlk(mb);
+       copy_old = mb;
+       mb = copy_mb;
+       
        old = mb->stmt;
        limit= mb->stop;
        slimit = mb->ssize;
        
+       
        /* initialize the statement list. Notice, the symbol table remains 
intact */
-       if (newMalBlkStmt(mb, mb->ssize) < 0)
+       if (newMalBlkStmt(mb, slimit) < 0)
                return 0;
        
        /* iterate over the instructions of the input MAL program, skip the 
dvf.plan_modifier itself. */
@@ -115,9 +135,19 @@ str plan_modifier(Client cntxt, MalBlkPt
                InstrPtr p = old[i];
                
                /* check for
+                * dvf.plan_modifier(...);
+                */
+               if(getModuleId(p) == dvfRef && 
+                       getFunctionId(p) == planmodifierRef)
+               {
+                       startpc = i;
+                       pushInstruction(mb, copyInstruction(old[i]));
+//                     save_pci = copyInstruction(old[i]);
+               }
+               /* check for
                 * v6 := sql.bind(..., schema_name, data_table_name, ..., ...);
                 */
-               if(getModuleId(p) == sqlRef && 
+               else if(getModuleId(p) == sqlRef && 
                        getFunctionId(p) == bindRef &&
                        p->argc == 6 &&
                        p->retc == 1 &&
@@ -155,10 +185,16 @@ str plan_modifier(Client cntxt, MalBlkPt
                                                if(type < 0)
                                                        throw(MAL, 
"dvf.get_column_num", "is not defined yet for schema: %s and table: %s and 
column: %s.", *schema_name, getVarConstant(mb, getArg(p, 3)).val.sval, 
getVarConstant(mb, getArg(p, 4)).val.sval);
                                                q = pushReturn(mb, q, 
newTmpVariable(mb, newBatType(TYPE_oid, type)));
+                                               varSetProp(mb, getArg(q, a), 
PropertyIndex("hlb"), op_gte, (ptr) &low.value);
+                                               varSetProp(mb, getArg(q, a), 
PropertyIndex("hub"), op_lt, (ptr) &high.value);
                                        }
                                        
                                        q = pushStr(mb, q, t);
                                        
+                                       low.value.val.oval += 1;
+                                       high.value.val.oval += 1;
+                                       
+                                       // copy the value of constant string 
from stack. Otherwise cannot reach the actual value
                                        
VALcopy(&stk->stk[q->argv[NUM_RET_MOUNT]], &getVarConstant(mb, getArg(q, 
NUM_RET_MOUNT)));
                                        
                                        mounts[which_fl] = q;
@@ -182,6 +218,7 @@ str plan_modifier(Client cntxt, MalBlkPt
                        setModuleId(r, matRef);
                        setFunctionId(r, newRef);
                        r = pushReturn(mb, r, newTmpVariable(mb, TYPE_any)); // 
push tmp var to pass to markH.
+//                     r = pushReturn(mb, r, getArg(p, 0));
                        which_column = get_column_num(*schema_name, 
getVarConstant(mb, getArg(p, 3)).val.sval, 
                                                      getVarConstant(mb, 
getArg(p, 4)).val.sval);
                        if(which_column < 0)
@@ -192,6 +229,8 @@ str plan_modifier(Client cntxt, MalBlkPt
                                r = pushArgument(mb, r, 
getArg(mounts[which_mount], which_column));
                        }
                        
+                       setVarInit(mb, getArg(r, 0));
+                       
                        // push the new instruction
                        pushInstruction(mb, r);
                        actions++;
@@ -207,13 +246,25 @@ str plan_modifier(Client cntxt, MalBlkPt
                        // push the new instruction
                        pushInstruction(mb, s);
                        actions++;
+//                     s = s;
+                       
+                       // comment out in the old for reusing the old
+                       copy_old->stmt[i]->token = REMsymbol;
                        
                }
                else
                {
                        // push instruction
-                       pushInstruction(mb, old[i]);
+                       pushInstruction(mb, copyInstruction(old[i]));
+                       
                        if (p->token == ENDsymbol) break;
+                       
+                       // comment out in the old for reusing the old
+                       if(i > startpc)
+                       {
+//                             old[i]->token = REMsymbol;
+                               copy_old->stmt[i]->token = REMsymbol;
+                       }
                }
        }
        /* We would like to retain everything from the ENDsymbol
@@ -222,79 +273,130 @@ str plan_modifier(Client cntxt, MalBlkPt
         */
        for(i++; i<limit; i++)
                if (old[i])
-                       pushInstruction(mb, old[i]);
+                       pushInstruction(mb, copyInstruction(old[i]));
        
-       /*
+       
+       /* save the vtop before calling any optimizer. */
+       old_vtop = mb->vtop;
+       
+       /* call necessary optimizers
         * 
*optimizer.inline();optimizer.remap();optimizer.evaluate();optimizer.costModel();optimizer.coercions();optimizer.emptySet();optimizer.aliases();
 
optimizer.mergetable();optimizer.deadcode();optimizer.commonTerms();optimizer.groups();optimizer.joinPath();optimizer.reorder();optimizer.deadcode();optimizer.reduce();optimizer.history();optimizer.multiplex();optimizer.accumulators();optimizer.garbageCollector();
         */
        
-       o = newFcnCall(mb, "optimizer", "inline");
+//     o = newFcnCall(mb, "optimizer", "inline");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "remap");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "evaluate");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "costModel");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "coercions");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "emptySet");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "aliases");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+       o = newFcnCall(mb, "optimizer", "mergetable");
        typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "remap");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "evaluate");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "costModel");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "coercions");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "emptySet");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "aliases");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-//     o = newFcnCall(mb, "optimizer", "mergetable");
+//     o = newFcnCall(mb, "optimizer", "deadcode");
 //     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "deadcode");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "commonTerms");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "groups");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "joinPath");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "commonTerms");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "groups");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "joinPath");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
 //     o = newFcnCall(mb, "optimizer", "reorder");
 //     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "deadcode");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "deadcode");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
 //     o = newFcnCall(mb, "optimizer", "reduce");
 //     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "history");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "multiplex");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "accumulators");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
-       o = newFcnCall(mb, "optimizer", "garbageCollector");
-       typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "history");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "multiplex");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "accumulators");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
+//     o = newFcnCall(mb, "optimizer", "garbageCollector");
+//     typeChecker(cntxt->fdout, cntxt->nspace, mb, o, FALSE);
        optimizeMALBlock(cntxt, mb);
        //o = o;
        
 //     chkProgram(cntxt->fdout, cntxt->nspace, mb);
 //     printFunction(cntxt->fdout,mb, 0, LIST_MAL_EXPLAIN);
-               
+       
+       /* New variables might have been created by the optimizers, so their 
values has to be copied into the stack. However, there might not be enough 
space in stack for them. We cannot reallocate the stack, but we may create our 
own enlarged stack, then run the rest of the plan with our own stack. */
+       
+       // arrange the new stack without freeing the old one.
+       if (stk->stksize > mb->vsize)
+               stk_new = stk;
+       else
+       {
+               k = ((mb->vsize / STACKINCR) + 1) * STACKINCR;
+               stk_new = newGlobalStack(k);
+               memcpy(stk_new, stk, stackSize(stk->stksize));
+               stk_new->stksize = k;
+               is_stack_new = TRUE;
+       }
+       
+       //copy values into the new stack
+       for (j = 0; j < mb->vtop; j++) {
+               lhs = &stk_new->stk[j];
+               if (isVarConstant(mb, j) > 0) {
+                       if (!isVarDisabled(mb, j)) {
+                               rhs = &getVarConstant(mb, j);
+                               VALcopy(lhs, rhs);
+                       }
+               } else if (j > old_vtop) {
+                       lhs->vtype = getVarGDKType(mb, j);
+                       lhs->val.pval = 0;
+                       lhs->len = 0;
+               }
+       }
+       
+       // adjust variable lifetimes
+       malGarbageCollector(mb);
+       
+       // checks
+       //      chkTypes(cntxt->fdout, cntxt->nspace, mb, FALSE);
+       //      chkFlow(cntxt->fdout, mb);
+       //      chkDeclarations(cntxt->fdout, mb);
+       
+//     chkProgram(cntxt->fdout, cntxt->nspace, mb);
+//     printFunction(cntxt->fdout,mb, 0, LIST_MAL_EXPLAIN);
+       
+       // run rest of the plan
+       msg = runMALsequence(cntxt, mb, startpc+1, mb->stop, stk_new, stk_new, 
mb->stmt[startpc]);
+       
+//     msg = runMAL(cntxt, mb, startpc+1, mb, stk, save_pci);
+       
+       if(msg != MAL_SUCCEED)
+       {
+               throw(MAL, "dvf.plan_modifier", "From the recursive call: %s", 
msg);
+       }
+       
+       // restore the commented out old plan
+//     mb->stmt = old;
+       
+//     chkProgram(cntxt->fdout, cntxt->nspace, copy_old);
+//     printFunction(cntxt->fdout, copy_old, 0, LIST_MAL_EXPLAIN);
+       
        /* any remaining MAL instruction records are removed */
-       for(; i<slimit; i++)
+       for(i = 0; i<slimit; i++)
                if (old[i])
                        freeInstruction(old[i]);
                
        GDKfree(old);
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to