Changeset: bc4b64ac7303 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bc4b64ac7303
Modified Files:
        sql/backends/monet5/50_cquery.mal
        sql/backends/monet5/rel_bin.c
        sql/backends/monet5/sql_cat.c
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/include/sql_catalog.h
        sql/server/rel_psm.c
        sql/server/rel_semantic.c
        sql/server/sql_parser.h
        sql/server/sql_parser.y
Branch: timetrails
Log Message:

Cleaned up the SQL catalog for registering continuous queries. The only thing
still missing is retrieving the correct continuous procedure based on the
number of arguments.


diffs (truncated from 998 to 300 lines):

diff --git a/sql/backends/monet5/50_cquery.mal 
b/sql/backends/monet5/50_cquery.mal
--- a/sql/backends/monet5/50_cquery.mal
+++ b/sql/backends/monet5/50_cquery.mal
@@ -82,3 +82,8 @@ comment "Debug a single continuous query
 command dump()
 address CQdump
 comment "Show the status of the query scheduler";
+
+# initializer code
+command prelude() :void
+address CQprelude;
+cquery.prelude();
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -17,7 +17,6 @@
 #include "rel_updates.h"
 #include "rel_optimizer.h"
 #include "sql_env.h"
-#include "sql_cquery.h"
 
 #define OUTER_ZERO 64
 
@@ -169,7 +168,7 @@ static stmt *column(backend *be, stmt *v
        return val;
 }
 
-static stmt *RelColumn(backend *be, stmt *val )
+static stmt *Column(backend *be, stmt *val )
 {
        if (val->nrcols == 0)
                val = const_column(be, val);
@@ -455,21 +454,6 @@ exp_bin(backend *be, sql_exp *e, stmt *l
                        s = stmt_func(be, stmt_list(be, l), sa_strdup(sql->sa, 
f->func->base.name), f->func->rel, (f->func->type == F_UNION));
                else
                        s = stmt_Nop(be, stmt_list(be, l), e->f);
-
-               if (f->func->type == F_CONTINUOUS_PROCEDURE) {
-                       char *petrinetResponse;
-                       char *sname = f->func->s->base.name;
-                       char *fname = f->func->base.name;
-                       if (!CQlocate(sname, fname)) { //if the continuous 
procedure is not registered in the catalog then we register it
-                               petrinetResponse = 
CQregisterInternal(MCgetClient(sql->clientid), sname, fname);
-                       }
-                       if (!petrinetResponse) {
-                               petrinetResponse = CQresumeInternal(sname, 
fname);
-                       }
-                       if (petrinetResponse) {
-                               return sql_error(sql, 02, "M0M27!START 
CONTINUOUS PROCEDURE internal error: %s", petrinetResponse);
-                       }
-               }
        }       break;
        case e_aggr: {
                list *attr = e->l; 
@@ -1867,7 +1851,7 @@ rel2bin_join(backend *be, sql_rel *rel, 
 
                /* as append isn't save, we append to a new copy */
                if (rel->op == op_left || rel->op == op_full || rel->op == 
op_right)
-                       s = RelColumn(be, s);
+                       s = Column(be, s);
                if (rel->op == op_left || rel->op == op_full)
                        s = stmt_append(be, s, stmt_project(be, ld, c));
                if (rel->op == op_right || rel->op == op_full) 
@@ -1884,7 +1868,7 @@ rel2bin_join(backend *be, sql_rel *rel, 
 
                /* as append isn't save, we append to a new copy */
                if (rel->op == op_left || rel->op == op_full || rel->op == 
op_right)
-                       s = RelColumn(be, s);
+                       s = Column(be, s);
                if (rel->op == op_left || rel->op == op_full) 
                        s = stmt_append(be, s, stmt_const(be, ld, 
(c->flag&OUTER_ZERO)?stmt_atom_lng(be, 0):stmt_atom(be, atom_general(sql->sa, 
tail_type(c), NULL))));
                if (rel->op == op_right || rel->op == op_full) 
@@ -2135,7 +2119,7 @@ rel2bin_union(backend *be, sql_rel *rel,
                const char *nme = column_name(sql->sa, c1);
                stmt *s;
 
-               s = stmt_append(be, RelColumn(be, c1), c2);
+               s = stmt_append(be, Column(be, c1), c2);
                s = stmt_alias(be, s, rnme, nme);
                list_append(l, s);
        }
diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -485,8 +485,8 @@ drop_func(mvc *sql, char *sname, char *n
                        if (!action && mvc_check_dependency(sql, func->base.id, 
!IS_PROC(func) ? FUNC_DEPENDENCY : PROC_DEPENDENCY, NULL))
                                return sql_message("DROP %s%s: there are 
database objects dependent on %s%s %s;", KF, F, kf, f, func->base.name);
                        //if it is a continuous procedure we must remove it 
first from the Petrinet
-                       if(type == F_CONTINUOUS_PROCEDURE && CQlocate(sname, 
func->base.name)) {
-                               err = CQderegisterInternal(sname, 
func->base.name);
+                       if(type == F_CONTINUOUS_PROCEDURE) {
+                               err = CQderegisterInternal(sname, 
func->base.name, 1);
                                if(err) {
                                        return sql_message("DROP %s%s: internal 
error on %s%s %s: %s", KF, F, kf, f, func->base.name, err);
                                }
@@ -510,8 +510,8 @@ drop_func(mvc *sql, char *sname, char *n
                                return sql_message("DROP %s%s: there are 
database objects dependent on %s%s %s;", KF, F, kf, f, func->base.name);
                        }
                        //if it is a continuous procedure we must remove it 
first from the Petrinet
-                       if(type == F_CONTINUOUS_PROCEDURE && CQlocate(sname, 
func->base.name)) {
-                               err = CQderegisterInternal(sname, 
func->base.name);
+                       if(type == F_CONTINUOUS_PROCEDURE) {
+                               err = CQderegisterInternal(sname, 
func->base.name, 1);
                                if(err) {
                                        return sql_message("DROP %s%s: internal 
error on %s%s %s: %s", KF, F, kf, f, func->base.name, err);
                                }
@@ -727,9 +727,15 @@ continuous_procedure(mvc *sql, char *sna
 {
        sql_schema *s = NULL;
        char *F = NULL;
-       str petrinetResponse = MAL_SUCCEED;
+       str petrinetResponse = NULL;
 
        switch (action) {
+               case START_CONTINUOUS_PROCEDURE:
+                       F = "START CONTINUOUS PROCEDURE";
+                       break;
+               case RESTART_CONTINUOUS_PROCEDURE:
+                       F = "RESTART CONTINUOUS PROCEDURE";
+                       break;
                case INTERRUPT_CONTINUOUS_PROCEDURE:
                        F = "INTERRUPT CONTINUOUS PROCEDURE";
                        break;
@@ -749,10 +755,18 @@ continuous_procedure(mvc *sql, char *sna
                node *n = find_sql_func_node(s, fid);
                if (n) {
                        sql_func *func = n->data;
-                       if (!mvc_schema_privs(sql, s)) {
+                       if(func->type != F_CONTINUOUS_PROCEDURE) {
+                               return sql_message("3F000!%s: %s is not a 
continuous procedure", F, s->base.name);
+                       } else if (!mvc_schema_privs(sql, s)) {
                                return sql_message("3F000!%s: access denied for 
%s to schema ;'%s'", F, stack_get_string(sql, "current_user"), s->base.name);
                        }
                        switch (action) {
+                               case START_CONTINUOUS_PROCEDURE:
+                                       petrinetResponse = 
CQregisterInternal(MCgetClient(sql->clientid), sname, cpname, 
REGISTER_AND_START_CQUERY);
+                                       break;
+                               case RESTART_CONTINUOUS_PROCEDURE:
+                                       petrinetResponse = 
CQregisterInternal(MCgetClient(sql->clientid), sname, cpname, RESTART_CQUERY);
+                                       break;
                                case INTERRUPT_CONTINUOUS_PROCEDURE:
                                        petrinetResponse = 
CQpauseInternal(sname, cpname);
                                        break;
@@ -760,7 +774,7 @@ continuous_procedure(mvc *sql, char *sna
                                        petrinetResponse = 
CQresumeInternal(sname, cpname);
                                        break;
                                case HALT_CONTINUOUS_PROCEDURE:
-                                       petrinetResponse = 
CQderegisterInternal(sname, cpname);
+                                       petrinetResponse = 
CQderegisterInternal(sname, cpname, 0);
                                        break;
                        }
                        if(petrinetResponse) {
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -50,7 +50,7 @@
 #include "mal_builder.h"
 #include "opt_prelude.h"
 
-static str statusname[7] = { "init", "register", "readytorun", "running", 
"waiting", "paused","stopping"};
+static str statusname[7] = { "init", "register", "readytorun", "running", 
"waiting", "paused", "stopping"};
 
 static str CQstartScheduler(void);
 static int CQinit;
@@ -72,7 +72,6 @@ MT_Lock ttrLock MT_LOCK_INITIALIZER("cqu
 static void
 CQfree(int idx)
 {
-       MT_lock_set(&ttrLock);
        if( pnet[idx].mb)
                freeMalBlk(pnet[idx].mb);
        if( pnet[idx].stk)
@@ -83,7 +82,6 @@ CQfree(int idx)
                pnet[idx] = pnet[idx+1];
        pnettop--;
        memset((void*) (pnet+idx), 0, sizeof(CQnode));
-       MT_lock_unset(&ttrLock);
 }
 
 /* We need a lock table for all stream tables considered
@@ -246,7 +244,7 @@ wrapup:
        throw(SQL,"cquery.status",MAL_MALLOC_FAIL);
 }
 
-int
+static int
 CQlocate(str modname, str fcnname)
 {
        int i;
@@ -258,7 +256,7 @@ CQlocate(str modname, str fcnname)
        return i;
 }
 
-/* capture and remember errors */
+/* capture and remember errors: WARNING no locks in this call yet! */
 str
 CQerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -278,7 +276,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS
        return MAL_SUCCEED;
 }
 
-/* A debugging routine */
+/* A debugging routine: WARNING no locks in this call yet! */
 str
 CQshow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
@@ -301,18 +299,14 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt
 /* Make sure we do not re-use the same source more than once */
 /* Avoid any concurreny conflict */
 static str
-CQanalysis(Client cntxt, MalBlkPtr mb, int pn)
+CQanalysis(Client cntxt, MalBlkPtr mb, int idx)
 {
-       int i, j, idx, bskt;
+       int i, j, bskt;
        InstrPtr p;
        str msg= MAL_SUCCEED, sch, tbl;
        (void) cntxt;
 
        p = getInstrPtr(mb, 0);
-       idx = CQlocate(getModuleId(p), getFunctionId(p));
-       if( idx != pnettop)
-               throw(MAL,"cquery.analysis","Duplicate or unknown registration 
of %s.%s \n", getModuleId(p), getFunctionId(p));
-       MT_lock_unset(&ttrLock);
        for (i = 0; msg== MAL_SUCCEED && i < mb->stop; i++) {
                p = getInstrPtr(mb, i);
                if (getModuleId(p) == basketRef && (getFunctionId(p) == 
registerRef || getFunctionId(p) == bindRef)){
@@ -331,22 +325,22 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
                                        }
                                }
                        }
-                       
+
                        // we only need a single column for window size testing
-                       for( j=0; j< MAXSTREAMS && pnet[pn].baskets[j]; j++)
-                       if( strcmp(sch, baskets[pnet[pn].baskets[j]].schema) == 
0 &&
-                               strcmp(tbl, baskets[pnet[pn].baskets[j]].table) 
== 0 )
+                       for( j=0; j< MAXSTREAMS && pnet[idx].baskets[j]; j++)
+                       if( strcmp(sch, baskets[pnet[idx].baskets[j]].schema) 
== 0 &&
+                               strcmp(tbl, 
baskets[pnet[idx].baskets[j]].table) == 0 )
                                break;
                        if ( j == MAXSTREAMS){
                                msg = 
createException(MAL,"cquery.analysis","too many stream table columns\n");
                                continue;
                        }
 
-                       if ( pnet[pn].baskets[j] )
+                       if ( pnet[idx].baskets[j] )
                                continue;
 
-                       pnet[pn].baskets[j] = bskt;
-                       pnet[pn].inout[j] = STREAM_IN;
+                       pnet[idx].baskets[j] = bskt;
+                       pnet[idx].inout[j] = STREAM_IN;
                }
 
                // Pick up the window constraint from the query definition
@@ -375,11 +369,6 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
                        baskets[bskt].stride = stride;
                }
        }
-       MT_lock_set(&ttrLock);
-       if( msg != MAL_SUCCEED){
-               // restore the state for later re-use
-               CQfree(pn);
-       }
        return msg;
 }
 
@@ -400,7 +389,7 @@ IOTprocedureStmt(Client cntxt, MalBlkPtr
                return msg;
        s = mvc_bind_schema(m, schema);
        if (s == NULL)
-               throw(SQL, "cquery.register", "Schema missing");
+               return createException(SQL,"cquery.register","Schema missing");
        /*tr = m->session->tr;*/
        for (o = s->funcs.set->h; o; o = o->next) {
                f = o->data;
@@ -413,11 +402,11 @@ IOTprocedureStmt(Client cntxt, MalBlkPtr
                        return MAL_SUCCEED;
                }
        }
-       throw(SQL, "cquery.register", "SQL procedure missing");
+       return createException(SQL,"cquery.register","SQL procedure missing");
 }
 
 str
-CQregisterInternal(Client cntxt, str modnme, str fcnnme)
+CQregisterInternal(Client cntxt, str modnme, str fcnnme, int action)
 {
        int i;
        InstrPtr sig,q;
@@ -432,16 +421,19 @@ CQregisterInternal(Client cntxt, str mod
                s = findSymbolInModule(scope, putName(fcnnme));
 
        if (s == NULL)
-               throw(MAL, "cquery.register", "Could not find SQL procedure");
-
-       if (pnettop == MAXCQ)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to