Changeset: 221ff088d289 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=221ff088d289
Added Files:
sql/backends/monet5/Tests/cfunction03.sql
Modified Files:
sql/backends/monet5/sql_cquery.c
Branch: trails
Log Message:
Store continuous UDFs results into the temporary stream tables craeted in the
register stage.
diffs (202 lines):
diff --git a/sql/backends/monet5/Tests/cfunction03.sql
b/sql/backends/monet5/Tests/cfunction03.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cfunction03.sql
@@ -0,0 +1,31 @@
+-- Test a continuous function returning a table
+create table results3 (aa int, bb text);
+
+create function cfunc3(input text) returns table (aa integer, bb text) begin
+ declare s int;
+ set s = 0;
+ while true do
+ set s = s + 1;
+ yield table (select s, input);
+ end while;
+end;
+
+start continuous function cfunc3('test') with heartbeat 1000 cycles 3;
+
+pause continuous cfunc3;
+
+create procedure cproc3() begin
+ insert into results3 (select aa, bb from tmp.cfunc3);
+end;
+
+start continuous procedure cproc3() with cycles 3;
+
+call cquery.wait(4000);
+
+stop continuous cfunc3;
+
+select aa, bb from results3;
+
+drop function cfunc3;
+drop procedure cproc3;
+drop table results3;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -118,12 +118,9 @@ CQfree(Client cntxt, int idx)
//clean the baskets if so
cleanBaskets(idx);
if(cntxt && IS_UNION(pnet[idx].func)) {
- oid prevID;
- str prevUserName;
-
//change IDs
- prevID = cntxt->user;
- prevUserName = cntxt->username;
+ oid prevID = cntxt->user;
+ str prevUserName = cntxt->username;
cntxt->user = CQ_SCHEDULER_CLIENTID;
if(AUTHgetUsername(&cntxt->username, cntxt) == MAL_SUCCEED) {
backend* be = (backend*) cntxt->sqlcontext;
@@ -438,7 +435,7 @@ CQregister(Client cntxt, str sname, str
const char* err_message = (which & mod_continuous_function) ?
"function" : "procedure";
char* cq_id = NULL;
char buffer[IDLENGTH];
- int i, idx, varid, cid, freeMB = 0;
+ int i, idx, varid, cid, freeMB = 0, mvc_var = 0;
backend* be = (backend*) cntxt->sqlcontext;
mvc *m = be->mvc;
sql_schema *s = NULL, *tmp_schema = NULL;
@@ -514,7 +511,7 @@ CQregister(Client cntxt, str sname, str
}
found = f->func;
- if(found->res && list_length(found->res)) { //for functions we have to
store the results somewhere
+ if(found->res) { //for functions we have to store the results somewhere
if(IS_UNION(found)) { //if it is a UNION (returning a table),
we store it in a
oid prevID;
str prevUserName;
@@ -570,29 +567,33 @@ CQregister(Client cntxt, str sname, str
if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
CQ_MALLOC_FAIL(finish)
}
- if((p = newInstruction(NULL, "user", cq_id)) == NULL) {
+ if((q = newInstruction(NULL, "user", cq_id)) == NULL) {
CQ_MALLOC_FAIL(finish)
}
- p->token = FUNCTIONsymbol;
- p->barrier = 0;
+ q->token = FUNCTIONsymbol;
+ q->barrier = 0;
if((varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_void)) < 0) {
CQ_MALLOC_FAIL(finish)
}
- setDestVar(p, varid);
- pushInstruction(mb, p);
+ setDestVar(q, varid);
+ pushInstruction(mb, q);
+ setArgType(mb, q, 0, TYPE_void);
- if((q = newStmt(mb, sqlRef, transactionRef)) == NULL) {
+ if((q = newStmt(mb, sqlRef, transactionRef)) == NULL) { //transaction
reference
CQ_MALLOC_FAIL(finish)
}
- setArgType(mb,q, 0, TYPE_void);
- if ((p = newStmt(mb, "user", fname)) == NULL) { //add the UDF call
statement
+ if(found->res && IS_UNION(found)) { //add output basket
+ if((q = newStmt(mb, sqlRef, mvcRef)) == NULL) {
+ CQ_MALLOC_FAIL(finish)
+ }
+ setArgType(mb, q, 0, TYPE_int);
+ mvc_var = getDestVar(q);
+ }
+ if ((q = newStmt(mb, "user", fname)) == NULL) { //add the UDF call
statement
CQ_MALLOC_FAIL(finish)
}
- /*if (f->res && list_length(f->res)) {
- sql_subtype *res = f->res->h->data;
- setVarType(mb, getArg(q, 0), res->type->localtype);
- setVarUDFtype(mb, getArg(q, 0));
- }*/
+ if(found->res && IS_UNION(found))
+ q->argc = q->retc = 0;
for (i = 0, argn = found->ops->h; i < argc && argn; i++, argn =
argn->next) { //add variables to the MAL block
sql_subtype tpe = ((sql_arg *) argn->data)->type;
atom *a = args[i];
@@ -614,13 +615,80 @@ CQregister(Client cntxt, str sname, str
*val = dst;
/* make sure we return the correct type (not the storage type)
*/
val->vtype = tpe.type->localtype;
- p = pushValue(mb, p, val);
+ q = pushValue(mb, q, val);
if(val->vtype == TYPE_str) //if the input variable is of type
str we must free it
GDKfree(val->val.sval);
- if(p == NULL) {
+ if(q == NULL) {
CQ_MALLOC_FAIL(finish);
}
}
+
+ if(found->res && IS_UNION(found)) {
+ int except_var;
+ p = q;
+
+ q= newStmt(mb, basketRef, registerRef); //register the output
basket
+ q= pushArgument(mb, q, mvc_var);
+ getArg(q, 0) = mvc_var = newTmpVariable(mb, TYPE_int);
+ q= pushStr(mb, q, "tmp");
+ q= pushStr(mb, q, ralias);
+ q= pushInt(mb, q, 1);
+
+ q= newStmt(mb, basketRef, lockRef); //lock it
+ q= pushArgument(mb, q, mvc_var);
+ getArg(q, 0) = mvc_var = newTmpVariable(mb, TYPE_int);
+ q= pushStr(mb, q, "tmp");
+ q= pushStr(mb, q, ralias);
+
+ for (argn = found->res->h; argn; argn = argn->next) {
+ sql_arg* arg = (sql_arg *) argn->data;
+ sql_subtype tpe = ((sql_arg *) argn->data)->type;
+ int type = newBatType(tpe.type->localtype);
+ int nextbid = newTmpVariable(mb, type);
+ p = pushReturn(mb, p, nextbid);
+
+ q= newStmt(mb, basketRef, appendRef); //append to the
basket the output results of teh UDF
+ q= pushArgument(mb, q, mvc_var);
+ getArg(q, 0) = mvc_var = newTmpVariable(mb, TYPE_int);
+ q= pushStr(mb, q, "tmp");
+ q= pushStr(mb, q, ralias);
+ q= pushStr(mb, q, arg->name);
+ q= pushArgument(mb, q, nextbid);
+ }
+ q = newAssignment(mb);
+ except_var = getArg(q, 0) = newVariable(mb, "SQLexception", 12,
TYPE_str);
+ setVarUDFtype(mb, except_var);
+ q->barrier = CATCHsymbol;
+
+ q = newStmt(mb,basketRef, errorRef);
+ q = pushStr(mb, q, "user");
+ q = pushStr(mb, q, cq_id);
+ q = pushArgument(mb, q, except_var);
+
+ q = newAssignment(mb);
+ getArg(q, 0) = except_var;
+ q->barrier = EXITsymbol;
+
+ q = newAssignment(mb);
+ except_var = getArg(q, 0) = newVariable(mb, "MALexception", 12,
TYPE_str);
+ setVarUDFtype(mb, except_var);
+ q->barrier = CATCHsymbol;
+
+ q = newStmt(mb,basketRef, errorRef);
+ q = pushStr(mb, q, "user");
+ q = pushStr(mb, q, cq_id);
+ q = pushArgument(mb, q, except_var);
+
+ q = newAssignment(mb);
+ getArg(q, 0) = except_var;
+ q->barrier = EXITsymbol;
+
+ q= newStmt(mb, basketRef, unlockRef); //unlock basket in the end
+ q= pushArgument(mb, q, mvc_var);
+ q= pushStr(mb, q, "tmp");
+ q= pushStr(mb, q, ralias);
+ }
+
if((q = newStmt(mb, sqlRef, commitRef)) == NULL) {
CQ_MALLOC_FAIL(finish)
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list