Changeset: 713ae47947ba for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=713ae47947ba
Modified Files:
        clients/mapiclient/mhelp.c
        sql/backends/monet5/Tests/cfunction03.sql
        sql/backends/monet5/sql_cquery.c
        sql/include/sql_catalog.h
        sql/server/rel_schema.c
        sql/server/sql_parser.y
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: trails
Log Message:

I just can't find the bug. The CQ scheduler client does not see the temporary 
stream table somehow :(


diffs (truncated from 430 to 300 lines):

diff --git a/clients/mapiclient/mhelp.c b/clients/mapiclient/mhelp.c
--- a/clients/mapiclient/mhelp.c
+++ b/clients/mapiclient/mhelp.c
@@ -198,7 +198,7 @@ SQLhelp sqlhelp[] = {
         "",
         "CREATE TABLE [ IF NOT EXISTS ] qname table_source [STORAGE ident 
string]\n"
         "CREATE TABLE [ IF NOT EXISTS ] qname FROM LOADER function_ref\n"
-        "CREATE [ LOCAL | GLOBAL ] TEMP[ORARY] TABLE [ IF NOT EXISTS ] qname 
table_source [on_commit]",
+        "CREATE TEMP[ORARY] TABLE [ IF NOT EXISTS ] qname table_source 
[on_commit]",
         "table_source,on_commit,function_ref",
         NULL},
        {"CREATE TRIGGER",
diff --git a/sql/backends/monet5/Tests/cfunction03.sql 
b/sql/backends/monet5/Tests/cfunction03.sql
--- a/sql/backends/monet5/Tests/cfunction03.sql
+++ b/sql/backends/monet5/Tests/cfunction03.sql
@@ -3,16 +3,16 @@ create table results3 (aa time);
 
 create function cfunc3(input time) returns table (aa time) begin
     while true do
+        insert into results3 values (input);
         yield table (select input);
     end while;
 end;
 
-start continuous function cfunc3(time '15:00:00') with heartbeat 100 cycles 3;
+start continuous function cfunc3(time '15:00:00') with heartbeat 1000 cycles 3;
 
 call cquery.wait(2000);
 
-select aa from time;
+select count(*) from results3;
 
 drop function cfunc3;
-drop procedure cproc3;
 drop table results3;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -53,7 +53,7 @@
 #include "mal_authorize.h"
 #include "mtime.h"
 
-static str statusname[7] = {"init", "register", "readytorun", "running", 
"waiting", "paused", "stopping"};
+static const str statusname[8] = {"init", "paused", "running", "pausing", 
"error", "stopping", "stopping", "stopping"};
 
 static str CQstartScheduler(void);
 static int pnstatus = CQINIT;
@@ -70,7 +70,6 @@ static CQnode *pnet = 0;
 static int pnetLimit = 0, pnettop = 0;
 
 #define CQ_SCHEDULER_CLIENTID     0
-#define CQ_SQL_QUERY_SIZE      1024
 
 #define SET_HEARTBEATS(X) (X != HEARTBEAT_NIL) ? X : HEARTBEAT_NIL /* minimal 
1 ms */
 
@@ -112,31 +111,18 @@ static void
 CQfree(Client cntxt, int idx)
 {
        int i;
-       InstrPtr p;
 
        //clean the baskets if so
        cleanBaskets(idx);
        if(cntxt && pnet[idx].func->res) {
-               //change IDs
-               oid prevID = cntxt->user;
-               str prevUserName = cntxt->username;
-               cntxt->user = CQ_SCHEDULER_CLIENTID;
-               if(AUTHgetUsername(&cntxt->username, cntxt) == MAL_SUCCEED) {
-                       backend* be = (backend*) cntxt->sqlcontext;
-                       mvc *m = be->mvc;
-                       sql_schema *s = mvc_bind_schema(m, "tmp");
-                       sql_table *t = mvc_bind_table(m, s, pnet[idx].alias);
-                       mvc_drop_table(m, s, t, 0);
-                       GDKfree(cntxt->username);
-               }
-               cntxt->user = prevID;
-               cntxt->username = prevUserName;
+               backend* be = (backend*) cntxt->sqlcontext;
+               mvc *m = be->mvc;
+               sql_schema *s = mvc_bind_schema(m, "tmp");
+               sql_table *t = mvc_bind_table(m, s, pnet[idx].alias);
+               mvc_drop_table(m, s, t, 0);
        }
-       if( pnet[idx].mb) {
-               p = getInstrPtr(pnet[idx].mb, 0);
-               GDKfree(p->fcnname); //Free the CQ id
+       if( pnet[idx].mb)
                freeMalBlk(pnet[idx].mb);
-       }
        if( pnet[idx].stk)
                freeStack(pnet[idx].stk);
        if(pnet[idx].error)
@@ -480,7 +466,12 @@ CQregister(Client cntxt, str sname, str 
        }
        for (i = 0; i < argc; i++) { //prepare the arguments for the backend 
creation
                atom *a = args[i];
-               list_append(l, stmt_varnr(be, i, &a->tpe));
+               stmt *r = stmt_varnr(be, i, &a->tpe);
+               if (!r) {
+                       list_destroy(l);
+                       CQ_MALLOC_FAIL(finish)
+               }
+               list_append(l, r);
        }
        if (backend_create_subfunc(be, f, l) < 0) { //create the backend 
function
                msg = createException(SQL,"cquery.register",SQLSTATE(3F000) 
"Failed to generate backend function\n");
@@ -503,45 +494,28 @@ CQregister(Client cntxt, str sname, str 
 
        found = f->func;
        if(found->res) { //for functions we have to store the results in an 
output result table
-               oid prevID;
-               str prevUserName;
-               //change IDs
-               prevID = cntxt->user;
-               prevUserName = cntxt->username;
-               cntxt->user = CQ_SCHEDULER_CLIENTID;
-               if((msg = AUTHgetUsername(&cntxt->username, cntxt)) == 
MAL_SUCCEED) {
-                       if((tmp_schema = mvc_bind_schema(m, "tmp")) == NULL) {
-                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp 
schema\n");
-                               goto revertids;
-                       }
-                       if(mvc_bind_table(m, tmp_schema, ralias)) {
-                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already 
exists\n", ralias);
-                               goto revertids;
-                       }
-                       if((t = mvc_create_stream_table(m, tmp_schema, ralias, 
tt_stream_temp, 0, SQL_DECLARED_TABLE,
-                                                                               
        CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
-                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal 
stream table\n");
-                               goto revertids;
-                       }
-                       for (argn = found->res->h; argn; argn = argn->next) {
-                               sql_arg* arg = (sql_arg *) argn->data;
-                               if(!mvc_create_column(m, t, arg->name, 
&arg->type)) {
-                                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create 
internal stream table\n");
-                                       goto revertids;
-                               }
-                       }
-                       msg = create_table_or_view(m, "tmp", ralias, t, 
SQL_LOCAL_TEMP_STREAM);
-                       //msg = sql_grant_table_privs(m, "public", PRIV_SELECT, 
"tmp", ralias, NULL, 0, USER_MONETDB);
+               if((tmp_schema = mvc_bind_schema(m, "tmp")) == NULL) {
+                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to bind tmp 
schema\n");
+                       FREE_CQ_MB(finish)
                }
-       revertids:
-               //set the IDs back
-               if(cntxt->username)
-                       GDKfree(cntxt->username);
-               cntxt->user = prevID;
-               cntxt->username = prevUserName;
-               if(msg) {
+               if(mvc_bind_table(m, tmp_schema, ralias)) {
+                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already 
exists\n", ralias);
                        FREE_CQ_MB(finish)
                }
+               if((t = mvc_create_stream_table(m, tmp_schema, ralias, 
tt_stream_temp, 0, SQL_DECLARED_TABLE,
+                                                                               
CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
+                       msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal 
stream table\n");
+                       FREE_CQ_MB(finish)
+               }
+               for (argn = found->res->h; argn; argn = argn->next) {
+                       sql_arg* arg = (sql_arg *) argn->data;
+                       if(!mvc_create_column(m, t, arg->name, &arg->type)) {
+                               msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed to create 
internal stream table\n");
+                               FREE_CQ_MB(finish)
+                       }
+               }
+               msg = create_table_or_view(m, "tmp", ralias, t, 
SQL_TEMP_STREAM);
+               //msg = sql_grant_table_privs(m, "public", PRIV_SELECT, "tmp", 
ralias, NULL, 0, USER_MONETDB);
        }
 
        if((mb = newMalBlk(8)) == NULL) { //create MalBlk and initialize it
@@ -1197,7 +1171,7 @@ CQdump(void *ret)
 {
        int i, k;
 
-       MT_lock_set(&ttrLock);
+       MT_lock_set(&ttrLock); //TODO IO inside a lock looks bad :(
        fprintf(stderr, "#scheduler status %s\n", statusname[pnstatus]);
        for (i = 0; i < pnettop; i++) {
                fprintf(stderr, "#[%d]\t%s.%s %s ", i, 
pnet[i].func->s->base.name, pnet[i].func->base.name, 
statusname[pnet[i].status]);
@@ -1260,14 +1234,14 @@ CQexecute( Client cntxt, int idx)
                node->status = CQWAIT;
 
 #ifdef DEBUG_CQUERY
-               fprintf(stderr, "#cquery.execute %s finished %s\n", 
node->alias, (msg?msg:""));
+       fprintf(stderr, "#cquery.execute %s finished %s\n", node->alias, 
(msg?msg:""));
 #endif
 }
 
 static void
 CQscheduler(void *dummy)
 {
-       int i, j, k = -1, pntasks, delay = cycleDelay, start_trans;
+       int i, j, k = -1, pntasks, delay = cycleDelay, start_trans = 0;
        Client c = (Client) dummy;
        mvc* m;
        str msg = MAL_SUCCEED;
@@ -1373,49 +1347,60 @@ CQscheduler(void *dummy)
 
                /* Execute each enabled transformation */
                /* Tricky part is here a single stream used by multiple 
transitions */
-               for (i = 0; i < pnettop; i++)
-               if( pnet[i].enabled){
-                       delay = cycleDelay;
+               for (i = 0; i < pnettop; i++) {
+                       if( pnet[i].enabled){
+                               delay = cycleDelay;
 #ifdef DEBUG_CQUERY
-                       fprintf(stderr, "#Run transition %s cycle=%d\n", 
pnet[i].alias, pnet[i].cycles);
+                               fprintf(stderr, "#Run transition %s 
cycle=%d\n", pnet[i].alias, pnet[i].cycles);
 #endif
-                       t = GDKusec();
-                       pnet[i].status = CQRUNNING;
-                       if( !GDKexiting())
-                               CQexecute(c, i);
-                       if( pnet[i].cycles != CYCLES_NIL && pnet[i].cycles > 0) 
{
-                               pnet[i].cycles--;
-                               if(pnet[i].cycles == 0) //if it was the last 
cycle of the CQ, remove it
-                                       pnet[i].status = CQDELETE;
-                       }
-                       if(pnet[i].status != CQDELETE) {
-                               pnet[i].run = now;                              
/* last executed */
-                               pnet[i].time = GDKusec() - t;   /* keep around 
in microseconds */
-                               (void) MTIMEcurrent_timestamp(&pnet[i].seen);
-                               pnet[i].enabled = 0;
-                               CQentry(i);
+                               t = GDKusec();
+                               pnet[i].status = CQRUNNING;
+                               if( !GDKexiting())
+                                       CQexecute(c, i);
+                               if( pnet[i].cycles != CYCLES_NIL && 
pnet[i].cycles > 0) {
+                                       pnet[i].cycles--;
+                                       if(pnet[i].cycles == 0) //if it was the 
last cycle of the CQ, remove it
+                                               pnet[i].status = CQDELETE;
+                               }
+                               if(pnet[i].status != CQDELETE) {
+                                       pnet[i].run = now;                      
        /* last executed */
+                                       pnet[i].time = GDKusec() - t;   /* keep 
around in microseconds */
+                                       (void) 
MTIMEcurrent_timestamp(&pnet[i].seen);
+                                       pnet[i].enabled = 0;
+                                       CQentry(i);
+                               }
                        }
                }
-               start_trans = 0;
                for (i = 0; i < pnettop ; i++) { //if there is a continuous 
function to delete, we must start a transaction
-                       if( pnet[i].status == CQDELETE && pnet[i].func->res)
-                               start_trans = 1;
-               }
-               if(start_trans) {
-                       SQLtrans(m);
-                       if (!m->sa)
-                               m->sa = sa_create();
-                       if (!m->sa) {
-                               fprintf(stderr, "CQscheduler sql allocation 
failure\n");
-                               goto terminate;
+                       if (pnet[i].status == CQDELETE) {
+                               if (pnet[i].func->res) {
+                                       *m->errstr = 0;
+                                       SQLtrans(m);
+                                       if (*m->errstr) {
+                                               fprintf(stderr, "CQscheduler 
internal error: %s\n", m->errstr);
+                                               *m->errstr = 0;
+                                               goto terminate;
+                                       }
+                                       if (!m->sa) {
+                                               m->sa = sa_create();
+                                               if (!m->sa) {
+                                                       fprintf(stderr, 
"CQscheduler internal error: allocation failure\n");
+                                                       goto terminate;
+                                               }
+                                       }
+                                       start_trans = 1;
+                               }
+                               CQfree(c, i);
+                               if (start_trans) {
+                                       start_trans = 0;
+                                       if((msg = SQLautocommit(m)) != 
MAL_SUCCEED) {
+                                               fprintf(stderr, "CQscheduler 
internal error: %s\n", msg);
+                                               GDKfree(msg);
+                                               goto terminate;
+                                       }
+                               }
                        }
                }
-               for(i = pnettop - 1; i >= 0; i--) { //more defensive way to 
stop continuous queries from the scheduler itself
-                       if( pnet[i].status == CQDELETE)
-                               CQfree(c, i);
-               }
-               if(start_trans)
-                       SQLautocommit(m);
                if( pnettop == 0)
                        pnstatus = CQSTOP;
                /* we should actually delay until the next heartbeat or 
insertion into the streams */
@@ -1523,7 +1508,7 @@ CQreset(void)
        }
        pnet = NULL;
        MT_lock_destroy(&ttrLock);
-       (void) BSKTshutdown(); //this must be last!!
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to