Changeset: 6be9cf687ba7 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6be9cf687ba7
Modified Files:
sql/backends/monet5/Tests/cquery00.sql
sql/backends/monet5/Tests/cquery00.stable.out
sql/backends/monet5/Tests/cquery13.sql
sql/server/rel_psm.c
sql/server/rel_schema.c
sql/server/rel_semantic.c
Branch: trails
Log Message:
Moved continuous query calls into rel_psm so they can be called from UDFs.
diffs (truncated from 344 to 300 lines):
diff --git a/sql/backends/monet5/Tests/cquery00.sql b/sql/backends/monet5/Tests/cquery00.sql
--- a/sql/backends/monet5/Tests/cquery00.sql
+++ b/sql/backends/monet5/Tests/cquery00.sql
@@ -1,21 +1,21 @@
-- test ordinary procedure call to update a stream
-create stream table testing (a int);
-insert into testing values(123);
+create stream table cqinput00 (a int);
+insert into cqinput00 values(123);
-create table results (a int);
+create table cqresults00 (a int);
-create procedure myproc()
+create procedure cquery00()
begin
- insert into results select a from sys.testing;
+ insert into cqresults00 select a from sys.cqinput00;
END;
-- a continuous procedure can be called like any other procedure
-call myproc();
+call cquery00();
-select * from results;
+select * from cqresults00;
---select * from functions where name = 'myproc';
+--select * from functions where name = 'cquery00';
-drop procedure myproc;
-drop table results;
-drop table testing;
+drop procedure cquery00;
+drop table cqresults00;
+drop table cqinput00;
diff --git a/sql/backends/monet5/Tests/cquery00.stable.out b/sql/backends/monet5/Tests/cquery00.stable.out
--- a/sql/backends/monet5/Tests/cquery00.stable.out
+++ b/sql/backends/monet5/Tests/cquery00.stable.out
@@ -33,8 +33,8 @@ Ready.
#begin
# insert into results select a from sys.testing;
#END;
-#select * from results;
-% sys.results # table_name
+#select * from cqresults00;
+% sys.cqresults00 # table_name
% a # name
% int # type
% 3 # length
diff --git a/sql/backends/monet5/Tests/cquery13.sql b/sql/backends/monet5/Tests/cquery13.sql
--- a/sql/backends/monet5/Tests/cquery13.sql
+++ b/sql/backends/monet5/Tests/cquery13.sql
@@ -12,9 +12,9 @@ begin
end;
start continuous sys.cq_query13a() with heartbeat 3000;
-start continuous sys.cq_query13b() with cycles 1;
+start continuous sys.cq_query13b() with heartbeat 1000 cycles 1;
-call cquery.wait(1500);
+call cquery.wait(2500);
drop procedure sys.cq_query13a;
drop procedure sys.cq_query13b;
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -596,6 +596,87 @@ has_return( list *l )
return 0;
}
+static sql_exp*
+start_continuous_query(mvc *sql, symbol *s) {
+ dlist *l = s->data.lval;
+ AtomNode* an = NULL;
+ lng start_at = 0;
+ str msg = NULL;
+
+ an = (AtomNode*) l->h->next->next->next->next->data.sym;
+ if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){
+ return sql_error(sql, 01, "%s", msg);
+ }
+
+ sql->continuous |= l->h->data.i_val; /* start query */
+ sql->heartbeats = l->h->next->next->next->data.l_val;
+ sql->startat = start_at;
+ sql->cycles = l->h->next->next->next->next->next->data.i_val;
+ sql->cq_alias = l->h->next->next->next->next->next->next->data.sval;
+ return rel_psm_call(sql, l->h->next->next->data.sym, l->h->next->data.i_val);
+}
+
+static sql_rel *
+rel_single_continuous_query(mvc *sql, dnode *w) {
+ sql_rel *rel;
+ list *exps;
+ int action = 0;
+ AtomNode* an;
+ lng start_at_parsed = 0;
+ str msg = NULL;
+
+ action |= w->data.i_val; /* pause, resume or stop query? */
+ action |= w->next->data.i_val; /* procedure or function? */
+
+ if(action & mod_resume_continuous) {
+ an = (AtomNode*) w->next->next->next->next->data.sym;
+ if(an){
+ if((msg = convert_atom_into_unix_timestamp(an->a, &start_at_parsed)) != NULL)
+ return sql_error(sql, 02, "%s", msg);
+ }
+ }
+
+ rel = rel_create(sql->sa);
+ exps = new_exp_list(sql->sa);
+
+ append(exps, exp_atom_clob(sql->sa, w->next->next->data.sval)); //alias
+ append(exps, exp_atom_int(sql->sa, action));
+ if(action & mod_resume_continuous) {
+ append(exps, exp_atom_lng(sql->sa, w->next->next->next->data.l_val)); //heartbeats
+ append(exps, exp_atom_lng(sql->sa, start_at_parsed)); //start at value
+ append(exps, exp_atom_int(sql->sa, w->next->next->next->next->next->data.i_val)); //cycles
+ } else {
+ append(exps, exp_atom_lng(sql->sa, 0));
+ append(exps, exp_atom_lng(sql->sa, 0));
+ append(exps, exp_atom_int(sql->sa, 0));
+ }
+
+ rel->l = NULL;
+ rel->r = NULL;
+ rel->op = op_ddl;
+ rel->flag = DDL_CHANGE_SINGLE_CP;
+ rel->exps = exps;
+ rel->card = 0;
+ rel->nrcols = 0;
+ return rel;
+}
+
+static sql_rel *
+rel_all_continuous_queries(mvc *sql, int action) {
+ sql_rel *rel = rel_create(sql->sa);
+ list *exps = new_exp_list(sql->sa);
+ append(exps, exp_atom_int(sql->sa, action));
+
+ rel->l = NULL;
+ rel->r = NULL;
+ rel->op = op_ddl;
+ rel->flag = DDL_CHANGE_ALL_CP;
+ rel->exps = exps;
+ rel->card = 0;
+ rel->nrcols = 0;
+ return rel;
+}
+
static list *
sequential_block (mvc *sql, sql_subtype *restype, list *restypelist, dlist *blk, char *opt_label, int is_func)
{
@@ -635,27 +716,17 @@ sequential_block (mvc *sql, sql_subtype
reslist = rel_psm_case(sql, restype, restypelist, s->data.lval->h, is_func);
break;
case SQL_CALL:
- sql->continuous = 0;
res = rel_psm_call(sql, s->data.sym, 0);
break;
- case SQL_START_CONTINUOUS_QUERY: {
- dlist *l = s->data.lval;
- AtomNode* an = NULL;
- lng start_at = 0;
- str msg = NULL;
-
- an = (AtomNode*) l->h->next->next->next->next->data.sym;
- if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){
- return sql_error(sql, 01, "%s", msg);
- }
-
- sql->continuous |= l->h->data.i_val; /* start query */
- sql->heartbeats = l->h->next->next->next->data.l_val;
- sql->startat = start_at;
- sql->cycles = l->h->next->next->next->next->next->data.i_val;
- sql->cq_alias = l->h->next->next->next->next->next->next->data.sval;
- res = rel_psm_call(sql, l->h->next->next->data.sym, l->h->next->data.i_val);
- } break;
+ case SQL_START_CONTINUOUS_QUERY:
+ res = start_continuous_query(sql, s);
+ break;
+ case SQL_CHANGE_CONTINUOUS_QUERY:
+ res = exp_rel(sql, rel_single_continuous_query(sql, s->data.lval->h));
+ break;
+ case SQL_ALL_CONTINUOUS_QUERIES:
+ res = exp_rel(sql, rel_all_continuous_queries(sql, s->data.i_val));
+ break;
case SQL_RETURN:
case SQL_YIELD:
/*If it is not a function it cannot have a return statement*/
@@ -1463,25 +1534,18 @@ rel_psm(mvc *sql, symbol *s)
ret = rel_psm_stmt(sql->sa, rel_psm_call(sql, s->data.sym, 0));
sql->type = Q_UPDATE;
break;
- case SQL_START_CONTINUOUS_QUERY: {
- dlist *l = s->data.lval;
- AtomNode* an = NULL;
- lng start_at = 0;
- str msg = NULL;
-
- an = (AtomNode*) l->h->next->next->next->next->data.sym;
- if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){
- return sql_error(sql, 01, "%s", msg);
- }
-
- sql->continuous |= l->h->data.i_val; /* start query */
- sql->heartbeats = l->h->next->next->next->data.l_val;
- sql->startat = start_at;
- sql->cycles = l->h->next->next->next->next->next->data.i_val;
- sql->cq_alias = l->h->next->next->next->next->next->next->data.sval;
- ret = rel_psm_stmt(sql->sa, rel_psm_call(sql, l->h->next->next->data.sym, l->h->next->data.i_val));
+ case SQL_START_CONTINUOUS_QUERY:
+ ret = rel_psm_stmt(sql->sa, start_continuous_query(sql, s));
sql->type = Q_UPDATE;
- } break;
+ break;
+ case SQL_CHANGE_CONTINUOUS_QUERY:
+ ret = rel_psm_stmt(sql->sa, exp_rel(sql, rel_single_continuous_query(sql, s->data.lval->h)));
+ sql->type = Q_UPDATE;
+ break;
+ case SQL_ALL_CONTINUOUS_QUERIES:
+ ret = rel_psm_stmt(sql->sa, exp_rel(sql, rel_all_continuous_queries(sql, s->data.i_val)));
+ sql->type = Q_UPDATE;
+ break;
case SQL_CREATE_TABLE_LOADER:
{
dlist *l = s->data.lval;
diff --git a/sql/server/rel_schema.c b/sql/server/rel_schema.c
--- a/sql/server/rel_schema.c
+++ b/sql/server/rel_schema.c
@@ -17,7 +17,6 @@
#include "rel_psm.h"
#include "sql_parser.h"
#include "sql_privileges.h"
-#include "sql_timestamps.h"
#define qname_index(qname) qname_table(qname)
#define qname_func(qname) qname_table(qname)
@@ -1180,67 +1179,6 @@ rel_drop_type(mvc *sql, dlist *qname, in
}
static sql_rel *
-rel_single_continuous_query(mvc *sql, dlist *l) {
- sql_rel *rel;
- list *exps;
- int action = 0;
- AtomNode* an;
- lng start_at_parsed = 0;
- str msg = NULL;
-
- action |= l->h->data.i_val; /* pause, resume or stop query? */
- action |= l->h->next->data.i_val; /* procedure or function? */
-
- if(action & mod_resume_continuous) {
- an = (AtomNode*) l->h->next->next->next->next->data.sym;
- if(an){
- if((msg = convert_atom_into_unix_timestamp(an->a, &start_at_parsed)) != NULL)
- return sql_error(sql, 02, "%s", msg);
- }
- }
-
- rel = rel_create(sql->sa);
- exps = new_exp_list(sql->sa);
-
- append(exps, exp_atom_clob(sql->sa, l->h->next->next->data.sval)); //alias
- append(exps, exp_atom_int(sql->sa, action));
- if(action & mod_resume_continuous) {
- append(exps, exp_atom_lng(sql->sa, l->h->next->next->next->data.l_val)); //heartbeats
- append(exps, exp_atom_lng(sql->sa, start_at_parsed)); //start at value
- append(exps, exp_atom_int(sql->sa, l->h->next->next->next->next->next->data.i_val)); //cycles
- } else {
- append(exps, exp_atom_lng(sql->sa, 0));
- append(exps, exp_atom_lng(sql->sa, 0));
- append(exps, exp_atom_int(sql->sa, 0));
- }
-
- rel->l = NULL;
- rel->r = NULL;
- rel->op = op_ddl;
- rel->flag = DDL_CHANGE_SINGLE_CP;
- rel->exps = exps;
- rel->card = 0;
- rel->nrcols = 0;
- return rel;
-}
-
-static sql_rel *
-rel_all_continuous_queries(mvc *sql, int action) {
- sql_rel *rel = rel_create(sql->sa);
- list *exps = new_exp_list(sql->sa);
- append(exps, exp_atom_int(sql->sa, action));
-
- rel->l = NULL;
- rel->r = NULL;
- rel->op = op_ddl;
- rel->flag = DDL_CHANGE_ALL_CP;
- rel->exps = exps;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list