Changeset: 08c3d7d45b05 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=08c3d7d45b05
Modified Files:
sql/backends/monet5/Tests/cfunction01.sql
sql/backends/monet5/Tests/cfunction01.stable.err
sql/backends/monet5/Tests/cfunction01.stable.out
sql/backends/monet5/sql_cquery.c
Branch: trails
Log Message:
Use an output stream table for CQ UDFs returning scalars as well
diffs (truncated from 302 to 300 lines):
diff --git a/sql/backends/monet5/Tests/cfunction01.sql
b/sql/backends/monet5/Tests/cfunction01.sql
--- a/sql/backends/monet5/Tests/cfunction01.sql
+++ b/sql/backends/monet5/Tests/cfunction01.sql
@@ -15,20 +15,19 @@ begin
END;
select * from functions where name ='aggr01';
--- a continuous function can be called used like any other function?
-select aggr01(); #should return 0
+select result from tmp.aggr01; #error
start continuous function aggr01();
call cquery.wait(1000); #wait to be started
-select aggr01(); #should return 0
+select result from tmp.aggr01; #should be empty
pause continuous aggr01;
insert into ftmp values(1),(1);
resume continuous aggr01;
call cquery.wait(1000); #wait for processing
-select aggr01(); #should return 2
+select result from tmp.aggr01; #should return 2
pause continuous aggr01;
insert into ftmp values(2),(2);
@@ -36,10 +35,10 @@ insert into ftmp values(3),(3);
resume continuous aggr01;
call cquery.wait(1000);
-select aggr01(); #should return 6
+select result from tmp.aggr01; #should return 2,4,6
call cquery.wait(1000);
-select aggr01(); #should return 6
+select result from tmp.aggr01; #should return 2,4,6
stop continuous aggr01;
drop function aggr01;
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.err
b/sql/backends/monet5/Tests/cfunction01.stable.err
--- a/sql/backends/monet5/Tests/cfunction01.stable.err
+++ b/sql/backends/monet5/Tests/cfunction01.stable.err
@@ -27,6 +27,10 @@ stderr of test 'cfunction01` in director
# 16:27:02 > "mclient" "-lsql" "-ftest" "-Eutf-8" "-i" "-e"
"--host=/var/tmp/mtest-2361" "--port=36085"
# 16:27:02 >
+MAPI = (monetdb) /var/tmp/mtest-26476/.s.monetdb.32321
+QUERY = select result from tmp.aggr01; #error
+ERROR = !SELECT: no such table 'aggr01'
+CODE = 42S02
# 16:27:07 >
# 16:27:07 > "Done."
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.out
b/sql/backends/monet5/Tests/cfunction01.stable.out
--- a/sql/backends/monet5/Tests/cfunction01.stable.out
+++ b/sql/backends/monet5/Tests/cfunction01.stable.out
@@ -84,27 +84,20 @@ Ready.
% id, name, func, mod, language, type, side_effect, varres,
vararg, schema_id # name
% int, varchar, varchar, varchar, int, int,
boolean, boolean, boolean, int # type
% 4, 6, 175, 4, 1, 1, 5, 5, 5, 4 #
length
-[ 8669, "aggr01", "create function aggr01() \nreturns
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s +
(select count(*) from ftmp);\n yield s ; \n end while;\n return s;\nend;",
"user", 2, 1, false, false, false, 2000 ]
-#select aggr01(); #should return 0
-% .L2 # table_name
-% L2 # name
+[ 8727, "aggr01", "create function aggr01() \nreturns
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s +
(select count(*) from ftmp);\n yield s ; \n end while;\n return s;\nend;",
"user", 2, 1, false, false, false, 2000 ]
+#start continuous function aggr01();
+#select result from tmp.aggr01; #should be empty
+% tmp.aggr01 # table_name
+% result # name
% int # type
% 1 # length
-[ 0 ]
-#start continuous function aggr01();
-#select aggr01(); #should return 0
-% .L2 # table_name
-% L2 # name
-% int # type
-% 1 # length
-[ 0 ]
#pause continuous aggr01;
#insert into ftmp values(1),(1);
[ 2 ]
#resume continuous aggr01;
-#select aggr01(); #should return 2
-% .L2 # table_name
-% L2 # name
+#select result from tmp.aggr01; #should return 2
+% tmp.aggr01 # table_name
+% result # name
% int # type
% 1 # length
[ 2 ]
@@ -114,17 +107,21 @@ Ready.
#insert into ftmp values(3),(3);
[ 2 ]
#resume continuous aggr01;
-#select aggr01(); #should return 6
-% .L2 # table_name
-% L2 # name
+#select result from tmp.aggr01; #should return 2,4,6
+% tmp.aggr01 # table_name
+% result # name
% int # type
% 1 # length
+[ 2 ]
+[ 4 ]
[ 6 ]
-#select aggr01(); #should return 6
-% .L2 # table_name
-% L2 # name
+#select result from tmp.aggr01; #should return 2,4,6
+% tmp.aggr01 # table_name
+% result # name
% int # type
% 1 # length
+[ 2 ]
+[ 4 ]
[ 6 ]
#stop continuous aggr01;
#drop function aggr01;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -117,7 +117,7 @@ CQfree(Client cntxt, int idx)
//clean the baskets if so
cleanBaskets(idx);
- if(cntxt && IS_UNION(pnet[idx].func)) {
+ if(cntxt && pnet[idx].func->res) {
//change IDs
oid prevID = cntxt->user;
str prevUserName = cntxt->username;
@@ -351,7 +351,7 @@ finish:
/* Make sure we do not re-use the same source more than once */
/* Avoid any concurrency conflict */
static str
-CQanalysis(Client cntxt, MalBlkPtr mb, int idx, int isUnion, str alias)
+CQanalysis(Client cntxt, MalBlkPtr mb, int idx, sql_func* func, str alias)
{
int i, j, bskt, binout;
InstrPtr p;
@@ -388,7 +388,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
pnet[idx].inout[j] = binout == 0 ? STREAM_IN :
STREAM_OUT;
}
}
- if(isUnion && msg == MAL_SUCCEED) { //register the output stream into
the baskets
+ if(func->res && msg == MAL_SUCCEED) { //register the output stream into
the baskets
for( j=0; j< MAXSTREAMS && pnet[idx].baskets[j]; j++);
if ( j == MAXSTREAMS){
msg =
createException(MAL,"cquery.analysis",SQLSTATE(3F000) "Too many stream table
columns\n");
@@ -507,50 +507,46 @@ CQregister(Client cntxt, str sname, str
}
found = f->func;
- if(found->res) { //for functions we have to store the results somewhere
- if(IS_UNION(found)) { //if it is a UNION (returning a table),
we store it in a
- oid prevID;
- str prevUserName;
- //change IDs
- prevID = cntxt->user;
- prevUserName = cntxt->username;
- cntxt->user = CQ_SCHEDULER_CLIENTID;
- if((msg = AUTHgetUsername(&cntxt->username, cntxt)) ==
MAL_SUCCEED) {
- if((tmp_schema = mvc_bind_schema(m, "tmp")) ==
NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp
schema\n");
- goto revertids;
- }
- if(mvc_bind_table(m, tmp_schema, ralias)) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already
exists\n", ralias);
- goto revertids;
- }
- if((t = mvc_create_stream_table(m, tmp_schema,
ralias, tt_stream_temp, 0, SQL_DECLARED_TABLE,
-
CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE))
== NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal
stream table\n");
+ if(found->res) { //for functions we have to store the results in an
output result table
+ oid prevID;
+ str prevUserName;
+ //change IDs
+ prevID = cntxt->user;
+ prevUserName = cntxt->username;
+ cntxt->user = CQ_SCHEDULER_CLIENTID;
+ if((msg = AUTHgetUsername(&cntxt->username, cntxt)) ==
MAL_SUCCEED) {
+ if((tmp_schema = mvc_bind_schema(m, "tmp")) == NULL) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp
schema\n");
+ goto revertids;
+ }
+ if(mvc_bind_table(m, tmp_schema, ralias)) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already
exists\n", ralias);
+ goto revertids;
+ }
+ if((t = mvc_create_stream_table(m, tmp_schema, ralias,
tt_stream_temp, 0, SQL_DECLARED_TABLE,
+
CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal
stream table\n");
+ goto revertids;
+ }
+ for (argn = found->res->h; argn; argn = argn->next) {
+ sql_arg* arg = (sql_arg *) argn->data;
+ if(!mvc_create_column(m, t, arg->name,
&arg->type)) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create
internal stream table\n");
goto revertids;
}
- for (argn = found->res->h; argn; argn =
argn->next) {
- sql_arg* arg = (sql_arg *) argn->data;
- if(!mvc_create_column(m, t, arg->name,
&arg->type)) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create
internal stream table\n");
- goto revertids;
- }
- }
- msg = create_table_or_view(m, "tmp", ralias, t,
SQL_LOCAL_TEMP_STREAM);
- //msg = sql_grant_table_privs(m, "public",
PRIV_SELECT, "tmp", ralias, NULL, 0, USER_MONETDB);
}
+ msg = create_table_or_view(m, "tmp", ralias, t,
SQL_LOCAL_TEMP_STREAM);
+ //msg = sql_grant_table_privs(m, "public", PRIV_SELECT,
"tmp", ralias, NULL, 0, USER_MONETDB);
+ }
revertids:
- //set the IDs back
- if(cntxt->username)
- GDKfree(cntxt->username);
- cntxt->user = prevID;
- cntxt->username = prevUserName;
- if(msg) {
- FREE_CQ_MB(finish)
- }
- } /*else if(IS_FUNC(found)) {
-
- }*/
+ //set the IDs back
+ if(cntxt->username)
+ GDKfree(cntxt->username);
+ cntxt->user = prevID;
+ cntxt->username = prevUserName;
+ if(msg) {
+ FREE_CQ_MB(finish)
+ }
}
MT_lock_set(&ttrLock);
@@ -578,7 +574,7 @@ CQregister(Client cntxt, str sname, str
if((q = newStmt(mb, sqlRef, transactionRef)) == NULL) { //transaction
reference
CQ_MALLOC_FAIL(finish)
}
- if(found->res && IS_UNION(found)) { //add output basket
+ if(found->res) { //add output basket
if((q = newStmt(mb, sqlRef, mvcRef)) == NULL) {
CQ_MALLOC_FAIL(finish)
}
@@ -588,7 +584,7 @@ CQregister(Client cntxt, str sname, str
if ((q = newStmt(mb, "user", fname)) == NULL) { //add the UDF call
statement
CQ_MALLOC_FAIL(finish)
}
- if(found->res && IS_UNION(found))
+ if(found->res)
q->argc = q->retc = 0;
for (i = 0, argn = found->ops->h; i < argc && argn; i++, argn =
argn->next) { //add variables to the MAL block
sql_subtype tpe = ((sql_arg *) argn->data)->type;
@@ -619,7 +615,7 @@ CQregister(Client cntxt, str sname, str
}
}
- if(found->res && IS_UNION(found)) {
+ if(found->res) {
int except_var;
p = q;
@@ -639,8 +635,7 @@ CQregister(Client cntxt, str sname, str
for (argn = found->res->h; argn; argn = argn->next) {
sql_arg* arg = (sql_arg *) argn->data;
sql_subtype tpe = ((sql_arg *) argn->data)->type;
- int type = newBatType(tpe.type->localtype);
- int nextbid = newTmpVariable(mb, type);
+ int nextbid = newTmpVariable(mb, IS_UNION(found) ?
newBatType(tpe.type->localtype) : tpe.type->localtype);
p = pushReturn(mb, p, nextbid);
q= newStmt(mb, basketRef, appendRef); //append to the
basket the output results of the UDF
@@ -730,7 +725,7 @@ CQregister(Client cntxt, str sname, str
err_message, ralias);
FREE_CQ_MB(unlock)
}
- if((msg = CQanalysis(cntxt, sym->def, pnettop, IS_UNION(found),
ralias)) != MAL_SUCCEED) {
+ if((msg = CQanalysis(cntxt, sym->def, pnettop, found, ralias)) !=
MAL_SUCCEED) {
cleanBaskets(pnettop);
FREE_CQ_MB(unlock)
}
@@ -1120,7 +1115,7 @@ CQderegister(Client cntxt, str alias)
if(myID != cq_pid) {
pnet[idx].status = CQSTOP;
this_alias = pnet[idx].alias;
- if(IS_UNION(pnet[idx].func)) {
+ if(pnet[idx].func->res) {
for( i=0; i < pnettop && !falias; i++){
if(i != idx) {
for( j=0; j< MAXSTREAMS &&
pnet[i].baskets[j] && !falias; j++){
@@ -1410,7 +1405,7 @@ CQscheduler(void *dummy)
}
start_trans = 0;
for (i = 0; i < pnettop ; i++) { //if there is a continuous
function to delete, we must start a transaction
- if( pnet[i].status == CQDELETE &&
IS_UNION(pnet[i].func))
+ if( pnet[i].status == CQDELETE && pnet[i].func->res)
start_trans = 1;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list