Changeset: 2d92b97757bd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2d92b97757bd
Added Files:
sql/backends/monet5/Tests/cfunction03.sql
Modified Files:
sql/backends/monet5/Tests/All
sql/backends/monet5/Tests/cfunction00.sql
sql/backends/monet5/Tests/cfunction00.stable.out
sql/backends/monet5/Tests/cfunction01.sql
sql/backends/monet5/Tests/cfunction01.stable.out
sql/backends/monet5/Tests/cfunction02.sql
sql/backends/monet5/Tests/cfunction02.stable.out
sql/backends/monet5/sql_basket.c
sql/backends/monet5/sql_cquery.c
Branch: trails
Log Message:
Fix CQ exception handling
diffs (273 lines):
diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -115,6 +115,7 @@ cqstream03
cfunction00
cfunction01
cfunction02
+cfunction03
factory00
factory01
diff --git a/sql/backends/monet5/Tests/cfunction00.sql
b/sql/backends/monet5/Tests/cfunction00.sql
--- a/sql/backends/monet5/Tests/cfunction00.sql
+++ b/sql/backends/monet5/Tests/cfunction00.sql
@@ -10,7 +10,6 @@ begin
end while;
return s;
END;
-select name from functions where name ='aggr00';
-- to call a continuous function in the scheduler, we must pass the keyword
"function" explicitly
start continuous function aggr00();
diff --git a/sql/backends/monet5/Tests/cfunction00.stable.out
b/sql/backends/monet5/Tests/cfunction00.stable.out
--- a/sql/backends/monet5/Tests/cfunction00.stable.out
+++ b/sql/backends/monet5/Tests/cfunction00.stable.out
@@ -37,12 +37,7 @@ Ready.
# end while;
# return s;
#END;
-#select name from functions where name ='aggr00';
-% sys.functions # table_name
-% name # name
-% varchar # type
-% 6 # length
-[ "aggr00" ]
+#start continuous function aggr00();
#select aggr00(); #should return 1
% .L2 # table_name
% L2 # name
diff --git a/sql/backends/monet5/Tests/cfunction01.sql
b/sql/backends/monet5/Tests/cfunction01.sql
--- a/sql/backends/monet5/Tests/cfunction01.sql
+++ b/sql/backends/monet5/Tests/cfunction01.sql
@@ -13,7 +13,6 @@ begin
end while;
return s;
END;
-select * from functions where name ='aggr01';
select result from tmp.aggr01; #error
diff --git a/sql/backends/monet5/Tests/cfunction01.stable.out
b/sql/backends/monet5/Tests/cfunction01.stable.out
--- a/sql/backends/monet5/Tests/cfunction01.stable.out
+++ b/sql/backends/monet5/Tests/cfunction01.stable.out
@@ -79,12 +79,6 @@ Ready.
# end while;
# return s;
#END;
-#select * from functions where name ='aggr01';
-% sys.functions, sys.functions, sys.functions, sys.functions,
sys.functions, sys.functions, sys.functions, sys.functions, sys.functions,
sys.functions # table_name
-% id, name, func, mod, language, type, side_effect, varres,
vararg, schema_id # name
-% int, varchar, varchar, varchar, int, int,
boolean, boolean, boolean, int # type
-% 4, 6, 175, 4, 1, 1, 5, 5, 5, 4 #
length
-[ 8727, "aggr01", "create function aggr01() \nreturns
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s +
(select count(*) from ftmp);\n yield s ; \n end while;\n return s;\nend;",
"user", 2, 1, false, false, false, 2000 ]
#start continuous function aggr01();
#select result from tmp.aggr01; #should be empty
% tmp.aggr01 # table_name
diff --git a/sql/backends/monet5/Tests/cfunction02.sql
b/sql/backends/monet5/Tests/cfunction02.sql
--- a/sql/backends/monet5/Tests/cfunction02.sql
+++ b/sql/backends/monet5/Tests/cfunction02.sql
@@ -1,7 +1,7 @@
-- Test a continuous function returning a table
-create table results3 (aa int, bb text);
+create table results2 (aa int, bb text);
-create function cfunc3(input text) returns table (aa integer, bb text) begin
+create function cfunc2(input text) returns table (aa integer, bb text) begin
declare s int;
set s = 0;
while true do
@@ -10,22 +10,22 @@ create function cfunc3(input text) retur
end while;
end;
-start continuous function cfunc3('test') with heartbeat 1000 cycles 3;
+start continuous function cfunc2('test') with heartbeat 1000 cycles 3;
-pause continuous cfunc3;
+pause continuous cfunc2;
-create procedure cproc3() begin
- insert into results3 (select aa, bb from tmp.cfunc3);
+create procedure cproc2() begin
+ insert into results2 (select aa, bb from tmp.cfunc2);
end;
-start continuous procedure cproc3() with cycles 3;
+start continuous procedure cproc2() with cycles 2;
call cquery.wait(4000);
-stop continuous cfunc3;
+stop continuous cfunc2;
-select aa, bb from results3;
+select aa, bb from results2;
-drop function cfunc3;
-drop procedure cproc3;
-drop table results3;
+drop function cfunc2;
+drop procedure cproc2;
+drop table results2;
diff --git a/sql/backends/monet5/Tests/cfunction02.stable.out
b/sql/backends/monet5/Tests/cfunction02.stable.out
--- a/sql/backends/monet5/Tests/cfunction02.stable.out
+++ b/sql/backends/monet5/Tests/cfunction02.stable.out
@@ -39,19 +39,18 @@ Ready.
#create procedure cproc3() begin
# insert into results3 (select aa, bb from tmp.cfunc3);
#end;
-#start continuous procedure cproc3() with cycles 3;
-#stop continuous cfunc3;
-#select aa, bb from results3;
-% sys.results3, sys.results3 # table_name
+#start continuous procedure cproc2() with cycles 2;
+#stop continuous cfunc2;
+#select aa, bb from results2;
+% sys.results2, sys.results2 # table_name
% aa, bb # name
% int, clob # type
% 1, 4 # length
[ 1, "test" ]
[ 1, "test" ]
-[ 1, "test" ]
-#drop function cfunc3;
-#drop procedure cproc3;
-#drop table results3;
+#drop function cfunc2;
+#drop procedure cproc2;
+#drop table results2;
# 17:04:58 >
# 17:04:58 > "Done."
diff --git a/sql/backends/monet5/Tests/cfunction03.sql
b/sql/backends/monet5/Tests/cfunction03.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cfunction03.sql
@@ -0,0 +1,18 @@
+-- Test continuous functions with limited runs
+create table results3 (aa time);
+
+create function cfunc3(input time) returns table (aa time) begin
+ while true do
+ yield table (select input);
+ end while;
+end;
+
+start continuous function cfunc3(time '15:00:00') with heartbeat 100 cycles 3;
+
+call cquery.wait(2000);
+
+select aa from time;
+
+drop function cfunc3;
+drop procedure cproc3;
+drop table results3;
diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -723,9 +723,11 @@ BSKTerror(Client cntxt, MalBlkPtr mb, Ma
if( idx <= 0)
throw(SQL,"basket.error",SQLSTATE(3F000) "Stream table %s.%s
not registered\n",sname,tname);
- baskets[idx].error = GDKstrdup(error);
- if(baskets[idx].error == NULL)
- throw(SQL,"basket.error",SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ if(error) {
+ baskets[idx].error = GDKstrdup(error);
+ if(baskets[idx].error == NULL)
+ throw(SQL,"basket.error",SQLSTATE(HY001)
MAL_MALLOC_FAIL);
+ }
return MAL_SUCCEED;
}
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -60,7 +60,6 @@ static int pnstatus = CQINIT;
static int cycleDelay = 200; /* be careful, it affects response/throughput
timings */
static MT_Lock ttrLock;
static MT_Id cq_pid = 0;
-static int CQ_counter = 0;
static BAT *CQ_id_tick = 0;
static BAT *CQ_id_alias = 0;
@@ -401,8 +400,6 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
}
#define FREE_CQ_MB(X) \
- if(cq_id) \
- GDKfree(cq_id); \
if(mb) \
freeMalBlk(mb); \
if(ralias) \
@@ -420,15 +417,13 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
str
CQregister(Client cntxt, str sname, str fname, int argc, atom **args, str
alias, int which, lng heartbeats, lng startat, int cycles)
{
- str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL;
+ str msg = MAL_SUCCEED, rschema = NULL, ralias = NULL, raliasdup = NULL;
InstrPtr p = NULL, q = NULL;
Symbol sym;
CQnode *pnew;
MalBlkPtr mb = NULL, prev;
const char* err_message = (which & mod_continuous_function) ?
"function" : "procedure";
- char* cq_id = NULL;
- char buffer[IDLENGTH];
- int i, idx, varid, cid, freeMB = 0, mvc_var = 0;
+ int i, idx, varid, freeMB = 0, mvc_var = 0;
backend* be = (backend*) cntxt->sqlcontext;
mvc *m = be->mvc;
sql_schema *s = NULL, *tmp_schema = NULL;
@@ -549,22 +544,20 @@ CQregister(Client cntxt, str sname, str
}
}
- MT_lock_set(&ttrLock);
- cid = ++CQ_counter;
- MT_lock_unset(&ttrLock);
- (void) snprintf(buffer, sizeof(buffer), "cq_%d", cid); //set the CQ ID
- if((cq_id = GDKstrdup(buffer)) == NULL) {
- CQ_MALLOC_FAIL(finish)
- }
if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
CQ_MALLOC_FAIL(finish)
}
- if((q = newInstruction(NULL, "user", cq_id)) == NULL) {
+ if((raliasdup = GDKstrdup(ralias)) == NULL) {
+ CQ_MALLOC_FAIL(finish)
+ }
+ if((q = newInstruction(NULL, "tmp", raliasdup)) == NULL) {
+ GDKfree(raliasdup);
CQ_MALLOC_FAIL(finish)
}
q->token = FUNCTIONsymbol;
q->barrier = 0;
- if((varid = newVariable(mb, cq_id, strlen(cq_id), TYPE_void)) < 0) {
+ if((varid = newVariable(mb, ralias, strlen(ralias), TYPE_void)) < 0) {
+ freeInstruction(q);
CQ_MALLOC_FAIL(finish)
}
setDestVar(q, varid);
@@ -652,8 +645,8 @@ CQregister(Client cntxt, str sname, str
q->barrier = CATCHsymbol;
q = newStmt(mb,basketRef, errorRef);
- q = pushStr(mb, q, "user");
- q = pushStr(mb, q, cq_id);
+ q = pushStr(mb, q, "tmp");
+ q = pushStr(mb, q, ralias);
q = pushArgument(mb, q, except_var);
q = newAssignment(mb);
@@ -666,8 +659,8 @@ CQregister(Client cntxt, str sname, str
q->barrier = CATCHsymbol;
q = newStmt(mb,basketRef, errorRef);
- q = pushStr(mb, q, "user");
- q = pushStr(mb, q, cq_id);
+ q = pushStr(mb, q, "tmp");
+ q = pushStr(mb, q, ralias);
q = pushArgument(mb, q, except_var);
q = newAssignment(mb);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list