Changeset: 713ae47947ba for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=713ae47947ba
Modified Files:
clients/mapiclient/mhelp.c
sql/backends/monet5/Tests/cfunction03.sql
sql/backends/monet5/sql_cquery.c
sql/include/sql_catalog.h
sql/server/rel_schema.c
sql/server/sql_parser.y
sql/storage/sql_storage.h
sql/storage/store.c
Branch: trails
Log Message:
I just can't find the bug. The CQ scheduler client does not see the temporary
stream table somehow :(
diffs (truncated from 430 to 300 lines):
diff --git a/clients/mapiclient/mhelp.c b/clients/mapiclient/mhelp.c
--- a/clients/mapiclient/mhelp.c
+++ b/clients/mapiclient/mhelp.c
@@ -198,7 +198,7 @@ SQLhelp sqlhelp[] = {
"",
"CREATE TABLE [ IF NOT EXISTS ] qname table_source [STORAGE ident
string]\n"
"CREATE TABLE [ IF NOT EXISTS ] qname FROM LOADER function_ref\n"
- "CREATE [ LOCAL | GLOBAL ] TEMP[ORARY] TABLE [ IF NOT EXISTS ] qname
table_source [on_commit]",
+ "CREATE TEMP[ORARY] TABLE [ IF NOT EXISTS ] qname table_source
[on_commit]",
"table_source,on_commit,function_ref",
NULL},
{"CREATE TRIGGER",
diff --git a/sql/backends/monet5/Tests/cfunction03.sql
b/sql/backends/monet5/Tests/cfunction03.sql
--- a/sql/backends/monet5/Tests/cfunction03.sql
+++ b/sql/backends/monet5/Tests/cfunction03.sql
@@ -3,16 +3,16 @@ create table results3 (aa time);
create function cfunc3(input time) returns table (aa time) begin
while true do
+ insert into results3 values (input);
yield table (select input);
end while;
end;
-start continuous function cfunc3(time '15:00:00') with heartbeat 100 cycles 3;
+start continuous function cfunc3(time '15:00:00') with heartbeat 1000 cycles 3;
call cquery.wait(2000);
-select aa from time;
+select count(*) from results3;
drop function cfunc3;
-drop procedure cproc3;
drop table results3;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -53,7 +53,7 @@
#include "mal_authorize.h"
#include "mtime.h"
-static str statusname[7] = {"init", "register", "readytorun", "running",
"waiting", "paused", "stopping"};
+static const str statusname[8] = {"init", "paused", "running", "pausing",
"error", "stopping", "stopping", "stopping"};
static str CQstartScheduler(void);
static int pnstatus = CQINIT;
@@ -70,7 +70,6 @@ static CQnode *pnet = 0;
static int pnetLimit = 0, pnettop = 0;
#define CQ_SCHEDULER_CLIENTID 0
-#define CQ_SQL_QUERY_SIZE 1024
#define SET_HEARTBEATS(X) (X != HEARTBEAT_NIL) ? X : HEARTBEAT_NIL /* minimal
1 ms */
@@ -112,31 +111,18 @@ static void
CQfree(Client cntxt, int idx)
{
int i;
- InstrPtr p;
//clean the baskets if so
cleanBaskets(idx);
if(cntxt && pnet[idx].func->res) {
- //change IDs
- oid prevID = cntxt->user;
- str prevUserName = cntxt->username;
- cntxt->user = CQ_SCHEDULER_CLIENTID;
- if(AUTHgetUsername(&cntxt->username, cntxt) == MAL_SUCCEED) {
- backend* be = (backend*) cntxt->sqlcontext;
- mvc *m = be->mvc;
- sql_schema *s = mvc_bind_schema(m, "tmp");
- sql_table *t = mvc_bind_table(m, s, pnet[idx].alias);
- mvc_drop_table(m, s, t, 0);
- GDKfree(cntxt->username);
- }
- cntxt->user = prevID;
- cntxt->username = prevUserName;
+ backend* be = (backend*) cntxt->sqlcontext;
+ mvc *m = be->mvc;
+ sql_schema *s = mvc_bind_schema(m, "tmp");
+ sql_table *t = mvc_bind_table(m, s, pnet[idx].alias);
+ mvc_drop_table(m, s, t, 0);
}
- if( pnet[idx].mb) {
- p = getInstrPtr(pnet[idx].mb, 0);
- GDKfree(p->fcnname); //Free the CQ id
+ if( pnet[idx].mb)
freeMalBlk(pnet[idx].mb);
- }
if( pnet[idx].stk)
freeStack(pnet[idx].stk);
if(pnet[idx].error)
@@ -480,7 +466,12 @@ CQregister(Client cntxt, str sname, str
}
for (i = 0; i < argc; i++) { //prepare the arguments for the backend
creation
atom *a = args[i];
- list_append(l, stmt_varnr(be, i, &a->tpe));
+ stmt *r = stmt_varnr(be, i, &a->tpe);
+ if (!r) {
+ list_destroy(l);
+ CQ_MALLOC_FAIL(finish)
+ }
+ list_append(l, r);
}
if (backend_create_subfunc(be, f, l) < 0) { //create the backend
function
msg = createException(SQL,"cquery.register",SQLSTATE(3F000)
"Failed to generate backend function\n");
@@ -503,45 +494,28 @@ CQregister(Client cntxt, str sname, str
found = f->func;
if(found->res) { //for functions we have to store the results in an
output result table
- oid prevID;
- str prevUserName;
- //change IDs
- prevID = cntxt->user;
- prevUserName = cntxt->username;
- cntxt->user = CQ_SCHEDULER_CLIENTID;
- if((msg = AUTHgetUsername(&cntxt->username, cntxt)) ==
MAL_SUCCEED) {
- if((tmp_schema = mvc_bind_schema(m, "tmp")) == NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp
schema\n");
- goto revertids;
- }
- if(mvc_bind_table(m, tmp_schema, ralias)) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already
exists\n", ralias);
- goto revertids;
- }
- if((t = mvc_create_stream_table(m, tmp_schema, ralias,
tt_stream_temp, 0, SQL_DECLARED_TABLE,
-
CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal
stream table\n");
- goto revertids;
- }
- for (argn = found->res->h; argn; argn = argn->next) {
- sql_arg* arg = (sql_arg *) argn->data;
- if(!mvc_create_column(m, t, arg->name,
&arg->type)) {
- msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create
internal stream table\n");
- goto revertids;
- }
- }
- msg = create_table_or_view(m, "tmp", ralias, t,
SQL_LOCAL_TEMP_STREAM);
- //msg = sql_grant_table_privs(m, "public", PRIV_SELECT,
"tmp", ralias, NULL, 0, USER_MONETDB);
+ if((tmp_schema = mvc_bind_schema(m, "tmp")) == NULL) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp
schema\n");
+ FREE_CQ_MB(finish)
}
- revertids:
- //set the IDs back
- if(cntxt->username)
- GDKfree(cntxt->username);
- cntxt->user = prevID;
- cntxt->username = prevUserName;
- if(msg) {
+ if(mvc_bind_table(m, tmp_schema, ralias)) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already
exists\n", ralias);
FREE_CQ_MB(finish)
}
+ if((t = mvc_create_stream_table(m, tmp_schema, ralias,
tt_stream_temp, 0, SQL_DECLARED_TABLE,
+
CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal
stream table\n");
+ FREE_CQ_MB(finish)
+ }
+ for (argn = found->res->h; argn; argn = argn->next) {
+ sql_arg* arg = (sql_arg *) argn->data;
+ if(!mvc_create_column(m, t, arg->name, &arg->type)) {
+ msg =
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create
internal stream table\n");
+ FREE_CQ_MB(finish)
+ }
+ }
+ msg = create_table_or_view(m, "tmp", ralias, t,
SQL_TEMP_STREAM);
+ //msg = sql_grant_table_privs(m, "public", PRIV_SELECT, "tmp",
ralias, NULL, 0, USER_MONETDB);
}
if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
@@ -1197,7 +1171,7 @@ CQdump(void *ret)
{
int i, k;
- MT_lock_set(&ttrLock);
+ MT_lock_set(&ttrLock); //TODO IO inside a lock looks bad :(
fprintf(stderr, "#scheduler status %s\n", statusname[pnstatus]);
for (i = 0; i < pnettop; i++) {
fprintf(stderr, "#[%d]\t%s.%s %s ", i,
pnet[i].func->s->base.name, pnet[i].func->base.name,
statusname[pnet[i].status]);
@@ -1260,14 +1234,14 @@ CQexecute( Client cntxt, int idx)
node->status = CQWAIT;
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#cquery.execute %s finished %s\n",
node->alias, (msg?msg:""));
+ fprintf(stderr, "#cquery.execute %s finished %s\n", node->alias,
(msg?msg:""));
#endif
}
static void
CQscheduler(void *dummy)
{
- int i, j, k = -1, pntasks, delay = cycleDelay, start_trans;
+ int i, j, k = -1, pntasks, delay = cycleDelay, start_trans = 0;
Client c = (Client) dummy;
mvc* m;
str msg = MAL_SUCCEED;
@@ -1373,49 +1347,60 @@ CQscheduler(void *dummy)
/* Execute each enabled transformation */
/* Tricky part is here a single stream used by multiple
transitions */
- for (i = 0; i < pnettop; i++)
- if( pnet[i].enabled){
- delay = cycleDelay;
+ for (i = 0; i < pnettop; i++) {
+ if( pnet[i].enabled){
+ delay = cycleDelay;
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#Run transition %s cycle=%d\n",
pnet[i].alias, pnet[i].cycles);
+ fprintf(stderr, "#Run transition %s
cycle=%d\n", pnet[i].alias, pnet[i].cycles);
#endif
- t = GDKusec();
- pnet[i].status = CQRUNNING;
- if( !GDKexiting())
- CQexecute(c, i);
- if( pnet[i].cycles != CYCLES_NIL && pnet[i].cycles > 0)
{
- pnet[i].cycles--;
- if(pnet[i].cycles == 0) //if it was the last
cycle of the CQ, remove it
- pnet[i].status = CQDELETE;
- }
- if(pnet[i].status != CQDELETE) {
- pnet[i].run = now;
/* last executed */
- pnet[i].time = GDKusec() - t; /* keep around
in microseconds */
- (void) MTIMEcurrent_timestamp(&pnet[i].seen);
- pnet[i].enabled = 0;
- CQentry(i);
+ t = GDKusec();
+ pnet[i].status = CQRUNNING;
+ if( !GDKexiting())
+ CQexecute(c, i);
+ if( pnet[i].cycles != CYCLES_NIL &&
pnet[i].cycles > 0) {
+ pnet[i].cycles--;
+ if(pnet[i].cycles == 0) //if it was the
last cycle of the CQ, remove it
+ pnet[i].status = CQDELETE;
+ }
+ if(pnet[i].status != CQDELETE) {
+ pnet[i].run = now;
/* last executed */
+ pnet[i].time = GDKusec() - t; /* keep
around in microseconds */
+ (void)
MTIMEcurrent_timestamp(&pnet[i].seen);
+ pnet[i].enabled = 0;
+ CQentry(i);
+ }
}
}
- start_trans = 0;
for (i = 0; i < pnettop ; i++) { //if there is a continuous
function to delete, we must start a transaction
- if( pnet[i].status == CQDELETE && pnet[i].func->res)
- start_trans = 1;
- }
- if(start_trans) {
- SQLtrans(m);
- if (!m->sa)
- m->sa = sa_create();
- if (!m->sa) {
- fprintf(stderr, "CQscheduler sql allocation
failure\n");
- goto terminate;
+ if (pnet[i].status == CQDELETE) {
+ if (pnet[i].func->res) {
+ *m->errstr = 0;
+ SQLtrans(m);
+ if (*m->errstr) {
+ fprintf(stderr, "CQscheduler
internal error: %s\n", m->errstr);
+ *m->errstr = 0;
+ goto terminate;
+ }
+ if (!m->sa) {
+ m->sa = sa_create();
+ if (!m->sa) {
+ fprintf(stderr,
"CQscheduler internal error: allocation failure\n");
+ goto terminate;
+ }
+ }
+ start_trans = 1;
+ }
+ CQfree(c, i);
+ if (start_trans) {
+ start_trans = 0;
+ if((msg = SQLautocommit(m)) !=
MAL_SUCCEED) {
+ fprintf(stderr, "CQscheduler
internal error: %s\n", msg);
+ GDKfree(msg);
+ goto terminate;
+ }
+ }
}
}
- for(i = pnettop - 1; i >= 0; i--) { //more defensive way to
stop continuous queries from the scheduler itself
- if( pnet[i].status == CQDELETE)
- CQfree(c, i);
- }
- if(start_trans)
- SQLautocommit(m);
if( pnettop == 0)
pnstatus = CQSTOP;
/* we should actually delay until the next heartbeat or
insertion into the streams */
@@ -1523,7 +1508,7 @@ CQreset(void)
}
pnet = NULL;
MT_lock_destroy(&ttrLock);
- (void) BSKTshutdown(); //this must be last!!
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list