Changeset: 98591a60dd6f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=98591a60dd6f
Added Files:
        sql/backends/monet5/Tests/cquery14.sql
Modified Files:
        sql/backends/monet5/Tests/All
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/backends/monet5/sql_execute.c
        sql/server/rel_psm.c
        sql/server/sql_mvc.h
Branch: trails
Log Message:

When pausing or stopping a CQ from another CQ call, delay the action until the 
call is done. Also, when compiling a UDF with a CQ schedule in its body, you 
must avoid registering the UDF in the scheduler.


diffs (truncated from 338 to 300 lines):

diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -99,6 +99,7 @@ cquery10
 cquery11
 cquery12
 cquery13
+cquery14
 
 cqstream00
 cqstream01
diff --git a/sql/backends/monet5/Tests/cquery14.sql 
b/sql/backends/monet5/Tests/cquery14.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cquery14.sql
@@ -0,0 +1,26 @@
+--start a continuous query from the body of another one
+create stream table testing14 (a int) set window 1 stride 1;
+create table results14 (a int);
+
+create procedure cq_query14a()
+begin
+       insert into results14 (select * from testing14);
+end;
+
+create procedure cq_query14b()
+begin
+       insert into testing14 values (1), (2), (3);
+       start continuous sys.cq_query14a() with cycles 2;
+end;
+
+start continuous sys.cq_query14b() with cycles 1;
+
+call cquery.wait(2500);
+
+select count(*) from results14; --should be 2
+
+drop procedure sys.cq_query14a;
+drop procedure sys.cq_query14b;
+
+drop table testing14;
+drop table results14;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -768,6 +768,7 @@ CQpauseInternal(Client cntxt, MalBlkPtr 
        str msg = MAL_SUCCEED, mb2str = NULL;
        mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
        const char* err_message = (sqlcontext && sqlcontext->continuous & 
mod_continuous_function) ? "function" : "procedure";
+       MT_Id myID = MT_getpid();
 
        MT_lock_set(&ttrLock);
        if((msg = CQlocateMb(mb, stk, &idx, &mb2str, err_message, 
"cquery.pause")) != MAL_SUCCEED) {
@@ -784,12 +785,14 @@ CQpauseInternal(Client cntxt, MalBlkPtr 
                goto finish;
        }
        // actually wait if the query was running
-       while( pnet[idx].status == CQRUNNING ){
-               MT_lock_unset(&ttrLock);
-               MT_sleep_ms(5);
-               MT_lock_set(&ttrLock);
-               if( pnet[idx].status == CQWAIT)
-                       break;
+       if(myID != cq_pid) {
+               while( pnet[idx].status == CQRUNNING ){
+                       MT_lock_unset(&ttrLock);
+                       MT_sleep_ms(5);
+                       MT_lock_set(&ttrLock);
+                       if( pnet[idx].status == CQWAIT)
+                               break;
+               }
        }
        pnet[idx].status = CQPAUSE;
 
@@ -812,6 +815,7 @@ CQpauseAll(Client cntxt, MalBlkPtr mb, M
 {
        str msg = MAL_SUCCEED;
        int i;
+       MT_Id myID = MT_getpid();
        //mvc* smvc;
 
        //ALL_ROOT_CHECK(cntxt, "cquery.pauseall", "PAUSE ");
@@ -827,12 +831,14 @@ CQpauseAll(Client cntxt, MalBlkPtr mb, M
 
        MT_lock_set(&ttrLock);
        for(i = 0 ; i < pnettop; i++) {
-               while( pnet[i].status == CQRUNNING ){
-                       MT_lock_unset(&ttrLock);
-                       MT_sleep_ms(5);
-                       MT_lock_set(&ttrLock);
-                       if( pnet[i].status == CQWAIT)
-                               break;
+               if(myID != cq_pid) {
+                       while (pnet[i].status == CQRUNNING) {
+                               MT_lock_unset(&ttrLock);
+                               MT_sleep_ms(5);
+                               MT_lock_set(&ttrLock);
+                               if (pnet[i].status == CQWAIT)
+                                       break;
+                       }
                }
                pnet[i].status = CQPAUSE;
        }
@@ -1003,6 +1009,7 @@ CQderegisterInternal(Client cntxt, MalBl
        str msg = MAL_SUCCEED, mb2str = NULL;
        mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
        const char* err_message = (sqlcontext && sqlcontext->continuous & 
mod_continuous_function) ? "function" : "procedure";
+       MT_Id myID = MT_getpid();
 
        MT_lock_set(&ttrLock);
        if((msg = CQlocateMb(mb, stk, &idx, &mb2str, err_message, 
"cquery.deregister")) != MAL_SUCCEED) {
@@ -1013,22 +1020,26 @@ CQderegisterInternal(Client cntxt, MalBl
                                                          SQLSTATE(42000) "The 
continuous %s %s has not yet started\n", err_message, mb2str);
                goto unlock;
        }
-       pnet[idx].status = CQSTOP;
-       MT_lock_unset(&ttrLock);
-       // actually wait if the query was running
-       while( pnet[idx].status != CQDEREGISTER ){
-               MT_sleep_ms(5);
-       }
-       MT_lock_set(&ttrLock);
-       CQfree(idx);
-       if( pnettop == 0) {
-               pnstatus = CQSTOP;
+       if(myID != cq_pid) {
+               pnet[idx].status = CQSTOP;
                MT_lock_unset(&ttrLock);
-               if(cq_pid > 0) { //this check is need for the compiler
-                       MT_join_thread(cq_pid);
-                       cq_pid = 0;
+               // actually wait if the query was running
+               while (pnet[idx].status != CQDEREGISTER) {
+                       MT_sleep_ms(5);
                }
-               goto finish;
+               MT_lock_set(&ttrLock);
+               CQfree(idx);
+               if( pnettop == 0) {
+                       pnstatus = CQSTOP;
+                       MT_lock_unset(&ttrLock);
+                       if(cq_pid > 0) {
+                               MT_join_thread(cq_pid);
+                               cq_pid = 0;
+                       }
+                       goto finish;
+               }
+       } else {
+               pnet[idx].status = CQDELETE;
        }
 unlock:
        MT_lock_unset(&ttrLock);
@@ -1050,6 +1061,7 @@ CQderegisterAll(Client cntxt, MalBlkPtr 
 {
        str msg = MAL_SUCCEED;
        int i;
+       MT_Id myID = MT_getpid();
        //mvc* smvc;
 
        //ALL_ROOT_CHECK(cntxt, "cquery.deregisterall", "STOP ");
@@ -1062,20 +1074,24 @@ CQderegisterAll(Client cntxt, MalBlkPtr 
        MT_lock_set(&ttrLock);
 
        for(i = 0 ; i < pnettop; i++) {
-               pnet[i].status = CQSTOP;
-               MT_lock_unset(&ttrLock);
-
-               // actually wait if the query was running
-               while( pnet[i].status != CQDEREGISTER ){
-                       MT_sleep_ms(5);
+               if(myID != cq_pid) {
+                       pnet[i].status = CQSTOP;
+                       MT_lock_unset(&ttrLock);
+                       // actually wait if the query was running
+                       while( pnet[i].status != CQDEREGISTER ){
+                               MT_sleep_ms(5);
+                       }
+                       MT_lock_set(&ttrLock);
+                       CQfree(i);
+               } else {
+                       pnet[i].status = CQDELETE;
                }
-               MT_lock_set(&ttrLock);
-               CQfree(i);
                i--;
        }
-       pnstatus = CQSTOP;
+       if(myID != cq_pid)
+               pnstatus = CQSTOP;
        MT_lock_unset(&ttrLock);
-       if(cq_pid > 0) {
+       if(myID != cq_pid && cq_pid > 0) {
                MT_join_thread(cq_pid);
                cq_pid = 0;
        }
@@ -1266,8 +1282,7 @@ CQscheduler(void *dummy)
                                pnet[i].cycles--;
                                if(pnet[i].cycles == 0) { //if it was the last 
cycle of the CQ, remove it
                                        CQfree(i);
-                                       if( pnettop == 0)
-                                               pnstatus = CQSTOP;
+                                       i--;
                                        continue; //an entry was deleted, so 
jump over!
                                }
                        }
@@ -1277,6 +1292,14 @@ CQscheduler(void *dummy)
                        pnet[i].enabled = 0;
                        CQentry(i);
                }
+               for(i = pnettop ; i > 0; i--) { //more defensive way to stop 
continuous queries from the scheduler itself
+                       if( pnet[i].status == CQDELETE){
+                               CQfree(i);
+                       }
+                       i--;
+               }
+               if( pnettop == 0)
+                       pnstatus = CQSTOP;
                /* after one sweep all threads should be released */
 /*
                for (m = 0; m < k; m++)
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -33,6 +33,7 @@
 #define CQPAUSE       4 /* not active now */
 #define CQSTOP       5 /* stop the scheduler */
 #define CQDEREGISTER  6 /* stop the scheduler */
+#define CQDELETE      7 /* stop the scheduler, triggered by the scheduler 
itself */
 
 #define INITIAL_MAXCQ  32 /* it is the minimum, if we need more space 
GDKrealloc */
 #define MAXSTREAMS    128 /* limit the number of stream columns to be looked 
after per query*/
diff --git a/sql/backends/monet5/sql_execute.c 
b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -342,7 +342,8 @@ SQLrun(Client c, backend *be, mvc *m)
        // This include template constants, BAT sizes.
        if( m->emod & mod_debug)
                mb->keephistory = TRUE;
-       if(!m->continuous) /* it's fine to not optimize the MAL block in a 
continuous query, as the plan it's just a user module call */
+       /* it's fine to not optimize the MAL block in a continuous query, as 
the plan it's just a user module call */
+       if(!m->continuous || (m->continuous & mod_creating_udf))
                msg = SQLoptimizeQuery(c, mb);
        mb->keephistory = FALSE;
 
@@ -361,23 +362,28 @@ SQLrun(Client c, backend *be, mvc *m)
                SQLsetTrace(c,mb);
                msg = runMAL(c, mb, 0, 0);
                stopTrace(0);
-       } else if(m->continuous & mod_start_continuous) {
-               //mnstr_printf(c->fdout, "#Start continuous query\n");
-               // hand over the wrapper command to the scheduler
-               msg = CQregister(c,mb, 0,0);
-       } else if(m->continuous & mod_stop_continuous) {
-               //mnstr_printf(c->fdout, "#Stop continuous query\n");
-               msg = CQderegister(c,mb, 0,0);
-       } else if(m->continuous & mod_pause_continuous) {
-               //mnstr_printf(c->fdout, "#Stop continuous query\n");
-               msg = CQpause(c,mb, 0,0);
-       } else if(m->continuous & mod_resume_continuous) {
-               //mnstr_printf(c->fdout, "#Resume continuous query with 
changes\n");
-               msg = CQresume(c,mb, 0,0);
-       } else if(m->continuous & mod_resume_continuous_no_alter) {
-               //mnstr_printf(c->fdout, "#Resume continuous query with no 
changes\n");
-               msg = CQresumeNoAlter(c,mb, 0,0);
+       } else if((m->continuous & ~mod_creating_udf) && !(m->continuous & 
mod_creating_udf)) {
+               if(m->continuous & mod_start_continuous) {
+                       //mnstr_printf(c->fdout, "#Start continuous query\n");
+                       // hand over the wrapper command to the scheduler
+                       msg = CQregister(c,mb, 0,0);
+               } else if(m->continuous & mod_stop_continuous) {
+                       //mnstr_printf(c->fdout, "#Stop continuous query\n");
+                       msg = CQderegister(c,mb, 0,0);
+               } else if(m->continuous & mod_pause_continuous) {
+                       //mnstr_printf(c->fdout, "#Pause continuous query\n");
+                       msg = CQpause(c,mb, 0,0);
+               } else if(m->continuous & mod_resume_continuous) {
+                       //mnstr_printf(c->fdout, "#Resume continuous query with 
changes\n");
+                       msg = CQresume(c,mb, 0,0);
+               } else if(m->continuous & mod_resume_continuous_no_alter) {
+                       //mnstr_printf(c->fdout, "#Resume continuous query with 
no changes\n");
+                       msg = CQresumeNoAlter(c,mb, 0,0);
+               } else {
+                       assert(0);
+               }
        } else {
+               m->continuous &= ~mod_creating_udf; //important disable the 
check
                msg = runMAL(c, mb, 0, 0);
        }
        m->continuous = 0;
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -1420,6 +1420,7 @@ rel_psm(mvc *sql, symbol *s)
                int lang = l->h->next->next->next->next->next->next->data.i_val;
                int repl = 
l->h->next->next->next->next->next->next->next->data.i_val;
 
+               sql->continuous |= mod_creating_udf;
                ret = rel_create_func(sql, l->h->data.lval, 
l->h->next->data.lval, l->h->next->next->data.sym, 
l->h->next->next->next->data.lval, l->h->next->next->next->next->data.lval, 
type, lang, repl);
                sql->type = Q_SCHEMA;
        }       break;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to