Changeset: d27d5a0d4600 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d27d5a0d4600
Modified Files:
        monetdb5/extras/dvf/dvf.c
        monetdb5/extras/dvf/dvf.h
        monetdb5/extras/dvf/opt_dvf.c
Branch: DVframework
Log Message:

mounter: added iterative mounts.


diffs (284 lines):

diff --git a/monetdb5/extras/dvf/dvf.c b/monetdb5/extras/dvf/dvf.c
--- a/monetdb5/extras/dvf/dvf.c
+++ b/monetdb5/extras/dvf/dvf.c
@@ -7,11 +7,6 @@
 #include "dvf.h"
 #include "mal_interpreter.h"
 
-#define NUM_RET_MOUNT 4
-
-int get_column_num(str schema_name, str table_name, str column_name);
-int get_column_type(str schema_name, str table_name, int column_num);
-
 int get_column_type(str schema_name, str table_name, int column_num)
 {
        if(strcmp(schema_name, "mseed") != 0 || strcmp(table_name, "data") != 0)
diff --git a/monetdb5/extras/dvf/dvf.h b/monetdb5/extras/dvf/dvf.h
--- a/monetdb5/extras/dvf/dvf.h
+++ b/monetdb5/extras/dvf/dvf.h
@@ -17,6 +17,10 @@
 #include "mal_function.h"
 #include "opt_prelude.h"
 
+#define NUM_RET_MOUNT 4
+
+int get_column_num(str schema_name, str table_name, str column_name);
+int get_column_type(str schema_name, str table_name, int column_num);
 dvf_export str plan_modifier(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr p);
 
 /* TODO: What is the following line? */
diff --git a/monetdb5/extras/dvf/opt_dvf.c b/monetdb5/extras/dvf/opt_dvf.c
--- a/monetdb5/extras/dvf/opt_dvf.c
+++ b/monetdb5/extras/dvf/opt_dvf.c
@@ -92,6 +92,7 @@
 
 #include "monetdb_config.h"
 #include "opt_dvf.h"
+#include "dvf.h"
 #include "mal_interpreter.h"
 #include "opt_statistics.h"
 
@@ -100,19 +101,22 @@ OPTdvfImplementation(Client cntxt, MalBl
 {
        //TODO: Replace these with a proper (global) constants
        str sys_schema_name = "sys";
+       str schema_name = "mseed";
        str data_table_identifier = "data";
        str file_location_identifier = "file_location";
+       str mountRef = putName("mount", 5);
+       str miniseedRef = putName("miniseed", 8);
        str dvfRef = putName("dvf", 3);
        str planmodifierRef = putName("plan_modifier", 13);
 
        //states of finding the pattern
-       int state = 0; //0: start, 1:found v3, 2:found v5, 3:found v6;
+       int state = 0; //0: start, 1:found v3, 2:found v5, 3:done with 
injection;
 
        //state variables (instruction index) numbered with state
        int i1 = 0, i2 = 0;
 
-       InstrPtr *old = NULL, q = NULL, r = NULL, s = NULL, t = NULL, *ps_iter 
= NULL;
-       int i, limit, actions = 0;
+       InstrPtr *old = NULL, q = NULL, r = NULL, s = NULL, t = NULL, b = NULL, 
m = NULL, e = NULL, *ps_iter = NULL;
+       int i, limit, which_column, actions = 0;
 
        stk = stk; //to escape 'unused' parameter error.
        pci = pci; //to escape 'unused' parameter error.
@@ -125,11 +129,10 @@ OPTdvfImplementation(Client cntxt, MalBl
        old = mb->stmt;
        limit= mb->stop;
        
-       ps_iter = ps_iter;
        printf("mode:%d\n", mode);
        
        /* iterate over the instructions of the input MAL program */
-       for (i = 1; i < limit; i++) /* the plan signature can be skipped safely 
*/
+       for (i = 1; i < limit; i++, limit = mb->stop) /* the plan signature can 
be skipped safely */
        {
                InstrPtr p = old[i];
 
@@ -142,7 +145,8 @@ OPTdvfImplementation(Client cntxt, MalBl
                        p->retc == 1 &&
                        strcmp(getVarConstant(mb, getArg(p, 2)).val.sval, 
sys_schema_name) != 0 &&
                        strstr(getVarConstant(mb, getArg(p, 3)).val.sval, 
data_table_identifier) == NULL &&
-                       strcmp(getVarConstant(mb, getArg(p, 4)).val.sval, 
file_location_identifier) == 0)
+                       strcmp(getVarConstant(mb, getArg(p, 4)).val.sval, 
file_location_identifier) == 0 &&
+                       state <= 3)
                {
                        i1 = i;
                        state = 1;
@@ -186,7 +190,7 @@ OPTdvfImplementation(Client cntxt, MalBl
                /* check for
                 * v7 := sql.bind(..., schema_name, data_table_name, ..., ...);
                 */
-               else if((state == 1 || state == 2) &&
+               else if((state == 1 || state == 2 || state == 3) &&
                        getModuleId(p) == sqlRef &&
                        getFunctionId(p) == bindRef &&
                        p->argc == 6 &&
@@ -206,7 +210,134 @@ OPTdvfImplementation(Client cntxt, MalBl
                                        
                                        if(mode == 1)
                                        {
+                                               /* v7:bat[:oid,:int] := 
bat.new(:oid,:int); #BAT for file_id in data
+                                                * v8:bat[:oid,:int] := 
bat.new(:oid,:int); #BAT for seq_no in data
+                                                * v9:bat[:oid,:int] := 
bat.new(:oid,:timestamp); #BAT for sample_time in data
+                                                * ... #BAT for sample_value in 
data
+                                                * (t1, t2) := group.done(v6);
+                                                *  t3 := bat.mirror(t1);
+                                                *  t4 := algebra.leftjoin(t3, 
v6);
+                                                * barrier (o, fileLocation) := 
iterator.new(t4);
+                                                * (v71:bat[:oid,:int], 
v81:bat[:oid,:int], v91:bat[:oid,:timestamp], ...) 
:=miniseed.mount(fileLocation);
+                                                * v7 := sql.append(v7,v71);
+                                                * v8 := sql.append(v8,v81);
+                                                * v9 := sql.append(v9,v91);
+                                                * ...
+                                                * redo (o, fileLocation) := 
iterator.next(t4);
+                                                * exit (o,fileLocation);
+                                                */
                                                
+                                               int a = 0, type;
+                                               ps_iter = 
(InstrPtr*)GDKmalloc(2*NUM_RET_MOUNT*sizeof(InstrPtr)); /* for the bat.new and 
bat.append commands. */
+                                               
+                                               /* create group.done 
instruction */
+                                               q = newInstruction(mb, 
ASSIGNsymbol);
+                                               setModuleId(q, groupRef);
+                                               setFunctionId(q, doneRef);
+                                               q = pushReturn(mb, q, 
newTmpVariable(mb, TYPE_bat));
+                                               q = pushReturn(mb, q, 
newTmpVariable(mb, TYPE_bat));
+                                               q = pushArgument(mb, q, 
getArg(old[i2], 0));
+                                               
+                                               
+                                               /* create bat.mirror 
instruction */
+                                               s = newInstruction(mb, 
ASSIGNsymbol);
+                                               setModuleId(s, batRef);
+                                               setFunctionId(s, mirrorRef);
+                                               s = pushReturn(mb, s, 
newTmpVariable(mb, TYPE_bat));
+                                               s = pushArgument(mb, s, 
getArg(q, 0));
+                                               
+                                               /* create algebra.leftjoin 
instruction */
+                                               t = newInstruction(mb, 
ASSIGNsymbol);
+                                               setModuleId(t, algebraRef);
+                                               setFunctionId(t, leftjoinRef);
+                                               t = pushReturn(mb, t, 
newTmpVariable(mb, TYPE_bat));
+                                               t = pushArgument(mb, t, 
getArg(s, 0));
+                                               t = pushArgument(mb, t, 
getArg(old[i2], 0));
+                                               
+                                               /* create barrier instruction */
+                                               b = newInstruction(mb, 
ASSIGNsymbol);
+                                               setModuleId(b, iteratorRef);
+                                               setFunctionId(b, newRef);
+                                               b->barrier = BARRIERsymbol;
+                                               b = pushReturn(mb, b, 
newTmpVariable(mb, TYPE_any)); /* o */
+                                               b = pushReturn(mb, b, 
newTmpVariable(mb, TYPE_any)); /* fileLocation iterator */
+                                               b = pushArgument(mb, b, 
getArg(t, 0));
+                                               
+                                               /* create redo instruction */
+                                               r = newInstruction(mb, 
ASSIGNsymbol);
+                                               setModuleId(r, iteratorRef);
+                                               setFunctionId(r, nextRef);
+                                               r->barrier = REDOsymbol;
+                                               r = pushReturn(mb, r, getArg(b, 
0)); /* o */
+                                               r = pushReturn(mb, r, getArg(b, 
1)); /* fileLocation iterator */
+                                               r = pushArgument(mb, r, 
getArg(t, 0));
+                                               
+                                               /* create mount instruction */
+                                               m = newInstruction(mb, 
ASSIGNsymbol);
+                                               setModuleId(m, miniseedRef);
+                                               setFunctionId(m, mountRef);
+                                               
+                                               for(; a < NUM_RET_MOUNT; a++)
+                                               {
+                                                       type = 
get_column_type(schema_name, data_table_identifier, a);
+                                                       if(type < 0)
+                                                       {
+                                                               
printf("dvf.get_column_num is not defined yet for schema: %s and table: %s and 
column: %d.\n", schema_name, data_table_identifier, a);
+                                                               return -1;
+                                                       }
+                                                       
+                                                       type = 
newBatType(TYPE_oid, type);
+                                                       
+                                                       /* create bat.new 
instructions */
+                                                       ps_iter[a] = 
newInstruction(mb, ASSIGNsymbol);
+                                                       setModuleId(ps_iter[a], 
batRef);
+                                                       
setFunctionId(ps_iter[a], newRef);
+                                                       ps_iter[a] = 
pushReturn(mb, ps_iter[a], newTmpVariable(mb, type)); /* v7, v8, ... */
+                                                       ps_iter[a] = 
pushType(mb, ps_iter[a], getHeadType(type));
+                                                       ps_iter[a] = 
pushType(mb, ps_iter[a], getTailType(type));
+                                                       
+                                                       /* push returns of 
mount instruction */
+                                                       m = pushReturn(mb, m, 
newTmpVariable(mb, type));
+                                                       
+                                                       /* create sql.append 
instructions */
+                                                       
ps_iter[a+NUM_RET_MOUNT] = newInstruction(mb, ASSIGNsymbol);
+                                                       
setModuleId(ps_iter[a+NUM_RET_MOUNT], batRef);
+                                                       
setFunctionId(ps_iter[a+NUM_RET_MOUNT], appendRef);
+                                                       
ps_iter[a+NUM_RET_MOUNT] = pushReturn(mb, ps_iter[a+NUM_RET_MOUNT], 
getArg(ps_iter[a], 0));
+                                                       
ps_iter[a+NUM_RET_MOUNT] = pushArgument(mb, ps_iter[a+NUM_RET_MOUNT], 
getArg(ps_iter[a], 0));
+                                                       
ps_iter[a+NUM_RET_MOUNT] = pushArgument(mb, ps_iter[a+NUM_RET_MOUNT], getArg(m, 
a));
+                                               }
+                                               
+                                               /* push arg of mount 
instruction */
+                                               m = pushArgument(mb, m, 
getArg(b, 1));
+                                               
+                                               /* create exit instruction */
+                                               e = newInstruction(mb, 
ASSIGNsymbol);
+                                               e->barrier = EXITsymbol;
+                                               e = pushReturn(mb, e, getArg(b, 
0)); /* o */
+                                               e = pushReturn(mb, e, getArg(b, 
1)); /* fileLocation iterator */
+                                               
+                                               /* insert the new instructions 
in pc i2+1 */
+                                               insertInstruction(mb, e, i2+1);
+                                               insertInstruction(mb, r, i2+1);
+                                               for(a = NUM_RET_MOUNT-1; a >= 
0; a--)
+                                               {
+                                                       insertInstruction(mb, 
ps_iter[a+NUM_RET_MOUNT], i2+1);
+                                               }
+                                               insertInstruction(mb, m, i2+1);
+                                               insertInstruction(mb, b, i2+1);
+                                               
+                                               insertInstruction(mb, t, i2+1);
+                                               insertInstruction(mb, s, i2+1);
+                                               insertInstruction(mb, q, i2+1);
+                                               
+                                               for(a = NUM_RET_MOUNT-1; a >= 
0; a--)
+                                               {
+                                                       insertInstruction(mb, 
ps_iter[a], i2+1);
+                                               }
+                                               
+                                               actions += 7 + NUM_RET_MOUNT * 
2;
+                                               state = 3;
                                                
                                        }
                                        else
@@ -243,23 +374,44 @@ OPTdvfImplementation(Client cntxt, MalBl
 
                                                /* create dvf.plan_modifier 
instruction */
                                                q = newInstruction(mb, 
ASSIGNsymbol);
-                                               q->argc = 3;
-                                               q->retc = 1;
                                                setModuleId(q, dvfRef);
                                                setFunctionId(q, 
planmodifierRef);
                                                q = pushReturn(mb, q, 
newTmpVariable(mb, TYPE_void));
-                                               getArg(q, 1) = getArg(p, 2);
-                                               getArg(q, 2) = getArg(t, 0);
+                                               q = pushArgument(mb, q, 
getArg(p, 2));
+                                               q = pushArgument(mb, q, 
getArg(t, 0));
 
-                                               /* insert the new instruction 
in pc i2+1 */
+                                               /* insert the new instructions 
in pc i2+1 */
                                                insertInstruction(mb, q, i2+1);
                                                insertInstruction(mb, t, i2+1);
                                                insertInstruction(mb, s, i2+1);
                                                insertInstruction(mb, r, i2+1);
 
-                                               actions++;
+                                               actions += 4;
                                                goto finish;
                                        }
+                                       
+                                       break;
+                               case 3:
+                                       /* injection is done. Now it is time to 
replace return bats of sql.binds for data table with appended bats */
+                                       /* mode should be 1 */
+                                       
+                                       which_column = 
get_column_num(schema_name, getVarConstant(mb, getArg(p, 3)).val.sval, 
+                                                                         
getVarConstant(mb, getArg(p, 4)).val.sval);
+                                       if(which_column < 0)
+                                       {
+                                               printf("dvf.get_column_num is 
not defined yet for schema: %s and table: %s and column: %s.",
+                                                     schema_name, 
getVarConstant(mb, getArg(p, 3)).val.sval, getVarConstant(mb, getArg(p, 
4)).val.sval);
+                                               return -1;
+                                       }
+                                       
+                                       r = newInstruction(mb, ASSIGNsymbol);
+                                       r = pushReturn(mb, r, getArg(p, 0));
+                                       r = pushArgument(mb, r, 
getArg(ps_iter[which_column+NUM_RET_MOUNT], 0));
+                                       
+                                       insertInstruction(mb, r, i+1);
+                                       removeInstruction(mb, p);
+                                       
+                                       actions += 2;
                        }
                }
        }
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to