Changeset: 6be9cf687ba7 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6be9cf687ba7
Modified Files:
        sql/backends/monet5/Tests/cquery00.sql
        sql/backends/monet5/Tests/cquery00.stable.out
        sql/backends/monet5/Tests/cquery13.sql
        sql/server/rel_psm.c
        sql/server/rel_schema.c
        sql/server/rel_semantic.c
Branch: trails
Log Message:

Moved the continuous query calls into rel_psm so they can be called from UDFs.


diffs (truncated from 344 to 300 lines):

diff --git a/sql/backends/monet5/Tests/cquery00.sql b/sql/backends/monet5/Tests/cquery00.sql
--- a/sql/backends/monet5/Tests/cquery00.sql
+++ b/sql/backends/monet5/Tests/cquery00.sql
@@ -1,21 +1,21 @@
 -- test ordinary procedure call to update a stream
-create stream table testing (a int);
-insert into testing values(123);
+create stream table cqinput00 (a int);
+insert into cqinput00 values(123);
 
-create table results (a int);
+create table cqresults00 (a int);
 
-create procedure myproc()
+create procedure cquery00()
 begin 
-       insert into results select a from sys.testing; 
+       insert into cqresults00 select a from sys.cqinput00;
 END;
 
 -- a continuous procedure can be called like any other procedure
-call myproc();
+call cquery00();
 
-select * from results;
+select * from cqresults00;
 
---select * from functions where name = 'myproc';
+--select * from functions where name = 'cquery00';
 
-drop procedure myproc;
-drop table results;
-drop table testing;
+drop procedure cquery00;
+drop table cqresults00;
+drop table cqinput00;
diff --git a/sql/backends/monet5/Tests/cquery00.stable.out b/sql/backends/monet5/Tests/cquery00.stable.out
--- a/sql/backends/monet5/Tests/cquery00.stable.out
+++ b/sql/backends/monet5/Tests/cquery00.stable.out
@@ -33,8 +33,8 @@ Ready.
 #begin 
 #      insert into results select a from sys.testing; 
 #END;
-#select * from results;
-% sys.results # table_name
+#select * from cqresults00;
+% sys.cqresults00 # table_name
 % a # name
 % int # type
 % 3 # length
diff --git a/sql/backends/monet5/Tests/cquery13.sql b/sql/backends/monet5/Tests/cquery13.sql
--- a/sql/backends/monet5/Tests/cquery13.sql
+++ b/sql/backends/monet5/Tests/cquery13.sql
@@ -12,9 +12,9 @@ begin
 end;
 
 start continuous sys.cq_query13a() with heartbeat 3000;
-start continuous sys.cq_query13b() with cycles 1;
+start continuous sys.cq_query13b() with heartbeat 1000 cycles 1;
 
-call cquery.wait(1500);
+call cquery.wait(2500);
 
 drop procedure sys.cq_query13a;
 drop procedure sys.cq_query13b;
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -596,6 +596,87 @@ has_return( list *l )
        return 0;
 }
 
+static sql_exp*
+start_continuous_query(mvc *sql, symbol *s) {
+       dlist *l = s->data.lval;
+       AtomNode* an = NULL;
+       lng start_at = 0;
+       str msg = NULL;
+
+       an = (AtomNode*) l->h->next->next->next->next->data.sym;
+       if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){
+               return sql_error(sql, 01, "%s", msg);
+       }
+
+       sql->continuous |= l->h->data.i_val; /* start query */
+       sql->heartbeats = l->h->next->next->next->data.l_val;
+       sql->startat = start_at;
+       sql->cycles = l->h->next->next->next->next->next->data.i_val;
+       sql->cq_alias = l->h->next->next->next->next->next->next->data.sval;
+       return rel_psm_call(sql, l->h->next->next->data.sym, l->h->next->data.i_val);
+}
+
+static sql_rel *
+rel_single_continuous_query(mvc *sql, dnode *w) {
+       sql_rel *rel;
+       list *exps;
+       int action = 0;
+       AtomNode* an;
+       lng start_at_parsed = 0;
+       str msg = NULL;
+
+       action |= w->data.i_val; /* pause, resume or stop query? */
+       action |= w->next->data.i_val; /* procedure or function? */
+
+       if(action & mod_resume_continuous) {
+               an = (AtomNode*) w->next->next->next->next->data.sym;
+               if(an){
+                       if((msg = convert_atom_into_unix_timestamp(an->a, &start_at_parsed)) != NULL)
+                               return sql_error(sql, 02, "%s", msg);
+               }
+       }
+
+       rel = rel_create(sql->sa);
+       exps = new_exp_list(sql->sa);
+
+       append(exps, exp_atom_clob(sql->sa, w->next->next->data.sval)); //alias
+       append(exps, exp_atom_int(sql->sa, action));
+       if(action & mod_resume_continuous) {
+               append(exps, exp_atom_lng(sql->sa, w->next->next->next->data.l_val)); //heartbeats
+               append(exps, exp_atom_lng(sql->sa, start_at_parsed)); //start at value
+               append(exps, exp_atom_int(sql->sa, w->next->next->next->next->next->data.i_val)); //cycles
+       } else {
+               append(exps, exp_atom_lng(sql->sa, 0));
+               append(exps, exp_atom_lng(sql->sa, 0));
+               append(exps, exp_atom_int(sql->sa, 0));
+       }
+
+       rel->l = NULL;
+       rel->r = NULL;
+       rel->op = op_ddl;
+       rel->flag = DDL_CHANGE_SINGLE_CP;
+       rel->exps = exps;
+       rel->card = 0;
+       rel->nrcols = 0;
+       return rel;
+}
+
+static sql_rel *
+rel_all_continuous_queries(mvc *sql, int action) {
+       sql_rel *rel = rel_create(sql->sa);
+       list *exps = new_exp_list(sql->sa);
+       append(exps, exp_atom_int(sql->sa, action));
+
+       rel->l = NULL;
+       rel->r = NULL;
+       rel->op = op_ddl;
+       rel->flag = DDL_CHANGE_ALL_CP;
+       rel->exps = exps;
+       rel->card = 0;
+       rel->nrcols = 0;
+       return rel;
+}
+
 static list *
 sequential_block (mvc *sql, sql_subtype *restype, list *restypelist, dlist *blk, char *opt_label, int is_func) 
 {
@@ -635,27 +716,17 @@ sequential_block (mvc *sql, sql_subtype 
                        reslist = rel_psm_case(sql, restype, restypelist, s->data.lval->h, is_func);
                        break;
                case SQL_CALL:
-                       sql->continuous = 0;
                        res = rel_psm_call(sql, s->data.sym, 0);
                        break;
-               case SQL_START_CONTINUOUS_QUERY: {
-                       dlist *l = s->data.lval;
-                       AtomNode* an = NULL;
-                       lng start_at = 0;
-                       str msg = NULL;
-
-                       an = (AtomNode*) l->h->next->next->next->next->data.sym;
-                       if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){
-                               return sql_error(sql, 01, "%s", msg);
-                       }
-
-                       sql->continuous |= l->h->data.i_val; /* start query */
-                       sql->heartbeats = l->h->next->next->next->data.l_val;
-                       sql->startat = start_at;
-                       sql->cycles = l->h->next->next->next->next->next->data.i_val;
-                       sql->cq_alias = l->h->next->next->next->next->next->next->data.sval;
-                       res = rel_psm_call(sql, l->h->next->next->data.sym, l->h->next->data.i_val);
-               } break;
+               case SQL_START_CONTINUOUS_QUERY:
+                       res = start_continuous_query(sql, s);
+                       break;
+               case SQL_CHANGE_CONTINUOUS_QUERY:
+                       res = exp_rel(sql, rel_single_continuous_query(sql, s->data.lval->h));
+                       break;
+               case SQL_ALL_CONTINUOUS_QUERIES:
+                       res = exp_rel(sql, rel_all_continuous_queries(sql, s->data.i_val));
+                       break;
                case SQL_RETURN:
                case SQL_YIELD:
                        /*If it is not a function it cannot have a return statement*/
@@ -1463,25 +1534,18 @@ rel_psm(mvc *sql, symbol *s)
                ret = rel_psm_stmt(sql->sa, rel_psm_call(sql, s->data.sym, 0));
                sql->type = Q_UPDATE;
                break;
-       case SQL_START_CONTINUOUS_QUERY: {
-               dlist *l = s->data.lval;
-               AtomNode* an = NULL;
-               lng start_at = 0;
-               str msg = NULL;
-
-               an = (AtomNode*) l->h->next->next->next->next->data.sym;
-               if(an && (msg = convert_atom_into_unix_timestamp(an->a, &start_at)) != NULL){
-                       return sql_error(sql, 01, "%s", msg);
-               }
-
-               sql->continuous |= l->h->data.i_val; /* start query */
-               sql->heartbeats = l->h->next->next->next->data.l_val;
-               sql->startat = start_at;
-               sql->cycles = l->h->next->next->next->next->next->data.i_val;
-               sql->cq_alias = l->h->next->next->next->next->next->next->data.sval;
-               ret = rel_psm_stmt(sql->sa, rel_psm_call(sql, l->h->next->next->data.sym, l->h->next->data.i_val));
+       case SQL_START_CONTINUOUS_QUERY:
+               ret = rel_psm_stmt(sql->sa, start_continuous_query(sql, s));
                sql->type = Q_UPDATE;
-       }       break;
+               break;
+       case SQL_CHANGE_CONTINUOUS_QUERY:
+               ret = rel_psm_stmt(sql->sa, exp_rel(sql, rel_single_continuous_query(sql, s->data.lval->h)));
+               sql->type = Q_UPDATE;
+               break;
+       case SQL_ALL_CONTINUOUS_QUERIES:
+               ret = rel_psm_stmt(sql->sa, exp_rel(sql, rel_all_continuous_queries(sql, s->data.i_val)));
+               sql->type = Q_UPDATE;
+               break;
        case SQL_CREATE_TABLE_LOADER:
        {
            dlist *l = s->data.lval;
diff --git a/sql/server/rel_schema.c b/sql/server/rel_schema.c
--- a/sql/server/rel_schema.c
+++ b/sql/server/rel_schema.c
@@ -17,7 +17,6 @@
 #include "rel_psm.h"
 #include "sql_parser.h"
 #include "sql_privileges.h"
-#include "sql_timestamps.h"
 
 #define qname_index(qname) qname_table(qname)
 #define qname_func(qname) qname_table(qname)
@@ -1180,67 +1179,6 @@ rel_drop_type(mvc *sql, dlist *qname, in
 }
 
 static sql_rel *
-rel_single_continuous_query(mvc *sql, dlist *l) {
-       sql_rel *rel;
-       list *exps;
-       int action = 0;
-       AtomNode* an;
-       lng start_at_parsed = 0;
-       str msg = NULL;
-
-       action |= l->h->data.i_val; /* pause, resume or stop query? */
-       action |= l->h->next->data.i_val; /* procedure or function? */
-
-       if(action & mod_resume_continuous) {
-               an = (AtomNode*) l->h->next->next->next->next->data.sym;
-               if(an){
-                       if((msg = convert_atom_into_unix_timestamp(an->a, &start_at_parsed)) != NULL)
-                               return sql_error(sql, 02, "%s", msg);
-               }
-       }
-
-       rel = rel_create(sql->sa);
-       exps = new_exp_list(sql->sa);
-
-       append(exps, exp_atom_clob(sql->sa, l->h->next->next->data.sval)); //alias
-       append(exps, exp_atom_int(sql->sa, action));
-       if(action & mod_resume_continuous) {
-               append(exps, exp_atom_lng(sql->sa, l->h->next->next->next->data.l_val)); //heartbeats
-               append(exps, exp_atom_lng(sql->sa, start_at_parsed)); //start at value
-               append(exps, exp_atom_int(sql->sa, l->h->next->next->next->next->next->data.i_val)); //cycles
-       } else {
-               append(exps, exp_atom_lng(sql->sa, 0));
-               append(exps, exp_atom_lng(sql->sa, 0));
-               append(exps, exp_atom_int(sql->sa, 0));
-       }
-
-       rel->l = NULL;
-       rel->r = NULL;
-       rel->op = op_ddl;
-       rel->flag = DDL_CHANGE_SINGLE_CP;
-       rel->exps = exps;
-       rel->card = 0;
-       rel->nrcols = 0;
-       return rel;
-}
-
-static sql_rel *
-rel_all_continuous_queries(mvc *sql, int action) {
-       sql_rel *rel = rel_create(sql->sa);
-       list *exps = new_exp_list(sql->sa);
-       append(exps, exp_atom_int(sql->sa, action));
-
-       rel->l = NULL;
-       rel->r = NULL;
-       rel->op = op_ddl;
-       rel->flag = DDL_CHANGE_ALL_CP;
-       rel->exps = exps;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to