Changeset: 08c3d7d45b05 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=08c3d7d45b05
Modified Files:
        sql/backends/monet5/Tests/cfunction01.sql
        sql/backends/monet5/Tests/cfunction01.stable.err
        sql/backends/monet5/Tests/cfunction01.stable.out
        sql/backends/monet5/sql_cquery.c
Branch: trails
Log Message:

Use an output stream table for CQ UDFs returning scalars as well


diffs (truncated from 302 to 300 lines):

diff --git a/sql/backends/monet5/Tests/cfunction01.sql b/sql/backends/monet5/Tests/cfunction01.sql
--- a/sql/backends/monet5/Tests/cfunction01.sql
+++ b/sql/backends/monet5/Tests/cfunction01.sql
@@ -15,20 +15,19 @@ begin
 END;
 select * from functions where name ='aggr01';
 
--- a continuous function can be called used like any other function?
-select aggr01(); #should return 0
+select result from tmp.aggr01; #error
 
 start continuous function aggr01();
 call cquery.wait(1000); #wait to be started
 
-select aggr01(); #should return 0
+select result from tmp.aggr01; #should be empty
 pause continuous aggr01;
 
 insert into ftmp values(1),(1);
 resume continuous aggr01;
 
 call cquery.wait(1000); #wait for processing
-select aggr01(); #should return 2
+select result from tmp.aggr01; #should return 2
 
 pause continuous aggr01;
 insert into ftmp values(2),(2);
@@ -36,10 +35,10 @@ insert into ftmp values(3),(3);
 
 resume continuous aggr01;
 call cquery.wait(1000);
-select aggr01(); #should return 6
+select result from tmp.aggr01; #should return 2,4,6
 
 call cquery.wait(1000);
-select aggr01(); #should return 6
+select result from tmp.aggr01; #should return 2,4,6
 
 stop continuous aggr01;
 drop function aggr01;
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.err b/sql/backends/monet5/Tests/cfunction01.stable.err
--- a/sql/backends/monet5/Tests/cfunction01.stable.err
+++ b/sql/backends/monet5/Tests/cfunction01.stable.err
@@ -27,6 +27,10 @@ stderr of test 'cfunction01` in director
 # 16:27:02 >  "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e" 
"--host=/var/tmp/mtest-2361" "--port=36085"
 # 16:27:02 >  
 
+MAPI  = (monetdb) /var/tmp/mtest-26476/.s.monetdb.32321
+QUERY = select result from tmp.aggr01; #error
+ERROR = !SELECT: no such table 'aggr01'
+CODE  = 42S02
 
 # 16:27:07 >  
 # 16:27:07 >  "Done."
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.out b/sql/backends/monet5/Tests/cfunction01.stable.out
--- a/sql/backends/monet5/Tests/cfunction01.stable.out
+++ b/sql/backends/monet5/Tests/cfunction01.stable.out
@@ -84,27 +84,20 @@ Ready.
 % id,  name,   func,   mod,    language,       type,   side_effect,    varres, 
vararg, schema_id # name
 % int, varchar,        varchar,        varchar,        int,    int,    
boolean,        boolean,        boolean,        int # type
 % 4,   6,      175,    4,      1,      1,      5,      5,      5,      4 # 
length
-[ 8669,        "aggr01",       "create function aggr01() \nreturns 
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s + 
(select count(*) from ftmp);\n yield s ; \n end while;\n return s;\nend;",   
"user", 2,      1,      false,  false,  false,  2000    ]
-#select aggr01(); #should return 0
-% .L2 # table_name
-% L2 # name
+[ 8727,        "aggr01",       "create function aggr01() \nreturns 
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s + 
(select count(*) from ftmp);\n yield s ; \n end while;\n return s;\nend;",   
"user", 2,      1,      false,  false,  false,  2000    ]
+#start continuous function aggr01();
+#select result from tmp.aggr01; #should be empty
+% tmp.aggr01 # table_name
+% result # name
 % int # type
 % 1 # length
-[ 0    ]
-#start continuous function aggr01();
-#select aggr01(); #should return 0
-% .L2 # table_name
-% L2 # name
-% int # type
-% 1 # length
-[ 0    ]
 #pause continuous aggr01;
 #insert into ftmp values(1),(1);
 [ 2    ]
 #resume continuous aggr01;
-#select aggr01(); #should return 2
-% .L2 # table_name
-% L2 # name
+#select result from tmp.aggr01; #should return 2
+% tmp.aggr01 # table_name
+% result # name
 % int # type
 % 1 # length
 [ 2    ]
@@ -114,17 +107,21 @@ Ready.
 #insert into ftmp values(3),(3);
 [ 2    ]
 #resume continuous aggr01;
-#select aggr01(); #should return 6
-% .L2 # table_name
-% L2 # name
+#select result from tmp.aggr01; #should return 2,4,6
+% tmp.aggr01 # table_name
+% result # name
 % int # type
 % 1 # length
+[ 2    ]
+[ 4    ]
 [ 6    ]
-#select aggr01(); #should return 6
-% .L2 # table_name
-% L2 # name
+#select result from tmp.aggr01; #should return 2,4,6
+% tmp.aggr01 # table_name
+% result # name
 % int # type
 % 1 # length
+[ 2    ]
+[ 4    ]
 [ 6    ]
 #stop continuous aggr01;
 #drop function aggr01;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -117,7 +117,7 @@ CQfree(Client cntxt, int idx)
 
        //clean the baskets if so
        cleanBaskets(idx);
-       if(cntxt && IS_UNION(pnet[idx].func)) {
+       if(cntxt && pnet[idx].func->res) {
                //change IDs
                oid prevID = cntxt->user;
                str prevUserName = cntxt->username;
@@ -351,7 +351,7 @@ finish:
 /* Make sure we do not re-use the same source more than once */
 /* Avoid any concurrency conflict */
 static str
-CQanalysis(Client cntxt, MalBlkPtr mb, int idx, int isUnion, str alias)
+CQanalysis(Client cntxt, MalBlkPtr mb, int idx, sql_func* func, str alias)
 {
        int i, j, bskt, binout;
        InstrPtr p;
@@ -388,7 +388,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
                        pnet[idx].inout[j] = binout == 0 ? STREAM_IN : 
STREAM_OUT;
                }
        }
-       if(isUnion && msg == MAL_SUCCEED) { //register the output stream into the baskets
+       if(func->res && msg == MAL_SUCCEED) { //register the output stream into the baskets
                for( j=0; j< MAXSTREAMS && pnet[idx].baskets[j]; j++);
                if ( j == MAXSTREAMS){
                        msg = 
createException(MAL,"cquery.analysis",SQLSTATE(3F000) "Too many stream table 
columns\n");
@@ -507,50 +507,46 @@ CQregister(Client cntxt, str sname, str 
        }
 
        found = f->func;
-       if(found->res) { //for functions we have to store the results somewhere
-               if(IS_UNION(found)) { //if it is a UNION (returning a table), 
we store it in a
-                       oid prevID;
-                       str prevUserName;
-                       //change IDs
-                       prevID = cntxt->user;
-                       prevUserName = cntxt->username;
-                       cntxt->user = CQ_SCHEDULER_CLIENTID;
-                       if((msg = AUTHgetUsername(&cntxt->username, cntxt)) == 
MAL_SUCCEED) {
-                               if((tmp_schema = mvc_bind_schema(m, "tmp")) == 
NULL) {
-                                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp 
schema\n");
-                                       goto revertids;
-                               }
-                               if(mvc_bind_table(m, tmp_schema, ralias)) {
-                                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already 
exists\n", ralias);
-                                       goto revertids;
-                               }
-                               if((t = mvc_create_stream_table(m, tmp_schema, 
ralias, tt_stream_temp, 0, SQL_DECLARED_TABLE,
-                                                                               
                CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) 
== NULL) {
-                                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal 
stream table\n");
+       if(found->res) { //for functions we have to store the results in an output result table
+               oid prevID;
+               str prevUserName;
+               //change IDs
+               prevID = cntxt->user;
+               prevUserName = cntxt->username;
+               cntxt->user = CQ_SCHEDULER_CLIENTID;
+               if((msg = AUTHgetUsername(&cntxt->username, cntxt)) == 
MAL_SUCCEED) {
+                       if((tmp_schema = mvc_bind_schema(m, "tmp")) == NULL) {
+                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp 
schema\n");
+                               goto revertids;
+                       }
+                       if(mvc_bind_table(m, tmp_schema, ralias)) {
+                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already 
exists\n", ralias);
+                               goto revertids;
+                       }
+                       if((t = mvc_create_stream_table(m, tmp_schema, ralias, 
tt_stream_temp, 0, SQL_DECLARED_TABLE,
+                                                                               
        CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
+                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal 
stream table\n");
+                               goto revertids;
+                       }
+                       for (argn = found->res->h; argn; argn = argn->next) {
+                               sql_arg* arg = (sql_arg *) argn->data;
+                               if(!mvc_create_column(m, t, arg->name, 
&arg->type)) {
+                                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create 
internal stream table\n");
                                        goto revertids;
                                }
-                               for (argn = found->res->h; argn; argn = 
argn->next) {
-                                       sql_arg* arg = (sql_arg *) argn->data;
-                                       if(!mvc_create_column(m, t, arg->name, 
&arg->type)) {
-                                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create 
internal stream table\n");
-                                               goto revertids;
-                                       }
-                               }
-                               msg = create_table_or_view(m, "tmp", ralias, t, 
SQL_LOCAL_TEMP_STREAM);
-                               //msg = sql_grant_table_privs(m, "public", 
PRIV_SELECT, "tmp", ralias, NULL, 0, USER_MONETDB);
                        }
+                       msg = create_table_or_view(m, "tmp", ralias, t, 
SQL_LOCAL_TEMP_STREAM);
+                       //msg = sql_grant_table_privs(m, "public", PRIV_SELECT, 
"tmp", ralias, NULL, 0, USER_MONETDB);
+               }
        revertids:
-                       //set the IDs back
-                       if(cntxt->username)
-                               GDKfree(cntxt->username);
-                       cntxt->user = prevID;
-                       cntxt->username = prevUserName;
-                       if(msg) {
-                               FREE_CQ_MB(finish)
-                       }
-               } /*else if(IS_FUNC(found)) {
-
-               }*/
+               //set the IDs back
+               if(cntxt->username)
+                       GDKfree(cntxt->username);
+               cntxt->user = prevID;
+               cntxt->username = prevUserName;
+               if(msg) {
+                       FREE_CQ_MB(finish)
+               }
        }
 
        MT_lock_set(&ttrLock);
@@ -578,7 +574,7 @@ CQregister(Client cntxt, str sname, str 
        if((q = newStmt(mb, sqlRef, transactionRef)) == NULL) { //transaction 
reference
                CQ_MALLOC_FAIL(finish)
        }
-       if(found->res && IS_UNION(found)) { //add output basket
+       if(found->res) { //add output basket
                if((q = newStmt(mb, sqlRef, mvcRef)) == NULL) {
                        CQ_MALLOC_FAIL(finish)
                }
@@ -588,7 +584,7 @@ CQregister(Client cntxt, str sname, str 
        if ((q = newStmt(mb, "user", fname)) == NULL) { //add the UDF call 
statement
                CQ_MALLOC_FAIL(finish)
        }
-       if(found->res && IS_UNION(found))
+       if(found->res)
                q->argc = q->retc = 0;
        for (i = 0, argn = found->ops->h; i < argc && argn; i++, argn = 
argn->next) { //add variables to the MAL block
                sql_subtype tpe = ((sql_arg *) argn->data)->type;
@@ -619,7 +615,7 @@ CQregister(Client cntxt, str sname, str 
                }
        }
 
-       if(found->res && IS_UNION(found)) {
+       if(found->res) {
                int except_var;
                p = q;
 
@@ -639,8 +635,7 @@ CQregister(Client cntxt, str sname, str 
                for (argn = found->res->h; argn; argn = argn->next) {
                        sql_arg* arg = (sql_arg *) argn->data;
                        sql_subtype tpe = ((sql_arg *) argn->data)->type;
-                       int type = newBatType(tpe.type->localtype);
-                       int nextbid = newTmpVariable(mb, type);
+                       int nextbid = newTmpVariable(mb, IS_UNION(found) ? newBatType(tpe.type->localtype) : tpe.type->localtype);
                        p = pushReturn(mb, p, nextbid);
 
                        q= newStmt(mb, basketRef, appendRef); //append to the 
basket the output results of the UDF
@@ -730,7 +725,7 @@ CQregister(Client cntxt, str sname, str 
                                err_message, ralias);
                FREE_CQ_MB(unlock)
        }
-       if((msg = CQanalysis(cntxt, sym->def, pnettop, IS_UNION(found), ralias)) != MAL_SUCCEED) {
+       if((msg = CQanalysis(cntxt, sym->def, pnettop, found, ralias)) != MAL_SUCCEED) {
                cleanBaskets(pnettop);
                FREE_CQ_MB(unlock)
        }
@@ -1120,7 +1115,7 @@ CQderegister(Client cntxt, str alias)
        if(myID != cq_pid) {
                pnet[idx].status = CQSTOP;
                this_alias = pnet[idx].alias;
-               if(IS_UNION(pnet[idx].func)) {
+               if(pnet[idx].func->res) {
                        for( i=0; i < pnettop && !falias; i++){
                                if(i != idx) {
                                        for( j=0; j< MAXSTREAMS && 
pnet[i].baskets[j] && !falias; j++){
@@ -1410,7 +1405,7 @@ CQscheduler(void *dummy)
                }
                start_trans = 0;
                for (i = 0; i < pnettop ; i++) { //if there is a continuous function to delete, we must start a transaction
-                       if( pnet[i].status == CQDELETE && IS_UNION(pnet[i].func))
+                       if( pnet[i].status == CQDELETE && pnet[i].func->res)
                                start_trans = 1;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to