Changeset: d27d5a0d4600 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d27d5a0d4600
Modified Files:
monetdb5/extras/dvf/dvf.c
monetdb5/extras/dvf/dvf.h
monetdb5/extras/dvf/opt_dvf.c
Branch: DVframework
Log Message:
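mounter: added iterative mounts (the DVF optimizer now injects an iterator loop that calls miniseed.mount once per file location and appends the mounted columns to per-column BATs).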
diffs (284 lines):
diff --git a/monetdb5/extras/dvf/dvf.c b/monetdb5/extras/dvf/dvf.c
--- a/monetdb5/extras/dvf/dvf.c
+++ b/monetdb5/extras/dvf/dvf.c
@@ -7,11 +7,6 @@
#include "dvf.h"
#include "mal_interpreter.h"
-#define NUM_RET_MOUNT 4
-
-int get_column_num(str schema_name, str table_name, str column_name);
-int get_column_type(str schema_name, str table_name, int column_num);
-
int get_column_type(str schema_name, str table_name, int column_num)
{
if(strcmp(schema_name, "mseed") != 0 || strcmp(table_name, "data") != 0)
diff --git a/monetdb5/extras/dvf/dvf.h b/monetdb5/extras/dvf/dvf.h
--- a/monetdb5/extras/dvf/dvf.h
+++ b/monetdb5/extras/dvf/dvf.h
@@ -17,6 +17,10 @@
#include "mal_function.h"
#include "opt_prelude.h"
+#define NUM_RET_MOUNT 4
+
+int get_column_num(str schema_name, str table_name, str column_name);
+int get_column_type(str schema_name, str table_name, int column_num);
dvf_export str plan_modifier(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr p);
/* TODO: What is the following line? */
diff --git a/monetdb5/extras/dvf/opt_dvf.c b/monetdb5/extras/dvf/opt_dvf.c
--- a/monetdb5/extras/dvf/opt_dvf.c
+++ b/monetdb5/extras/dvf/opt_dvf.c
@@ -92,6 +92,7 @@
#include "monetdb_config.h"
#include "opt_dvf.h"
+#include "dvf.h"
#include "mal_interpreter.h"
#include "opt_statistics.h"
@@ -100,19 +101,22 @@ OPTdvfImplementation(Client cntxt, MalBl
{
//TODO: Replace these with a proper (global) constants
str sys_schema_name = "sys";
+ str schema_name = "mseed";
str data_table_identifier = "data";
str file_location_identifier = "file_location";
+ str mountRef = putName("mount", 5);
+ str miniseedRef = putName("miniseed", 8);
str dvfRef = putName("dvf", 3);
str planmodifierRef = putName("plan_modifier", 13);
//states of finding the pattern
- int state = 0; //0: start, 1:found v3, 2:found v5, 3:found v6;
+ int state = 0; //0: start, 1:found v3, 2:found v5, 3:done with
injection;
//state variables (instruction index) numbered with state
int i1 = 0, i2 = 0;
- InstrPtr *old = NULL, q = NULL, r = NULL, s = NULL, t = NULL, *ps_iter
= NULL;
- int i, limit, actions = 0;
+ InstrPtr *old = NULL, q = NULL, r = NULL, s = NULL, t = NULL, b = NULL,
m = NULL, e = NULL, *ps_iter = NULL;
+ int i, limit, which_column, actions = 0;
stk = stk; //to escape 'unused' parameter error.
pci = pci; //to escape 'unused' parameter error.
@@ -125,11 +129,10 @@ OPTdvfImplementation(Client cntxt, MalBl
old = mb->stmt;
limit= mb->stop;
- ps_iter = ps_iter;
printf("mode:%d\n", mode);
/* iterate over the instructions of the input MAL program */
- for (i = 1; i < limit; i++) /* the plan signature can be skipped safely
*/
+ for (i = 1; i < limit; i++, limit = mb->stop) /* the plan signature can
be skipped safely */
{
InstrPtr p = old[i];
@@ -142,7 +145,8 @@ OPTdvfImplementation(Client cntxt, MalBl
p->retc == 1 &&
strcmp(getVarConstant(mb, getArg(p, 2)).val.sval,
sys_schema_name) != 0 &&
strstr(getVarConstant(mb, getArg(p, 3)).val.sval,
data_table_identifier) == NULL &&
- strcmp(getVarConstant(mb, getArg(p, 4)).val.sval,
file_location_identifier) == 0)
+ strcmp(getVarConstant(mb, getArg(p, 4)).val.sval,
file_location_identifier) == 0 &&
+ state <= 3)
{
i1 = i;
state = 1;
@@ -186,7 +190,7 @@ OPTdvfImplementation(Client cntxt, MalBl
/* check for
* v7 := sql.bind(..., schema_name, data_table_name, ..., ...);
*/
- else if((state == 1 || state == 2) &&
+ else if((state == 1 || state == 2 || state == 3) &&
getModuleId(p) == sqlRef &&
getFunctionId(p) == bindRef &&
p->argc == 6 &&
@@ -206,7 +210,134 @@ OPTdvfImplementation(Client cntxt, MalBl
if(mode == 1)
{
+ /* v7:bat[:oid,:int] :=
bat.new(:oid,:int); #BAT for file_id in data
+ * v8:bat[:oid,:int] :=
bat.new(:oid,:int); #BAT for seq_no in data
+ * v9:bat[:oid,:int] :=
bat.new(:oid,:timestamp); #BAT for sample_time in data
+ * ... #BAT for sample_value in
data
+ * (t1, t2) := group.done(v6);
+ * t3 := bat.mirror(t1);
+ * t4 := algebra.leftjoin(t3,
v6);
+ * barrier (o, fileLocation) :=
iterator.new(t4);
+ * (v71:bat[:oid,:int],
v81:bat[:oid,:int], v91:bat[:oid,:timestamp], ...)
:=miniseed.mount(fileLocation);
+ * v7 := sql.append(v7,v71);
+ * v8 := sql.append(v8,v81);
+ * v9 := sql.append(v9,v91);
+ * ...
+ * redo (o, fileLocation) :=
iterator.next(t4);
+ * exit (o,fileLocation);
+ */
+ int a = 0, type;
+ ps_iter =
(InstrPtr*)GDKmalloc(2*NUM_RET_MOUNT*sizeof(InstrPtr)); /* for the bat.new and
bat.append commands. */
+
+ /* create group.done
instruction */
+ q = newInstruction(mb,
ASSIGNsymbol);
+ setModuleId(q, groupRef);
+ setFunctionId(q, doneRef);
+ q = pushReturn(mb, q,
newTmpVariable(mb, TYPE_bat));
+ q = pushReturn(mb, q,
newTmpVariable(mb, TYPE_bat));
+ q = pushArgument(mb, q,
getArg(old[i2], 0));
+
+
+ /* create bat.mirror
instruction */
+ s = newInstruction(mb,
ASSIGNsymbol);
+ setModuleId(s, batRef);
+ setFunctionId(s, mirrorRef);
+ s = pushReturn(mb, s,
newTmpVariable(mb, TYPE_bat));
+ s = pushArgument(mb, s,
getArg(q, 0));
+
+ /* create algebra.leftjoin
instruction */
+ t = newInstruction(mb,
ASSIGNsymbol);
+ setModuleId(t, algebraRef);
+ setFunctionId(t, leftjoinRef);
+ t = pushReturn(mb, t,
newTmpVariable(mb, TYPE_bat));
+ t = pushArgument(mb, t,
getArg(s, 0));
+ t = pushArgument(mb, t,
getArg(old[i2], 0));
+
+ /* create barrier instruction */
+ b = newInstruction(mb,
ASSIGNsymbol);
+ setModuleId(b, iteratorRef);
+ setFunctionId(b, newRef);
+ b->barrier = BARRIERsymbol;
+ b = pushReturn(mb, b,
newTmpVariable(mb, TYPE_any)); /* o */
+ b = pushReturn(mb, b,
newTmpVariable(mb, TYPE_any)); /* fileLocation iterator */
+ b = pushArgument(mb, b,
getArg(t, 0));
+
+ /* create redo instruction */
+ r = newInstruction(mb,
ASSIGNsymbol);
+ setModuleId(r, iteratorRef);
+ setFunctionId(r, nextRef);
+ r->barrier = REDOsymbol;
+ r = pushReturn(mb, r, getArg(b,
0)); /* o */
+ r = pushReturn(mb, r, getArg(b,
1)); /* fileLocation iterator */
+ r = pushArgument(mb, r,
getArg(t, 0));
+
+ /* create mount instruction */
+ m = newInstruction(mb,
ASSIGNsymbol);
+ setModuleId(m, miniseedRef);
+ setFunctionId(m, mountRef);
+
+ for(; a < NUM_RET_MOUNT; a++)
+ {
+ type =
get_column_type(schema_name, data_table_identifier, a);
+ if(type < 0)
+ {
+
printf("dvf.get_column_num is not defined yet for schema: %s and table: %s and
column: %d.\n", schema_name, data_table_identifier, a);
+ return -1;
+ }
+
+ type =
newBatType(TYPE_oid, type);
+
+ /* create bat.new
instructions */
+ ps_iter[a] =
newInstruction(mb, ASSIGNsymbol);
+ setModuleId(ps_iter[a],
batRef);
+
setFunctionId(ps_iter[a], newRef);
+ ps_iter[a] =
pushReturn(mb, ps_iter[a], newTmpVariable(mb, type)); /* v7, v8, ... */
+ ps_iter[a] =
pushType(mb, ps_iter[a], getHeadType(type));
+ ps_iter[a] =
pushType(mb, ps_iter[a], getTailType(type));
+
+ /* push returns of
mount instruction */
+ m = pushReturn(mb, m,
newTmpVariable(mb, type));
+
+ /* create sql.append
instructions */
+
ps_iter[a+NUM_RET_MOUNT] = newInstruction(mb, ASSIGNsymbol);
+
setModuleId(ps_iter[a+NUM_RET_MOUNT], batRef);
+
setFunctionId(ps_iter[a+NUM_RET_MOUNT], appendRef);
+
ps_iter[a+NUM_RET_MOUNT] = pushReturn(mb, ps_iter[a+NUM_RET_MOUNT],
getArg(ps_iter[a], 0));
+
ps_iter[a+NUM_RET_MOUNT] = pushArgument(mb, ps_iter[a+NUM_RET_MOUNT],
getArg(ps_iter[a], 0));
+
ps_iter[a+NUM_RET_MOUNT] = pushArgument(mb, ps_iter[a+NUM_RET_MOUNT], getArg(m,
a));
+ }
+
+ /* push arg of mount
instruction */
+ m = pushArgument(mb, m,
getArg(b, 1));
+
+ /* create exit instruction */
+ e = newInstruction(mb,
ASSIGNsymbol);
+ e->barrier = EXITsymbol;
+ e = pushReturn(mb, e, getArg(b,
0)); /* o */
+ e = pushReturn(mb, e, getArg(b,
1)); /* fileLocation iterator */
+
+ /* insert the new instructions
in pc i2+1 */
+ insertInstruction(mb, e, i2+1);
+ insertInstruction(mb, r, i2+1);
+ for(a = NUM_RET_MOUNT-1; a >=
0; a--)
+ {
+ insertInstruction(mb,
ps_iter[a+NUM_RET_MOUNT], i2+1);
+ }
+ insertInstruction(mb, m, i2+1);
+ insertInstruction(mb, b, i2+1);
+
+ insertInstruction(mb, t, i2+1);
+ insertInstruction(mb, s, i2+1);
+ insertInstruction(mb, q, i2+1);
+
+ for(a = NUM_RET_MOUNT-1; a >=
0; a--)
+ {
+ insertInstruction(mb,
ps_iter[a], i2+1);
+ }
+
+ actions += 7 + NUM_RET_MOUNT *
2;
+ state = 3;
}
else
@@ -243,23 +374,44 @@ OPTdvfImplementation(Client cntxt, MalBl
/* create dvf.plan_modifier
instruction */
q = newInstruction(mb,
ASSIGNsymbol);
- q->argc = 3;
- q->retc = 1;
setModuleId(q, dvfRef);
setFunctionId(q,
planmodifierRef);
q = pushReturn(mb, q,
newTmpVariable(mb, TYPE_void));
- getArg(q, 1) = getArg(p, 2);
- getArg(q, 2) = getArg(t, 0);
+ q = pushArgument(mb, q,
getArg(p, 2));
+ q = pushArgument(mb, q,
getArg(t, 0));
- /* insert the new instruction
in pc i2+1 */
+ /* insert the new instructions
in pc i2+1 */
insertInstruction(mb, q, i2+1);
insertInstruction(mb, t, i2+1);
insertInstruction(mb, s, i2+1);
insertInstruction(mb, r, i2+1);
- actions++;
+ actions += 4;
goto finish;
}
+
+ break;
+ case 3:
+ /* injection is done. Now it is time to
replace return bats of sql.binds for data table with appended bats */
+ /* mode should be 1 */
+
+ which_column =
get_column_num(schema_name, getVarConstant(mb, getArg(p, 3)).val.sval,
+
getVarConstant(mb, getArg(p, 4)).val.sval);
+ if(which_column < 0)
+ {
+ printf("dvf.get_column_num is
not defined yet for schema: %s and table: %s and column: %s.",
+ schema_name,
getVarConstant(mb, getArg(p, 3)).val.sval, getVarConstant(mb, getArg(p,
4)).val.sval);
+ return -1;
+ }
+
+ r = newInstruction(mb, ASSIGNsymbol);
+ r = pushReturn(mb, r, getArg(p, 0));
+ r = pushArgument(mb, r,
getArg(ps_iter[which_column+NUM_RET_MOUNT], 0));
+
+ insertInstruction(mb, r, i+1);
+ removeInstruction(mb, p);
+
+ actions += 2;
}
}
}
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list