Changeset: 30550d946c8d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30550d946c8d
Modified Files:
        sql/backends/monet5/Tests/cfunction00.stable.out
        sql/backends/monet5/Tests/cquery04.sql
        sql/backends/monet5/Tests/cquery04.stable.out
        sql/backends/monet5/sql_basket.c
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/include/sql_catalog.h
        sql/server/sql_parser.y
        sql/server/sql_scan.c
Branch: trails
Log Message:

Started to implement delayed continuous queries, which means that they will 
start at some point in the future. (e.g. START CONTINUOUS ... AT '12:00:00')


diffs (truncated from 380 to 300 lines):

diff --git a/sql/backends/monet5/Tests/cfunction00.stable.out 
b/sql/backends/monet5/Tests/cfunction00.stable.out
--- a/sql/backends/monet5/Tests/cfunction00.stable.out
+++ b/sql/backends/monet5/Tests/cfunction00.stable.out
@@ -42,7 +42,7 @@ Ready.
 % id,  name,   func,   mod,    language,       type,   side_effect,    varres, 
vararg, schema_id # name
 % int, varchar,        varchar,        varchar,        int,    int,    
boolean,        boolean,        boolean,        int # type
 % 4,   6,      147,    4,      1,      1,      5,      5,      5,      4 # 
length
-[ 8516,        "aggr00",       "create function aggr00() \nreturns 
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s + 
1;\n yield s;\n end while;\n return s;\nend;",       "user", 2,      1,      
false,  false,  false,  2000    ]
+[ 8538,        "aggr00",       "create function aggr00() \nreturns 
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s + 
1;\n yield s;\n end while;\n return s;\nend;",       "user", 2,      1,      
false,  false,  false,  2000    ]
 #select aggr00(); #should return 1
 % .L2 # table_name
 % L2 # name
diff --git a/sql/backends/monet5/Tests/cquery04.sql 
b/sql/backends/monet5/Tests/cquery04.sql
--- a/sql/backends/monet5/Tests/cquery04.sql
+++ b/sql/backends/monet5/Tests/cquery04.sql
@@ -1,37 +1,37 @@
 -- Test strides on a stream table
-create stream table cqinput06(aaa integer) set window 2 stride all;
+create stream table cqinput04(aaa integer) set window 2 stride all;
 
-create table cqresult06(aaa integer);
+create table cqresult04(aaa integer);
 
-create procedure cq_basic06()
+create procedure cq_basic04()
 begin
-       insert into cqresult06 (select count(*) from cqinput06);
+       insert into cqresult04 (select count(*) from cqinput04);
 end;
 
-start continuous procedure sys.cq_basic06();
+start continuous procedure sys.cq_basic04();
 
-insert into cqinput06 values (1), (2);
+insert into cqinput04 values (1), (2);
 
-insert into cqinput06 values (3), (4);
+insert into cqinput04 values (3), (4);
 
 call cquery.wait(1000);
 
-select aaa from cqresult06; --output 2 tuples with value 2
+select aaa from cqresult04; --output 2 tuples with value 2
 
-insert into cqinput06 values (5);
+insert into cqinput04 values (5);
 
 call cquery.wait(1000);
 
-select aaa from cqresult06; --output 2 tuples with value 2
+select aaa from cqresult04; --output 2 tuples with value 2
 
-insert into cqinput06 values (6);
+insert into cqinput04 values (6);
 
 call cquery.wait(1000);
 
-select aaa from cqresult06; --output 3 tuples with value 2
+select aaa from cqresult04; --output 3 tuples with value 2
 
-stop continuous procedure sys.cq_basic06();
+stop continuous procedure sys.cq_basic04();
 
-drop procedure cq_basic06;
-drop table cqinput06;
-drop table cqresult06;
+drop procedure cq_basic04;
+drop table cqinput04;
+drop table cqresult04;
diff --git a/sql/backends/monet5/Tests/cquery04.stable.out 
b/sql/backends/monet5/Tests/cquery04.stable.out
--- a/sql/backends/monet5/Tests/cquery04.stable.out
+++ b/sql/backends/monet5/Tests/cquery04.stable.out
@@ -75,8 +75,8 @@ Ready.
 [ 2    ]
 #insert into cqinput06 values (3), (4);
 [ 2    ]
-#select * from cqresult06; --output 2 tuples with value 2
-% sys.cqresult06 # table_name
+#select aaa from cqresult04; --output 2 tuples with value 2
+% sys.cqresult04 # table_name
 % aaa # name
 % int # type
 % 1 # length
@@ -84,8 +84,8 @@ Ready.
 [ 2    ]
 #insert into cqinput06 values (5);
 [ 1    ]
-#select * from cqresult06; --output 2 tuples with value 2
-% sys.cqresult06 # table_name
+#select aaa from cqresult04; --output 2 tuples with value 2
+% sys.cqresult04 # table_name
 % aaa # name
 % int # type
 % 1 # length
@@ -93,8 +93,8 @@ Ready.
 [ 2    ]
 #insert into cqinput06 values (6);
 [ 1    ]
-#select * from cqresult06; --output 3 tuples with value 2
-% sys.cqresult06 # table_name
+#select aaa from cqresult04; --output 3 tuples with value 2
+% sys.cqresult04 # table_name
 % aaa # name
 % int # type
 % 1 # length
diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -95,7 +95,7 @@ BSKTclean(int idx)
                baskets[idx].table = NULL;
                baskets[idx].error = NULL;
                baskets[idx].window = 0;
-               baskets[idx].stride = -1;
+               baskets[idx].stride = STRIDE_ALL;
                baskets[idx].count = 0;
                baskets[idx].events = 0;
                baskets[idx].seen =  *timestamp_nil;
@@ -220,7 +220,7 @@ BSKTwindow(Client cntxt, MalBlkPtr mb, M
                throw(MAL,"basket.window",SQLSTATE(42000) "negative window not 
allowed\n");
        if( pci->argc == 5) {
                stride = *getArgReference_int(stk,pci,4);
-               if( stride < -1)
+               if( stride < STRIDE_ALL)
                        throw(MAL,"basket.stride",SQLSTATE(42000) "negative 
stride not allowed\n");
                if( window < stride)
                        throw(MAL,"basket.window",SQLSTATE(42000) "the window 
size must not be smaller than the stride size\n");
@@ -381,12 +381,12 @@ BSKTtumbleInternal(Client cntxt, str sch
        int i;
        (void) cntxt;
 
-       if( stride < -1)
+       if( stride < STRIDE_ALL)
                throw(MAL,"basket.tumble",SQLSTATE(42000) "negative stride not 
allowed\n");
        _DEBUG_BASKET_ fprintf(stderr,"Tumble %s.%s %d 
elements\n",sch,tbl,stride);
        if( stride == 0)
                return MAL_SUCCEED;
-       if( stride == -1) /*IMPORTANT set the implementation stride size to the 
window size */
+       if( stride == STRIDE_ALL) /*IMPORTANT set the implementation stride 
size to the window size */
                stride = window;
        for(i=0; i< baskets[bskt].ncols ; i++){
                b = baskets[bskt].bats[i];
@@ -443,7 +443,7 @@ str
 BSKTtumble(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str sch, tbl, msg;
-       int idx, elw, elm = -1;
+       int idx, elw, elm = STRIDE_ALL;
 
        (void) cntxt;
        (void) mb;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -625,7 +625,8 @@ CQregister(Client cntxt, MalBlkPtr mb, M
        }
        pnet[pnettop].cycles = cycles;
        pnet[pnettop].beats = SET_HEARTBEATS(heartbeats);
-       pnet[pnettop].run  = lng_nil;
+       pnet[pnettop].startat = 0;
+       pnet[pnettop].run = 0;
        pnet[pnettop].seen = *timestamp_nil;
        pnet[pnettop].status = CQWAIT;
        pnettop++;
@@ -930,7 +931,7 @@ str
 CQheartbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        str sch, fcn, msg = MAL_SUCCEED;
-       int beats, j, idx=0, last= pnettop;
+       int heartbeats, j, there_is_window_constraint, idx=0, last= pnettop;
        (void) cntxt;
        (void) mb;
 
@@ -945,37 +946,39 @@ CQheartbeat(Client cntxt, MalBlkPtr mb, 
                        goto finish;
                }
                last = idx+1;
-               beats = *getArgReference_int(stk,pci,3);
+               heartbeats = *getArgReference_int(stk,pci,3);
 #ifdef DEBUG_CQUERY
                fprintf(stderr, "#set the heartbeat of %s.%s to 
%d\n",sch,fcn,beats);
 #endif
        } else{
-               beats = *getArgReference_int(stk,pci,1);
+               heartbeats = *getArgReference_int(stk,pci,1);
 #ifdef DEBUG_CQUERY
                fprintf(stderr, "#set the heartbeat %d ms\n",beats);
 #endif
        }
-       if(beats < 0 && beats != NO_HEARTBEAT){
+       if(heartbeats < 0){
                msg = createException(SQL,"cquery.heartbeat",SQLSTATE(42000) 
"The heartbeats value must be non negative\n");
                goto finish;
        }
+
        for( ; idx < last; idx++){
-               if( pnet[idx].baskets[0]){
-                       msg = createException(SQL,"cquery.heartbeat",
-                                                                 
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
-                       break;
-               }
-               if(beats != NO_HEARTBEAT) {
-                       for(j=0; j < MAXSTREAMS && pnet[idx].baskets[j]; j++) {
-                               if(pnet[idx].inout[j] == STREAM_IN && 
baskets[pnet[idx].baskets[j]].window != DEFAULT_TABLE_WINDOW) {
-                                       // can not set the beat due to stream 
window constraint
-                                       pnet[idx].beats = 
SET_HEARTBEATS(NO_HEARTBEAT);
-                                       goto finish;
-                               }
+               there_is_window_constraint = 0;
+               for(j=0; j < MAXSTREAMS && !there_is_window_constraint && 
pnet[idx].baskets[j]; j++) {
+                       if(baskets[pnet[idx].baskets[j]].window != 
DEFAULT_TABLE_WINDOW) {
+                               there_is_window_constraint = 1;
                        }
                }
-               pnet[idx].beats = SET_HEARTBEATS(beats);
+               if(heartbeats != NO_HEARTBEAT && there_is_window_constraint) {
+                       msg = createException(SQL, "cquery.heartbeat",
+                                                                 
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
+                       goto finish;
+               }
        }
+
+       for( ; idx < last; idx++){
+               pnet[idx].beats = SET_HEARTBEATS(heartbeats);
+       }
+
        finish:
        MT_lock_unset(&ttrLock);
        return msg;
@@ -1097,11 +1100,10 @@ CQdump(void *ret)
 
        fprintf(stderr, "#scheduler status %s\n", statusname[pnstatus]);
        for (i = 0; i < pnettop; i++) {
-               fprintf(stderr, "#[%d]\t%s.%s %s ",
-                               i, pnet[i].mod, pnet[i].fcn, 
statusname[pnet[i].status]);
-               if ( pnet[i].beats != NO_HEARTBEAT)
-                       fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
-
+               fprintf(stderr, "#[%d]\t%s.%s %s ", i, pnet[i].mod, 
pnet[i].fcn, statusname[pnet[i].status]);
+               fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
+               fprintf(stderr, "startat="LLFMT" ", pnet[i].startat);
+               fprintf(stderr, "cycles=%d ", pnet[i].cycles);
                if( pnet[i].inout[0])
                        fprintf(stderr, " streams ");
                for (k = 0; k < MAXSTREAMS && pnet[i].baskets[k]; k++)
@@ -1163,10 +1165,7 @@ CQexecute( Client cntxt, int idx)
 static void
 CQscheduler(void *dummy)
 {
-       int i, j;
-       int k = -1;
-       int pntasks;
-       int delay = cycleDelay;
+       int i, j, k = -1, pntasks, delay = cycleDelay;
        Client cntxt = (Client) dummy;
        str msg = MAL_SUCCEED;
        lng t, now;
@@ -1202,14 +1201,12 @@ CQscheduler(void *dummy)
                        if( pnet[i].beats == NO_HEARTBEAT && pnet[i].baskets[0] 
== 0)
                                pnet[i].enabled = 0;
 
-                       if( pnet[i].enabled && pnet[i].beats > 0){
-                               if( pnet[i].run != lng_nil ) {
-                                       pnet[i].enabled = now >= pnet[i].run + 
pnet[i].beats;
+                       if( pnet[i].enabled && (pnet[i].beats > 0 || 
pnet[i].startat != NO_STARTAT)){
+                               pnet[i].enabled = now >= pnet[i].run + 
pnet[i].beats + pnet[i].startat;
 #ifdef DEBUG_CQUERY_SCHEDULER
-                                       fprintf(stderr,"#beat %s.%s  
"LLFMT"("LLFMT") %s\n", pnet[i].mod, pnet[i].fcn,
-                                               pnet[i].run + pnet[i].beats, 
now, (pnet[i].enabled? "enabled":"disabled"));
+                               fprintf(stderr,"#beat %s.%s  "LLFMT"("LLFMT") 
%s\n", pnet[i].mod, pnet[i].fcn,
+                                       pnet[i].run + pnet[i].beats + 
pnet[i].startat, now, (pnet[i].enabled? "enabled":"disabled"));
 #endif
-                               }
                        }
 
                        /* check if all input baskets are available */
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -53,7 +53,8 @@ typedef struct {
        int inout[MAXSTREAMS]; /* how the stream tables are used, needed for 
locking */
 
        int cycles;             /* limit the number of invocations before dying 
*/
-       lng beats;              /* heart beat stride for procedures activations 
*/
+       lng beats;              /* heart beat stride for procedures activations 
-> must be in microseconds */
+       lng startat;    /* start at the CQ at that precise moment (UNIX 
timestamp) -> must be in microseconds */
 
        MT_Id   tid;    /* Thread responsible */
        lng             run;    /* last executed relative to start of server */
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -109,8 +109,10 @@
 #define isDeclaredSchema(s)    (strcmp(s->base.name, dt_schema) == 0)
 
 /* continuous queries default parameters */
+#define NO_STARTAT                       0
 #define NO_HEARTBEAT               lng_nil
 #define NO_CYCLES                  int_nil
+#define DEFAULT_CP_STARTAT      NO_STARTAT /* no heartbeat set */
 #define DEFAULT_CP_HEARTBEAT  NO_HEARTBEAT /* no heartbeat set */
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to