Changeset: f0ff30527e5f for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f0ff30527e5f
Modified Files:
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
Branch: trails
Log Message:

More error scenarios handling.


diffs (truncated from 302 to 300 lines):

diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -394,7 +394,7 @@ CQanalysis(Client cntxt, MalBlkPtr mb, i
                        msg = 
createException(MAL,"cquery.analysis",SQLSTATE(3F000) "Too many stream table 
columns\n");
                } else if((msg = 
BSKTregisterInternal(cntxt,mb,"tmp",alias,&bskt)) == MAL_SUCCEED) {
                        pnet[idx].baskets[j] = bskt;
-                       pnet[idx].inout[j] = STREAM_OUT;
+                       pnet[idx].inout[j] = CQ_OUT;
                }
        }
        return msg;
@@ -524,8 +524,8 @@ CQregister(Client cntxt, str sname, str 
                                        msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Table tmp.%s already 
exists\n", ralias);
                                        goto revertids;
                                }
-                               if((t = mvc_create_stream_table(m, tmp_schema, 
ralias, tt_stream_temp, 0, SQL_DECLARED_TABLE, CA_COMMIT,
-                                                                               
                -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) == NULL) {
+                               if((t = mvc_create_stream_table(m, tmp_schema, 
ralias, tt_stream_temp, 0, SQL_DECLARED_TABLE,
+                                                                               
                CA_PRESERVE, -1, DEFAULT_TABLE_WINDOW, DEFAULT_TABLE_STRIDE)) 
== NULL) {
                                        msg = 
createException(SQL,"cquery.register",SQLSTATE(3F000) "Failed create internal 
stream table\n");
                                        goto revertids;
                                }
@@ -536,7 +536,7 @@ CQregister(Client cntxt, str sname, str 
                                                goto revertids;
                                        }
                                }
-                               msg = create_table_or_view(m, "tmp", ralias, t, 
tt_stream_temp);
+                               msg = create_table_or_view(m, "tmp", ralias, t, 
SQL_LOCAL_TEMP_STREAM);
                                //msg = sql_grant_table_privs(m, "public", 
PRIV_SELECT, "tmp", ralias, NULL, 0, USER_MONETDB);
                        }
        revertids:
@@ -736,7 +736,7 @@ CQregister(Client cntxt, str sname, str 
        }
        if(heartbeats != HEARTBEAT_NIL) {
                for(i=0; i < MAXSTREAMS && pnet[pnettop].baskets[i]; i++) {
-                       if(baskets[pnet[pnettop].baskets[i]].window != 
DEFAULT_TABLE_WINDOW) {
+                       if(pnet[idx].inout[i] == STREAM_IN && 
baskets[pnet[pnettop].baskets[i]].window != DEFAULT_TABLE_WINDOW) {
                                msg = createException(SQL, "cquery.register",
                                                                          
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
                                cleanBaskets(pnettop);
@@ -817,7 +817,7 @@ CQresume(str alias, int with_alter, lng 
        }
        if(with_alter && heartbeats != HEARTBEAT_NIL) {
                for(j=0; j < MAXSTREAMS && pnet[idx].baskets[j]; j++) {
-                       if(baskets[pnet[idx].baskets[j]].window != 
DEFAULT_TABLE_WINDOW) {
+                       if(pnet[idx].inout[j] == STREAM_IN && 
baskets[pnet[pnettop].baskets[j]].window != DEFAULT_TABLE_WINDOW) {
                                msg = createException(SQL, "cquery.resume",
                                                                          
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
                                goto unlock;
@@ -826,6 +826,10 @@ CQresume(str alias, int with_alter, lng 
        }
 
        pnet[idx].status = CQWAIT;
+       if(pnet[idx].error) { //if there was an error registered, delete it
+               GDKfree(pnet[idx].error);
+               pnet[idx].error = MAL_SUCCEED;
+       }
        if(with_alter) {
                pnet[idx].cycles = cycles;
                pnet[idx].beats = SET_HEARTBEATS(heartbeats);
@@ -856,8 +860,13 @@ CQresumeAll(void)
 #endif
 
        MT_lock_set(&ttrLock);
-       for(i = 0 ; i < pnettop; i++)
+       for(i = 0 ; i < pnettop; i++) {
                pnet[i].status = CQWAIT;
+               if(pnet[i].error) {
+                       GDKfree(pnet[i].error);
+                       pnet[i].error = MAL_SUCCEED;
+               }
+       }
 
        /* start the scheduler if needed */
        if(cq_pid == 0) {
@@ -1053,7 +1062,7 @@ CQheartbeat(Client cntxt, MalBlkPtr mb, 
        for( ; idx < last; idx++){
                there_is_window_constraint = 0;
                for(j=0; j < MAXSTREAMS && !there_is_window_constraint && 
pnet[idx].baskets[j]; j++) {
-                       if(baskets[pnet[idx].baskets[j]].window != 
DEFAULT_TABLE_WINDOW) {
+                       if(pnet[idx].inout[j] == STREAM_IN && 
baskets[pnet[idx].baskets[j]].window != DEFAULT_TABLE_WINDOW) {
                                there_is_window_constraint = 1;
                        }
                }
@@ -1097,8 +1106,8 @@ CQwait(Client cntxt, MalBlkPtr mb, MalSt
 str
 CQderegister(Client cntxt, str alias)
 {
-       int idx = 0;
-       str msg = MAL_SUCCEED, this_alias = NULL;
+       int idx = 0, i, j;
+       str msg = MAL_SUCCEED, this_alias = NULL, falias = NULL;
        MT_Id myID = MT_getpid();
 
        MT_lock_set(&ttrLock);
@@ -1111,6 +1120,23 @@ CQderegister(Client cntxt, str alias)
        if(myID != cq_pid) {
                pnet[idx].status = CQSTOP;
                this_alias = pnet[idx].alias;
+               if(IS_UNION(pnet[idx].func)) {
+                       for( i=0; i < pnettop && !falias; i++){
+                               if(i != idx) {
+                                       for( j=0; j< MAXSTREAMS && 
pnet[i].baskets[j] && !falias; j++){
+                                               if( strcmp("tmp", 
baskets[pnet[i].baskets[j]].table->s->base.name) == 0 &&
+                                                       strcmp(this_alias, 
baskets[pnet[i].baskets[j]].table->base.name) == 0 ) {
+                                                       falias = pnet[i].alias;
+                                               }
+                                       }
+                               }
+                       }
+                       if(falias) {
+                               msg = createException(SQL, "cquery.deregister",
+                                                                         
SQLSTATE(42000) "The output stream of this continuous query is being used by 
%s\n", falias);
+                               goto unlock;
+                       }
+               }
                MT_lock_unset(&ttrLock);
                // actually wait if the query was running
                // the CQ might get removed during the sleep calls, so we have 
to make this check
@@ -1198,7 +1224,7 @@ CQdump(void *ret)
                if( pnet[i].inout[0])
                        fprintf(stderr, " --> ");
                for (k = 0; k < MAXSTREAMS && pnet[i].baskets[k]; k++)
-               if( pnet[i].inout[k] == STREAM_OUT)
+               if( pnet[i].inout[k] == STREAM_OUT || pnet[i].inout[k] == 
CQ_OUT)
                        fprintf(stderr, "%s.%s ", 
baskets[pnet[i].baskets[k]].table->s->base.name, 
baskets[pnet[i].baskets[k]].table->base.name);
                if (pnet[i].error)
                        fprintf(stderr, " errors:%s", pnet[i].error);
@@ -1253,8 +1279,9 @@ CQexecute( Client cntxt, int idx)
 static void
 CQscheduler(void *dummy)
 {
-       int i, j, k = -1, pntasks, delay = cycleDelay;
-       Client cntxt = (Client) dummy;
+       int i, j, k = -1, pntasks, delay = cycleDelay, start_trans;
+       Client c = (Client) dummy;
+       mvc* m;
        str msg = MAL_SUCCEED;
        lng t, now;
        timestamp aux;
@@ -1265,6 +1292,13 @@ CQscheduler(void *dummy)
        fprintf(stderr, "#cquery.scheduler started\n");
 #endif
 
+       if ((msg = getSQLContext(c, NULL, &m, NULL)) != MAL_SUCCEED) {
+               fprintf(stderr, "CQscheduler internal error: %s\n", msg);
+               GDKfree(msg);
+               MT_lock_set(&ttrLock);
+               goto terminate;
+       }
+
        MT_lock_set(&ttrLock);
        pnstatus = CQRUNNING; // global state
 
@@ -1279,10 +1313,12 @@ CQscheduler(void *dummy)
                if((msg = MTIMEcurrent_timestamp(&aux)) != MAL_SUCCEED) {
                        fprintf(stderr, "CQscheduler internal error: %s\n", 
msg);
                        GDKfree(msg);
+                       goto terminate;
                }
                if((msg = MTIMEepoch2lng(&now, &aux)) != MAL_SUCCEED) {
                        fprintf(stderr, "CQscheduler internal error: %s\n", 
msg);
                        GDKfree(msg);
+                       goto terminate;
                }
 
                pntasks=0;
@@ -1292,8 +1328,13 @@ CQscheduler(void *dummy)
                                /* Queries are triggered by the heartbeat or  
all window constraints */
                                /* A heartbeat in combination with a window 
constraint is ambiguous */
                                /* At least one constraint should be set */
-                               if( pnet[i].beats == HEARTBEAT_NIL && 
pnet[i].baskets[0] == 0)
+                               if( pnet[i].beats == HEARTBEAT_NIL && 
pnet[i].baskets[0] == 0) {
                                        pnet[i].enabled = 0;
+                                       pnet[i].status = CQERROR;
+                                       pnet[i].error = createException(SQL, 
"cquery.scheduler", SQLSTATE(3F000)
+                                       "Neither a heartbeat or a stream window 
condition exists, this CQ cannot be triggered\n");
+                                       CQentry(i);
+                               }
                                if( pnet[i].enabled && ((pnet[i].beats != 
HEARTBEAT_NIL && pnet[i].beats > 0) || pnet[i].run > 0)) {
                                        pnet[i].enabled = now >= pnet[i].run + 
(pnet[i].beats > 0 ? pnet[i].beats : 0);
 #ifdef DEBUG_CQUERY_SCHEDULER
@@ -1344,7 +1385,7 @@ CQscheduler(void *dummy)
 
                /* Execute each enabled transformation */
                /* Tricky part is here a single stream used by multiple 
transitions */
-               for (i = 0; i < pnettop  ; i++)
+               for (i = 0; i < pnettop; i++)
                if( pnet[i].enabled){
                        delay = cycleDelay;
 #ifdef DEBUG_CQUERY
@@ -1353,48 +1394,42 @@ CQscheduler(void *dummy)
                        t = GDKusec();
                        pnet[i].status = CQRUNNING;
                        if( !GDKexiting())
-                               CQexecute(cntxt, i);
-/*
-                               if (MT_create_thread(&pnet[i].tid, CQexecute, 
(void*) (pnet+i), MT_THR_JOINABLE) < 0){
-                                       msg= 
createException(MAL,"petrinet.scheduler","Can not fork the thread");
-                               } else
-*/
+                               CQexecute(c, i);
                        if( pnet[i].cycles != CYCLES_NIL && pnet[i].cycles > 0) 
{
                                pnet[i].cycles--;
-                               if(pnet[i].cycles == 0) { //if it was the last 
cycle of the CQ, remove it
-                                       CQfree(cntxt, i);
-                                       i--;
-                                       continue; //an entry was deleted, so 
jump over!
-                               }
+                               if(pnet[i].cycles == 0) //if it was the last 
cycle of the CQ, remove it
+                                       pnet[i].status = CQDELETE;
+                       }
+                       if(pnet[i].status != CQDELETE) {
+                               pnet[i].run = now;                              
/* last executed */
+                               pnet[i].time = GDKusec() - t;   /* keep around 
in microseconds */
+                               (void) MTIMEcurrent_timestamp(&pnet[i].seen);
+                               pnet[i].enabled = 0;
+                               CQentry(i);
                        }
-                       pnet[i].run = now;                              /* last 
executed */
-                       pnet[i].time = GDKusec() - t;   /* keep around in 
microseconds */
-                       (void) MTIMEcurrent_timestamp(&pnet[i].seen);
-                       pnet[i].enabled = 0;
-                       CQentry(i);
+               }
+               start_trans = 0;
+               for (i = 0; i < pnettop ; i++) { //if there is a continuous 
function to delete, we must start a transaction
+                       if( pnet[i].status == CQDELETE && 
IS_UNION(pnet[i].func))
+                               start_trans = 1;
+               }
+               if(start_trans) {
+                       SQLtrans(m);
+                       if (!m->sa)
+                               m->sa = sa_create();
+                       if (!m->sa) {
+                               fprintf(stderr, "CQscheduler sql allocation 
failure\n");
+                               goto terminate;
+                       }
                }
                for(i = pnettop - 1; i >= 0; i--) { //more defensive way to 
stop continuous queries from the scheduler itself
-                       if( pnet[i].status == CQDELETE){
-                               CQfree(cntxt, i);
-                       }
+                       if( pnet[i].status == CQDELETE)
+                               CQfree(c, i);
                }
+               if(start_trans)
+                       SQLautocommit(m);
                if( pnettop == 0)
                        pnstatus = CQSTOP;
-               /* after one sweep all threads should be released */
-/*
-               for (m = 0; m < k; m++)
-               if(pnet[enabled[m]].tid){
-#ifdef DEBUG_CQUERY
-                       fprintf(stderr, "#Terminate query thread %s limit %d 
\n", pnet[enabled[m]].fcnname, pnet[enabled[m]].limit);
-#endif
-                       MT_join_thread(pnet[enabled[m]].tid);
-               }
-
-#ifdef DEBUG_CQUERY
-               if (pnstatus == CQRUNNING && cycleDelay) 
MT_sleep_ms(cycleDelay);
-#endif
-               MT_sleep_ms(CQDELAY);
-*/
                /* we should actually delay until the next heartbeat or 
insertion into the streams */
                if ((pntasks == 0 && pnstatus != CQSTOP) || pnstatus == 
CQPAUSE) {
 #ifdef DEBUG_CQUERY
@@ -1410,10 +1445,11 @@ CQscheduler(void *dummy)
 #ifdef DEBUG_CQUERY
        fprintf(stderr, "#cquery.scheduler stopped\n");
 #endif
+terminate:
        pnstatus = CQINIT;
        cq_pid = 0;
-       SQLexitClient(cntxt);
-       MCcloseClient(cntxt, CQ_CLIENT);
+       SQLexitClient(c);
+       MCcloseClient(c, CQ_CLIENT);
        MT_lock_unset(&ttrLock);
 }
 
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -41,6 +41,7 @@
 
 #define STREAM_IN   1
 #define STREAM_OUT  2
+#define CQ_OUT      3  /* Output stream in continuous functions */
 
 typedef struct {
        sql_func *func; /* The UDF to be called */
@@ -58,7 +59,6 @@ typedef struct {
        lng beats;              /* heart beat stride for procedures activations 
-> must be in microseconds */
        lng run;                /* start at the CQ at that precise moment (UNIX 
timestamp) -> must be in microseconds */
 
-       /*MT_Id tid;     Thread responsible */
        timestamp seen; /* last time the query was seen by the scheduler */
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to