Changeset: 221ff088d289 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=221ff088d289
Added Files:
        sql/backends/monet5/Tests/cfunction03.sql
Modified Files:
        sql/backends/monet5/sql_cquery.c
Branch: trails
Log Message:

Store continuous UDFs results into the temporary stream tables craeted in the 
register stage.


diffs (202 lines):

diff --git a/sql/backends/monet5/Tests/cfunction03.sql 
b/sql/backends/monet5/Tests/cfunction03.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cfunction03.sql
@@ -0,0 +1,31 @@
+-- Test a continuous function returning a table
+create table results3 (aa int, bb text);
+
+create function cfunc3(input text) returns table (aa integer, bb text) begin
+    declare s int;
+    set s = 0;
+    while true do
+        set s = s + 1;
+        yield table (select s, input);
+    end while;
+end;
+
+start continuous function cfunc3('test') with heartbeat 1000 cycles 3;
+
+pause continuous cfunc3;
+
+create procedure cproc3() begin
+    insert into results3 (select aa, bb from tmp.cfunc3);
+end;
+
+start continuous procedure cproc3() with cycles 3;
+
+call cquery.wait(4000);
+
+stop continuous cfunc3;
+
+select aa, bb from results3;
+
+drop function cfunc3;
+drop procedure cproc3;
+drop table results3;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -118,12 +118,9 @@ CQfree(Client cntxt, int idx)
        //clean the baskets if so
        cleanBaskets(idx);
        if(cntxt && IS_UNION(pnet[idx].func)) {
-               oid prevID;
-               str prevUserName;
-
                //change IDs
-               prevID = cntxt->user;
-               prevUserName = cntxt->username;
+               oid prevID = cntxt->user;
+               str prevUserName = cntxt->username;
                cntxt->user = CQ_SCHEDULER_CLIENTID;
                if(AUTHgetUsername(&cntxt->username, cntxt) == MAL_SUCCEED) {
                        backend* be = (backend*) cntxt->sqlcontext;
@@ -438,7 +435,7 @@ CQregister(Client cntxt, str sname, str 
        const char* err_message = (which & mod_continuous_function) ? 
"function" : "procedure";
        char* cq_id = NULL;
        char buffer[IDLENGTH];
-       int i, idx, varid, cid, freeMB = 0;
+       int i, idx, varid, cid, freeMB = 0, mvc_var = 0;
        backend* be = (backend*) cntxt->sqlcontext;
        mvc *m = be->mvc;
        sql_schema *s = NULL, *tmp_schema = NULL;
@@ -514,7 +511,7 @@ CQregister(Client cntxt, str sname, str 
        }
 
        found = f->func;
-       if(found->res && list_length(found->res)) { //for functions we have to 
store the results somewhere
+       if(found->res) { //for functions we have to store the results somewhere
                if(IS_UNION(found)) { //if it is a UNION (returning a table), 
we store it in a
                        oid prevID;
                        str prevUserName;
@@ -570,29 +567,33 @@ CQregister(Client cntxt, str sname, str 
        if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
                CQ_MALLOC_FAIL(finish)
        }
-       if((p = newInstruction(NULL, "user", cq_id)) == NULL) {
+       if((q = newInstruction(NULL, "user", cq_id)) == NULL) {
                CQ_MALLOC_FAIL(finish)
        }
-       p->token = FUNCTIONsymbol;
-       p->barrier = 0;
+       q->token = FUNCTIONsymbol;
+       q->barrier = 0;
        if((varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_void)) < 0) {
                CQ_MALLOC_FAIL(finish)
        }
-       setDestVar(p, varid);
-       pushInstruction(mb, p);
+       setDestVar(q, varid);
+       pushInstruction(mb, q);
+       setArgType(mb, q, 0, TYPE_void);
 
-       if((q = newStmt(mb, sqlRef, transactionRef)) == NULL) {
+       if((q = newStmt(mb, sqlRef, transactionRef)) == NULL) { //transaction 
reference
                CQ_MALLOC_FAIL(finish)
        }
-       setArgType(mb,q, 0, TYPE_void);
-       if ((p = newStmt(mb, "user", fname)) == NULL) { //add the UDF call 
statement
+       if(found->res && IS_UNION(found)) { //add output basket
+               if((q = newStmt(mb, sqlRef, mvcRef)) == NULL) {
+                       CQ_MALLOC_FAIL(finish)
+               }
+               setArgType(mb, q, 0, TYPE_int);
+               mvc_var = getDestVar(q);
+       }
+       if ((q = newStmt(mb, "user", fname)) == NULL) { //add the UDF call 
statement
                CQ_MALLOC_FAIL(finish)
        }
-       /*if (f->res && list_length(f->res)) {
-               sql_subtype *res = f->res->h->data;
-               setVarType(mb, getArg(q, 0), res->type->localtype);
-               setVarUDFtype(mb, getArg(q, 0));
-       }*/
+       if(found->res && IS_UNION(found))
+               q->argc = q->retc = 0;
        for (i = 0, argn = found->ops->h; i < argc && argn; i++, argn = 
argn->next) { //add variables to the MAL block
                sql_subtype tpe = ((sql_arg *) argn->data)->type;
                atom *a = args[i];
@@ -614,13 +615,80 @@ CQregister(Client cntxt, str sname, str 
                *val = dst;
                /* make sure we return the correct type (not the storage type) 
*/
                val->vtype = tpe.type->localtype;
-               p = pushValue(mb, p, val);
+               q = pushValue(mb, q, val);
                if(val->vtype == TYPE_str) //if the input variable is of type 
str we must free it
                        GDKfree(val->val.sval);
-               if(p == NULL) {
+               if(q == NULL) {
                        CQ_MALLOC_FAIL(finish);
                }
        }
+
+       if(found->res && IS_UNION(found)) {
+               int except_var;
+               p = q;
+
+               q= newStmt(mb, basketRef, registerRef); //register the output 
basket
+               q= pushArgument(mb, q, mvc_var);
+               getArg(q, 0) = mvc_var = newTmpVariable(mb, TYPE_int);
+               q= pushStr(mb, q, "tmp");
+               q= pushStr(mb, q, ralias);
+               q= pushInt(mb, q, 1);
+
+               q= newStmt(mb, basketRef, lockRef); //lock it
+               q= pushArgument(mb, q, mvc_var);
+               getArg(q, 0) = mvc_var = newTmpVariable(mb, TYPE_int);
+               q= pushStr(mb, q, "tmp");
+               q= pushStr(mb, q, ralias);
+
+               for (argn = found->res->h; argn; argn = argn->next) {
+                       sql_arg* arg = (sql_arg *) argn->data;
+                       sql_subtype tpe = ((sql_arg *) argn->data)->type;
+                       int type = newBatType(tpe.type->localtype);
+                       int nextbid = newTmpVariable(mb, type);
+                       p = pushReturn(mb, p, nextbid);
+
+                       q= newStmt(mb, basketRef, appendRef); //append to the 
basket the output results of teh UDF
+                       q= pushArgument(mb, q, mvc_var);
+                       getArg(q, 0) = mvc_var = newTmpVariable(mb, TYPE_int);
+                       q= pushStr(mb, q, "tmp");
+                       q= pushStr(mb, q, ralias);
+                       q= pushStr(mb, q, arg->name);
+                       q= pushArgument(mb, q, nextbid);
+               }
+               q = newAssignment(mb);
+               except_var = getArg(q, 0) = newVariable(mb, "SQLexception", 12, 
TYPE_str);
+               setVarUDFtype(mb, except_var);
+               q->barrier = CATCHsymbol;
+
+               q = newStmt(mb,basketRef, errorRef);
+               q = pushStr(mb, q, "user");
+               q = pushStr(mb, q, cq_id);
+               q = pushArgument(mb, q, except_var);
+
+               q = newAssignment(mb);
+               getArg(q, 0) = except_var;
+               q->barrier = EXITsymbol;
+
+               q = newAssignment(mb);
+               except_var = getArg(q, 0) = newVariable(mb, "MALexception", 12, 
TYPE_str);
+               setVarUDFtype(mb, except_var);
+               q->barrier = CATCHsymbol;
+
+               q = newStmt(mb,basketRef, errorRef);
+               q = pushStr(mb, q, "user");
+               q = pushStr(mb, q, cq_id);
+               q = pushArgument(mb, q, except_var);
+
+               q = newAssignment(mb);
+               getArg(q, 0) = except_var;
+               q->barrier = EXITsymbol;
+
+               q= newStmt(mb, basketRef, unlockRef); //unlock basket in the end
+               q= pushArgument(mb, q, mvc_var);
+               q= pushStr(mb, q, "tmp");
+               q= pushStr(mb, q, ralias);
+       }
+
        if((q = newStmt(mb, sqlRef, commitRef)) == NULL) {
                CQ_MALLOC_FAIL(finish)
        }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to