Changeset: bc4b64ac7303 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bc4b64ac7303
Modified Files:
sql/backends/monet5/50_cquery.mal
sql/backends/monet5/rel_bin.c
sql/backends/monet5/sql_cat.c
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/include/sql_catalog.h
sql/server/rel_psm.c
sql/server/rel_semantic.c
sql/server/sql_parser.h
sql/server/sql_parser.y
Branch: timetrails
Log Message:
Cleaned the SQL catalog for registering continuous queries. Only retrieving the
correct continuous procedure based on the number of arguments is missing
diffs (truncated from 998 to 300 lines):
diff --git a/sql/backends/monet5/50_cquery.mal
b/sql/backends/monet5/50_cquery.mal
--- a/sql/backends/monet5/50_cquery.mal
+++ b/sql/backends/monet5/50_cquery.mal
@@ -82,3 +82,8 @@ comment "Debug a single continuous query
command dump()
address CQdump
comment "Show the status of the query scheduler";
+
+# initializer code
+command prelude() :void
+address CQprelude;
+cquery.prelude();
diff --git a/sql/backends/monet5/rel_bin.c b/sql/backends/monet5/rel_bin.c
--- a/sql/backends/monet5/rel_bin.c
+++ b/sql/backends/monet5/rel_bin.c
@@ -17,7 +17,6 @@
#include "rel_updates.h"
#include "rel_optimizer.h"
#include "sql_env.h"
-#include "sql_cquery.h"
#define OUTER_ZERO 64
@@ -169,7 +168,7 @@ static stmt *column(backend *be, stmt *v
return val;
}
-static stmt *RelColumn(backend *be, stmt *val )
+static stmt *Column(backend *be, stmt *val )
{
if (val->nrcols == 0)
val = const_column(be, val);
@@ -455,21 +454,6 @@ exp_bin(backend *be, sql_exp *e, stmt *l
s = stmt_func(be, stmt_list(be, l), sa_strdup(sql->sa,
f->func->base.name), f->func->rel, (f->func->type == F_UNION));
else
s = stmt_Nop(be, stmt_list(be, l), e->f);
-
- if (f->func->type == F_CONTINUOUS_PROCEDURE) {
- char *petrinetResponse;
- char *sname = f->func->s->base.name;
- char *fname = f->func->base.name;
- if (!CQlocate(sname, fname)) { //if the continuous
procedure is not registered in the catalog then we register it
- petrinetResponse =
CQregisterInternal(MCgetClient(sql->clientid), sname, fname);
- }
- if (!petrinetResponse) {
- petrinetResponse = CQresumeInternal(sname,
fname);
- }
- if (petrinetResponse) {
- return sql_error(sql, 02, "M0M27!START
CONTINUOUS PROCEDURE internal error: %s", petrinetResponse);
- }
- }
} break;
case e_aggr: {
list *attr = e->l;
@@ -1867,7 +1851,7 @@ rel2bin_join(backend *be, sql_rel *rel,
/* as append isn't save, we append to a new copy */
if (rel->op == op_left || rel->op == op_full || rel->op ==
op_right)
- s = RelColumn(be, s);
+ s = Column(be, s);
if (rel->op == op_left || rel->op == op_full)
s = stmt_append(be, s, stmt_project(be, ld, c));
if (rel->op == op_right || rel->op == op_full)
@@ -1884,7 +1868,7 @@ rel2bin_join(backend *be, sql_rel *rel,
/* as append isn't save, we append to a new copy */
if (rel->op == op_left || rel->op == op_full || rel->op ==
op_right)
- s = RelColumn(be, s);
+ s = Column(be, s);
if (rel->op == op_left || rel->op == op_full)
s = stmt_append(be, s, stmt_const(be, ld,
(c->flag&OUTER_ZERO)?stmt_atom_lng(be, 0):stmt_atom(be, atom_general(sql->sa,
tail_type(c), NULL))));
if (rel->op == op_right || rel->op == op_full)
@@ -2135,7 +2119,7 @@ rel2bin_union(backend *be, sql_rel *rel,
const char *nme = column_name(sql->sa, c1);
stmt *s;
- s = stmt_append(be, RelColumn(be, c1), c2);
+ s = stmt_append(be, Column(be, c1), c2);
s = stmt_alias(be, s, rnme, nme);
list_append(l, s);
}
diff --git a/sql/backends/monet5/sql_cat.c b/sql/backends/monet5/sql_cat.c
--- a/sql/backends/monet5/sql_cat.c
+++ b/sql/backends/monet5/sql_cat.c
@@ -485,8 +485,8 @@ drop_func(mvc *sql, char *sname, char *n
if (!action && mvc_check_dependency(sql, func->base.id,
!IS_PROC(func) ? FUNC_DEPENDENCY : PROC_DEPENDENCY, NULL))
return sql_message("DROP %s%s: there are
database objects dependent on %s%s %s;", KF, F, kf, f, func->base.name);
//if it is a continuous procedure we must remove it
first from the Petrinet
- if(type == F_CONTINUOUS_PROCEDURE && CQlocate(sname,
func->base.name)) {
- err = CQderegisterInternal(sname,
func->base.name);
+ if(type == F_CONTINUOUS_PROCEDURE) {
+ err = CQderegisterInternal(sname,
func->base.name, 1);
if(err) {
return sql_message("DROP %s%s: internal
error on %s%s %s: %s", KF, F, kf, f, func->base.name, err);
}
@@ -510,8 +510,8 @@ drop_func(mvc *sql, char *sname, char *n
return sql_message("DROP %s%s: there are
database objects dependent on %s%s %s;", KF, F, kf, f, func->base.name);
}
//if it is a continuous procedure we must remove it
first from the Petrinet
- if(type == F_CONTINUOUS_PROCEDURE && CQlocate(sname,
func->base.name)) {
- err = CQderegisterInternal(sname,
func->base.name);
+ if(type == F_CONTINUOUS_PROCEDURE) {
+ err = CQderegisterInternal(sname,
func->base.name, 1);
if(err) {
return sql_message("DROP %s%s: internal
error on %s%s %s: %s", KF, F, kf, f, func->base.name, err);
}
@@ -727,9 +727,15 @@ continuous_procedure(mvc *sql, char *sna
{
sql_schema *s = NULL;
char *F = NULL;
- str petrinetResponse = MAL_SUCCEED;
+ str petrinetResponse = NULL;
switch (action) {
+ case START_CONTINUOUS_PROCEDURE:
+ F = "START CONTINUOUS PROCEDURE";
+ break;
+ case RESTART_CONTINUOUS_PROCEDURE:
+ F = "RESTART CONTINUOUS PROCEDURE";
+ break;
case INTERRUPT_CONTINUOUS_PROCEDURE:
F = "INTERRUPT CONTINUOUS PROCEDURE";
break;
@@ -749,10 +755,18 @@ continuous_procedure(mvc *sql, char *sna
node *n = find_sql_func_node(s, fid);
if (n) {
sql_func *func = n->data;
- if (!mvc_schema_privs(sql, s)) {
+ if(func->type != F_CONTINUOUS_PROCEDURE) {
+ return sql_message("3F000!%s: %s is not a
continuous procedure", F, s->base.name);
+ } else if (!mvc_schema_privs(sql, s)) {
return sql_message("3F000!%s: access denied for
%s to schema ;'%s'", F, stack_get_string(sql, "current_user"), s->base.name);
}
switch (action) {
+ case START_CONTINUOUS_PROCEDURE:
+ petrinetResponse =
CQregisterInternal(MCgetClient(sql->clientid), sname, cpname,
REGISTER_AND_START_CQUERY);
+ break;
+ case RESTART_CONTINUOUS_PROCEDURE:
+ petrinetResponse =
CQregisterInternal(MCgetClient(sql->clientid), sname, cpname, RESTART_CQUERY);
+ break;
case INTERRUPT_CONTINUOUS_PROCEDURE:
petrinetResponse =
CQpauseInternal(sname, cpname);
break;
@@ -760,7 +774,7 @@ continuous_procedure(mvc *sql, char *sna
petrinetResponse =
CQresumeInternal(sname, cpname);
break;
case HALT_CONTINUOUS_PROCEDURE:
- petrinetResponse =
CQderegisterInternal(sname, cpname);
+ petrinetResponse =
CQderegisterInternal(sname, cpname, 0);
break;
}
if(petrinetResponse) {
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -50,7 +50,7 @@
#include "mal_builder.h"
#include "opt_prelude.h"
-static str statusname[7] = { "init", "register", "readytorun", "running",
"waiting", "paused","stopping"};
+static str statusname[7] = { "init", "register", "readytorun", "running",
"waiting", "paused", "stopping"};
static str CQstartScheduler(void);
static int CQinit;
@@ -72,7 +72,6 @@ MT_Lock ttrLock MT_LOCK_INITIALIZER("cqu
static void
CQfree(int idx)
{
- MT_lock_set(&ttrLock);
if( pnet[idx].mb)
freeMalBlk(pnet[idx].mb);
if( pnet[idx].stk)
@@ -83,7 +82,6 @@ CQfree(int idx)
pnet[idx] = pnet[idx+1];
pnettop--;
memset((void*) (pnet+idx), 0, sizeof(CQnode));
- MT_lock_unset(&ttrLock);
}
/* We need a lock table for all stream tables considered
@@ -246,7 +244,7 @@ wrapup:
throw(SQL,"cquery.status",MAL_MALLOC_FAIL);
}
-int
+static int
CQlocate(str modname, str fcnname)
{
int i;
@@ -258,7 +256,7 @@ CQlocate(str modname, str fcnname)
return i;
}
-/* capture and remember errors */
+/* capture and remember errors: WARNING no locks in this call yet! */
str
CQerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
@@ -278,7 +276,7 @@ CQerror(Client cntxt, MalBlkPtr mb, MalS
return MAL_SUCCEED;
}
-/* A debugging routine */
+/* A debugging routine: WARNING no locks in this call yet! */
str
CQshow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
@@ -301,18 +299,14 @@ CQshow(Client cntxt, MalBlkPtr mb, MalSt
/* Make sure we do not re-use the same source more than once */
/* Avoid any concurreny conflict */
static str
-CQanalysis(Client cntxt, MalBlkPtr mb, int pn)
+CQanalysis(Client cntxt, MalBlkPtr mb, int idx)
{
- int i, j, idx, bskt;
+ int i, j, bskt;
InstrPtr p;
str msg= MAL_SUCCEED, sch, tbl;
(void) cntxt;
p = getInstrPtr(mb, 0);
- idx = CQlocate(getModuleId(p), getFunctionId(p));
- if( idx != pnettop)
- throw(MAL,"cquery.analysis","Duplicate or unknown registration
of %s.%s \n", getModuleId(p), getFunctionId(p));
- MT_lock_unset(&ttrLock);
for (i = 0; msg== MAL_SUCCEED && i < mb->stop; i++) {
p = getInstrPtr(mb, i);
if (getModuleId(p) == basketRef && (getFunctionId(p) ==
registerRef || getFunctionId(p) == bindRef)){
@@ -331,22 +325,22 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
}
}
}
-
+
// we only need a single column for window size testing
- for( j=0; j< MAXSTREAMS && pnet[pn].baskets[j]; j++)
- if( strcmp(sch, baskets[pnet[pn].baskets[j]].schema) ==
0 &&
- strcmp(tbl, baskets[pnet[pn].baskets[j]].table)
== 0 )
+ for( j=0; j< MAXSTREAMS && pnet[idx].baskets[j]; j++)
+ if( strcmp(sch, baskets[pnet[idx].baskets[j]].schema)
== 0 &&
+ strcmp(tbl,
baskets[pnet[idx].baskets[j]].table) == 0 )
break;
if ( j == MAXSTREAMS){
msg =
createException(MAL,"cquery.analysis","too many stream table columns\n");
continue;
}
- if ( pnet[pn].baskets[j] )
+ if ( pnet[idx].baskets[j] )
continue;
- pnet[pn].baskets[j] = bskt;
- pnet[pn].inout[j] = STREAM_IN;
+ pnet[idx].baskets[j] = bskt;
+ pnet[idx].inout[j] = STREAM_IN;
}
// Pick up the window constraint from the query definition
@@ -375,11 +369,6 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
baskets[bskt].stride = stride;
}
}
- MT_lock_set(&ttrLock);
- if( msg != MAL_SUCCEED){
- // restore the state for later re-use
- CQfree(pn);
- }
return msg;
}
@@ -400,7 +389,7 @@ IOTprocedureStmt(Client cntxt, MalBlkPtr
return msg;
s = mvc_bind_schema(m, schema);
if (s == NULL)
- throw(SQL, "cquery.register", "Schema missing");
+ return createException(SQL,"cquery.register","Schema missing");
/*tr = m->session->tr;*/
for (o = s->funcs.set->h; o; o = o->next) {
f = o->data;
@@ -413,11 +402,11 @@ IOTprocedureStmt(Client cntxt, MalBlkPtr
return MAL_SUCCEED;
}
}
- throw(SQL, "cquery.register", "SQL procedure missing");
+ return createException(SQL,"cquery.register","SQL procedure missing");
}
str
-CQregisterInternal(Client cntxt, str modnme, str fcnnme)
+CQregisterInternal(Client cntxt, str modnme, str fcnnme, int action)
{
int i;
InstrPtr sig,q;
@@ -432,16 +421,19 @@ CQregisterInternal(Client cntxt, str mod
s = findSymbolInModule(scope, putName(fcnnme));
if (s == NULL)
- throw(MAL, "cquery.register", "Could not find SQL procedure");
-
- if (pnettop == MAXCQ)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list