Changeset: 98591a60dd6f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=98591a60dd6f
Added Files:
sql/backends/monet5/Tests/cquery14.sql
Modified Files:
sql/backends/monet5/Tests/All
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/backends/monet5/sql_execute.c
sql/server/rel_psm.c
sql/server/sql_mvc.h
Branch: trails
Log Message:
When a CQ is paused or stopped from within another CQ call, delay the action
until that call is done. Also, when compiling a UDF with a CQ schedule in its
body, avoid registering the UDF in the scheduler.
diffs (truncated from 338 to 300 lines):
diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -99,6 +99,7 @@ cquery10
cquery11
cquery12
cquery13
+cquery14
cqstream00
cqstream01
diff --git a/sql/backends/monet5/Tests/cquery14.sql
b/sql/backends/monet5/Tests/cquery14.sql
new file mode 100644
--- /dev/null
+++ b/sql/backends/monet5/Tests/cquery14.sql
@@ -0,0 +1,26 @@
+--start a continuous query from the body of another one
+create stream table testing14 (a int) set window 1 stride 1;
+create table results14 (a int);
+
+create procedure cq_query14a()
+begin
+ insert into results14 (select * from testing14);
+end;
+
+create procedure cq_query14b()
+begin
+ insert into testing14 values (1), (2), (3);
+ start continuous sys.cq_query14a() with cycles 2;
+end;
+
+start continuous sys.cq_query14b() with cycles 1;
+
+call cquery.wait(2500);
+
+select count(*) from results14; --should be 2
+
+drop procedure sys.cq_query14a;
+drop procedure sys.cq_query14b;
+
+drop table testing14;
+drop table results14;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -768,6 +768,7 @@ CQpauseInternal(Client cntxt, MalBlkPtr
str msg = MAL_SUCCEED, mb2str = NULL;
mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
const char* err_message = (sqlcontext && sqlcontext->continuous &
mod_continuous_function) ? "function" : "procedure";
+ MT_Id myID = MT_getpid();
MT_lock_set(&ttrLock);
if((msg = CQlocateMb(mb, stk, &idx, &mb2str, err_message,
"cquery.pause")) != MAL_SUCCEED) {
@@ -784,12 +785,14 @@ CQpauseInternal(Client cntxt, MalBlkPtr
goto finish;
}
// actually wait if the query was running
- while( pnet[idx].status == CQRUNNING ){
- MT_lock_unset(&ttrLock);
- MT_sleep_ms(5);
- MT_lock_set(&ttrLock);
- if( pnet[idx].status == CQWAIT)
- break;
+ if(myID != cq_pid) {
+ while( pnet[idx].status == CQRUNNING ){
+ MT_lock_unset(&ttrLock);
+ MT_sleep_ms(5);
+ MT_lock_set(&ttrLock);
+ if( pnet[idx].status == CQWAIT)
+ break;
+ }
}
pnet[idx].status = CQPAUSE;
@@ -812,6 +815,7 @@ CQpauseAll(Client cntxt, MalBlkPtr mb, M
{
str msg = MAL_SUCCEED;
int i;
+ MT_Id myID = MT_getpid();
//mvc* smvc;
//ALL_ROOT_CHECK(cntxt, "cquery.pauseall", "PAUSE ");
@@ -827,12 +831,14 @@ CQpauseAll(Client cntxt, MalBlkPtr mb, M
MT_lock_set(&ttrLock);
for(i = 0 ; i < pnettop; i++) {
- while( pnet[i].status == CQRUNNING ){
- MT_lock_unset(&ttrLock);
- MT_sleep_ms(5);
- MT_lock_set(&ttrLock);
- if( pnet[i].status == CQWAIT)
- break;
+ if(myID != cq_pid) {
+ while (pnet[i].status == CQRUNNING) {
+ MT_lock_unset(&ttrLock);
+ MT_sleep_ms(5);
+ MT_lock_set(&ttrLock);
+ if (pnet[i].status == CQWAIT)
+ break;
+ }
}
pnet[i].status = CQPAUSE;
}
@@ -1003,6 +1009,7 @@ CQderegisterInternal(Client cntxt, MalBl
str msg = MAL_SUCCEED, mb2str = NULL;
mvc* sqlcontext = ((backend *) cntxt->sqlcontext)->mvc;
const char* err_message = (sqlcontext && sqlcontext->continuous &
mod_continuous_function) ? "function" : "procedure";
+ MT_Id myID = MT_getpid();
MT_lock_set(&ttrLock);
if((msg = CQlocateMb(mb, stk, &idx, &mb2str, err_message,
"cquery.deregister")) != MAL_SUCCEED) {
@@ -1013,22 +1020,26 @@ CQderegisterInternal(Client cntxt, MalBl
SQLSTATE(42000) "The
continuous %s %s has not yet started\n", err_message, mb2str);
goto unlock;
}
- pnet[idx].status = CQSTOP;
- MT_lock_unset(&ttrLock);
- // actually wait if the query was running
- while( pnet[idx].status != CQDEREGISTER ){
- MT_sleep_ms(5);
- }
- MT_lock_set(&ttrLock);
- CQfree(idx);
- if( pnettop == 0) {
- pnstatus = CQSTOP;
+ if(myID != cq_pid) {
+ pnet[idx].status = CQSTOP;
MT_lock_unset(&ttrLock);
- if(cq_pid > 0) { //this check is need for the compiler
- MT_join_thread(cq_pid);
- cq_pid = 0;
+ // actually wait if the query was running
+ while (pnet[idx].status != CQDEREGISTER) {
+ MT_sleep_ms(5);
}
- goto finish;
+ MT_lock_set(&ttrLock);
+ CQfree(idx);
+ if( pnettop == 0) {
+ pnstatus = CQSTOP;
+ MT_lock_unset(&ttrLock);
+ if(cq_pid > 0) {
+ MT_join_thread(cq_pid);
+ cq_pid = 0;
+ }
+ goto finish;
+ }
+ } else {
+ pnet[idx].status = CQDELETE;
}
unlock:
MT_lock_unset(&ttrLock);
@@ -1050,6 +1061,7 @@ CQderegisterAll(Client cntxt, MalBlkPtr
{
str msg = MAL_SUCCEED;
int i;
+ MT_Id myID = MT_getpid();
//mvc* smvc;
//ALL_ROOT_CHECK(cntxt, "cquery.deregisterall", "STOP ");
@@ -1062,20 +1074,24 @@ CQderegisterAll(Client cntxt, MalBlkPtr
MT_lock_set(&ttrLock);
for(i = 0 ; i < pnettop; i++) {
- pnet[i].status = CQSTOP;
- MT_lock_unset(&ttrLock);
-
- // actually wait if the query was running
- while( pnet[i].status != CQDEREGISTER ){
- MT_sleep_ms(5);
+ if(myID != cq_pid) {
+ pnet[i].status = CQSTOP;
+ MT_lock_unset(&ttrLock);
+ // actually wait if the query was running
+ while( pnet[i].status != CQDEREGISTER ){
+ MT_sleep_ms(5);
+ }
+ MT_lock_set(&ttrLock);
+ CQfree(i);
+ } else {
+ pnet[i].status = CQDELETE;
}
- MT_lock_set(&ttrLock);
- CQfree(i);
i--;
}
- pnstatus = CQSTOP;
+ if(myID != cq_pid)
+ pnstatus = CQSTOP;
MT_lock_unset(&ttrLock);
- if(cq_pid > 0) {
+ if(myID != cq_pid && cq_pid > 0) {
MT_join_thread(cq_pid);
cq_pid = 0;
}
@@ -1266,8 +1282,7 @@ CQscheduler(void *dummy)
pnet[i].cycles--;
if(pnet[i].cycles == 0) { //if it was the last
cycle of the CQ, remove it
CQfree(i);
- if( pnettop == 0)
- pnstatus = CQSTOP;
+ i--;
continue; //an entry was deleted, so
jump over!
}
}
@@ -1277,6 +1292,14 @@ CQscheduler(void *dummy)
pnet[i].enabled = 0;
CQentry(i);
}
+ for(i = pnettop ; i > 0; i--) { //more defensive way to stop
continuous queries from the scheduler itself
+ if( pnet[i].status == CQDELETE){
+ CQfree(i);
+ }
+ i--;
+ }
+ if( pnettop == 0)
+ pnstatus = CQSTOP;
/* after one sweep all threads should be released */
/*
for (m = 0; m < k; m++)
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -33,6 +33,7 @@
#define CQPAUSE 4 /* not active now */
#define CQSTOP 5 /* stop the scheduler */
#define CQDEREGISTER 6 /* stop the scheduler */
+#define CQDELETE 7 /* stop the scheduler, triggered by the scheduler
itself */
#define INITIAL_MAXCQ 32 /* it is the minimum, if we need more space
GDKrealloc */
#define MAXSTREAMS 128 /* limit the number of stream columns to be looked
after per query*/
diff --git a/sql/backends/monet5/sql_execute.c
b/sql/backends/monet5/sql_execute.c
--- a/sql/backends/monet5/sql_execute.c
+++ b/sql/backends/monet5/sql_execute.c
@@ -342,7 +342,8 @@ SQLrun(Client c, backend *be, mvc *m)
// This include template constants, BAT sizes.
if( m->emod & mod_debug)
mb->keephistory = TRUE;
- if(!m->continuous) /* it's fine to not optimize the MAL block in a
continuous query, as the plan it's just a user module call */
+ /* it's fine to not optimize the MAL block in a continuous query, as
the plan it's just a user module call */
+ if(!m->continuous || (m->continuous & mod_creating_udf))
msg = SQLoptimizeQuery(c, mb);
mb->keephistory = FALSE;
@@ -361,23 +362,28 @@ SQLrun(Client c, backend *be, mvc *m)
SQLsetTrace(c,mb);
msg = runMAL(c, mb, 0, 0);
stopTrace(0);
- } else if(m->continuous & mod_start_continuous) {
- //mnstr_printf(c->fdout, "#Start continuous query\n");
- // hand over the wrapper command to the scheduler
- msg = CQregister(c,mb, 0,0);
- } else if(m->continuous & mod_stop_continuous) {
- //mnstr_printf(c->fdout, "#Stop continuous query\n");
- msg = CQderegister(c,mb, 0,0);
- } else if(m->continuous & mod_pause_continuous) {
- //mnstr_printf(c->fdout, "#Stop continuous query\n");
- msg = CQpause(c,mb, 0,0);
- } else if(m->continuous & mod_resume_continuous) {
- //mnstr_printf(c->fdout, "#Resume continuous query with
changes\n");
- msg = CQresume(c,mb, 0,0);
- } else if(m->continuous & mod_resume_continuous_no_alter) {
- //mnstr_printf(c->fdout, "#Resume continuous query with no
changes\n");
- msg = CQresumeNoAlter(c,mb, 0,0);
+ } else if((m->continuous & ~mod_creating_udf) && !(m->continuous &
mod_creating_udf)) {
+ if(m->continuous & mod_start_continuous) {
+ //mnstr_printf(c->fdout, "#Start continuous query\n");
+ // hand over the wrapper command to the scheduler
+ msg = CQregister(c,mb, 0,0);
+ } else if(m->continuous & mod_stop_continuous) {
+ //mnstr_printf(c->fdout, "#Stop continuous query\n");
+ msg = CQderegister(c,mb, 0,0);
+ } else if(m->continuous & mod_pause_continuous) {
+ //mnstr_printf(c->fdout, "#Pause continuous query\n");
+ msg = CQpause(c,mb, 0,0);
+ } else if(m->continuous & mod_resume_continuous) {
+ //mnstr_printf(c->fdout, "#Resume continuous query with
changes\n");
+ msg = CQresume(c,mb, 0,0);
+ } else if(m->continuous & mod_resume_continuous_no_alter) {
+ //mnstr_printf(c->fdout, "#Resume continuous query with
no changes\n");
+ msg = CQresumeNoAlter(c,mb, 0,0);
+ } else {
+ assert(0);
+ }
} else {
+ m->continuous &= ~mod_creating_udf; //important disable the
check
msg = runMAL(c, mb, 0, 0);
}
m->continuous = 0;
diff --git a/sql/server/rel_psm.c b/sql/server/rel_psm.c
--- a/sql/server/rel_psm.c
+++ b/sql/server/rel_psm.c
@@ -1420,6 +1420,7 @@ rel_psm(mvc *sql, symbol *s)
int lang = l->h->next->next->next->next->next->next->data.i_val;
int repl =
l->h->next->next->next->next->next->next->next->data.i_val;
+ sql->continuous |= mod_creating_udf;
ret = rel_create_func(sql, l->h->data.lval,
l->h->next->data.lval, l->h->next->next->data.sym,
l->h->next->next->next->data.lval, l->h->next->next->next->next->data.lval,
type, lang, repl);
sql->type = Q_SCHEMA;
} break;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list