Changeset: 30550d946c8d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=30550d946c8d
Modified Files:
sql/backends/monet5/Tests/cfunction00.stable.out
sql/backends/monet5/Tests/cquery04.sql
sql/backends/monet5/Tests/cquery04.stable.out
sql/backends/monet5/sql_basket.c
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/include/sql_catalog.h
sql/server/sql_parser.y
sql/server/sql_scan.c
Branch: trails
Log Message:
Started implementing delayed continuous queries, i.e. continuous queries that
start at some point in the future (e.g. START CONTINUOUS ... AT '12:00:00').
diffs (truncated from 380 to 300 lines):
diff --git a/sql/backends/monet5/Tests/cfunction00.stable.out
b/sql/backends/monet5/Tests/cfunction00.stable.out
--- a/sql/backends/monet5/Tests/cfunction00.stable.out
+++ b/sql/backends/monet5/Tests/cfunction00.stable.out
@@ -42,7 +42,7 @@ Ready.
% id, name, func, mod, language, type, side_effect, varres,
vararg, schema_id # name
% int, varchar, varchar, varchar, int, int,
boolean, boolean, boolean, int # type
% 4, 6, 147, 4, 1, 1, 5, 5, 5, 4 #
length
-[ 8516, "aggr00", "create function aggr00() \nreturns
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s +
1;\n yield s;\n end while;\n return s;\nend;", "user", 2, 1,
false, false, false, 2000 ]
+[ 8538, "aggr00", "create function aggr00() \nreturns
integer\nbegin\n declare s int;\n set s = 0;\n while (true)\n do\n set s = s +
1;\n yield s;\n end while;\n return s;\nend;", "user", 2, 1,
false, false, false, 2000 ]
#select aggr00(); #should return 1
% .L2 # table_name
% L2 # name
diff --git a/sql/backends/monet5/Tests/cquery04.sql
b/sql/backends/monet5/Tests/cquery04.sql
--- a/sql/backends/monet5/Tests/cquery04.sql
+++ b/sql/backends/monet5/Tests/cquery04.sql
@@ -1,37 +1,37 @@
-- Test strides on a stream table
-create stream table cqinput06(aaa integer) set window 2 stride all;
+create stream table cqinput04(aaa integer) set window 2 stride all;
-create table cqresult06(aaa integer);
+create table cqresult04(aaa integer);
-create procedure cq_basic06()
+create procedure cq_basic04()
begin
- insert into cqresult06 (select count(*) from cqinput06);
+ insert into cqresult04 (select count(*) from cqinput04);
end;
-start continuous procedure sys.cq_basic06();
+start continuous procedure sys.cq_basic04();
-insert into cqinput06 values (1), (2);
+insert into cqinput04 values (1), (2);
-insert into cqinput06 values (3), (4);
+insert into cqinput04 values (3), (4);
call cquery.wait(1000);
-select aaa from cqresult06; --output 2 tuples with value 2
+select aaa from cqresult04; --output 2 tuples with value 2
-insert into cqinput06 values (5);
+insert into cqinput04 values (5);
call cquery.wait(1000);
-select aaa from cqresult06; --output 2 tuples with value 2
+select aaa from cqresult04; --output 2 tuples with value 2
-insert into cqinput06 values (6);
+insert into cqinput04 values (6);
call cquery.wait(1000);
-select aaa from cqresult06; --output 3 tuples with value 2
+select aaa from cqresult04; --output 3 tuples with value 2
-stop continuous procedure sys.cq_basic06();
+stop continuous procedure sys.cq_basic04();
-drop procedure cq_basic06;
-drop table cqinput06;
-drop table cqresult06;
+drop procedure cq_basic04;
+drop table cqinput04;
+drop table cqresult04;
diff --git a/sql/backends/monet5/Tests/cquery04.stable.out
b/sql/backends/monet5/Tests/cquery04.stable.out
--- a/sql/backends/monet5/Tests/cquery04.stable.out
+++ b/sql/backends/monet5/Tests/cquery04.stable.out
@@ -75,8 +75,8 @@ Ready.
[ 2 ]
#insert into cqinput06 values (3), (4);
[ 2 ]
-#select * from cqresult06; --output 2 tuples with value 2
-% sys.cqresult06 # table_name
+#select aaa from cqresult04; --output 2 tuples with value 2
+% sys.cqresult04 # table_name
% aaa # name
% int # type
% 1 # length
@@ -84,8 +84,8 @@ Ready.
[ 2 ]
#insert into cqinput06 values (5);
[ 1 ]
-#select * from cqresult06; --output 2 tuples with value 2
-% sys.cqresult06 # table_name
+#select aaa from cqresult04; --output 2 tuples with value 2
+% sys.cqresult04 # table_name
% aaa # name
% int # type
% 1 # length
@@ -93,8 +93,8 @@ Ready.
[ 2 ]
#insert into cqinput06 values (6);
[ 1 ]
-#select * from cqresult06; --output 3 tuples with value 2
-% sys.cqresult06 # table_name
+#select aaa from cqresult04; --output 3 tuples with value 2
+% sys.cqresult04 # table_name
% aaa # name
% int # type
% 1 # length
diff --git a/sql/backends/monet5/sql_basket.c b/sql/backends/monet5/sql_basket.c
--- a/sql/backends/monet5/sql_basket.c
+++ b/sql/backends/monet5/sql_basket.c
@@ -95,7 +95,7 @@ BSKTclean(int idx)
baskets[idx].table = NULL;
baskets[idx].error = NULL;
baskets[idx].window = 0;
- baskets[idx].stride = -1;
+ baskets[idx].stride = STRIDE_ALL;
baskets[idx].count = 0;
baskets[idx].events = 0;
baskets[idx].seen = *timestamp_nil;
@@ -220,7 +220,7 @@ BSKTwindow(Client cntxt, MalBlkPtr mb, M
throw(MAL,"basket.window",SQLSTATE(42000) "negative window not
allowed\n");
if( pci->argc == 5) {
stride = *getArgReference_int(stk,pci,4);
- if( stride < -1)
+ if( stride < STRIDE_ALL)
throw(MAL,"basket.stride",SQLSTATE(42000) "negative
stride not allowed\n");
if( window < stride)
throw(MAL,"basket.window",SQLSTATE(42000) "the window
size must not be smaller than the stride size\n");
@@ -381,12 +381,12 @@ BSKTtumbleInternal(Client cntxt, str sch
int i;
(void) cntxt;
- if( stride < -1)
+ if( stride < STRIDE_ALL)
throw(MAL,"basket.tumble",SQLSTATE(42000) "negative stride not
allowed\n");
_DEBUG_BASKET_ fprintf(stderr,"Tumble %s.%s %d
elements\n",sch,tbl,stride);
if( stride == 0)
return MAL_SUCCEED;
- if( stride == -1) /*IMPORTANT set the implementation stride size to the
window size */
+ if( stride == STRIDE_ALL) /*IMPORTANT set the implementation stride
size to the window size */
stride = window;
for(i=0; i< baskets[bskt].ncols ; i++){
b = baskets[bskt].bats[i];
@@ -443,7 +443,7 @@ str
BSKTtumble(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch, tbl, msg;
- int idx, elw, elm = -1;
+ int idx, elw, elm = STRIDE_ALL;
(void) cntxt;
(void) mb;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -625,7 +625,8 @@ CQregister(Client cntxt, MalBlkPtr mb, M
}
pnet[pnettop].cycles = cycles;
pnet[pnettop].beats = SET_HEARTBEATS(heartbeats);
- pnet[pnettop].run = lng_nil;
+ pnet[pnettop].startat = 0;
+ pnet[pnettop].run = 0;
pnet[pnettop].seen = *timestamp_nil;
pnet[pnettop].status = CQWAIT;
pnettop++;
@@ -930,7 +931,7 @@ str
CQheartbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str sch, fcn, msg = MAL_SUCCEED;
- int beats, j, idx=0, last= pnettop;
+ int heartbeats, j, there_is_window_constraint, idx=0, last= pnettop;
(void) cntxt;
(void) mb;
@@ -945,37 +946,39 @@ CQheartbeat(Client cntxt, MalBlkPtr mb,
goto finish;
}
last = idx+1;
- beats = *getArgReference_int(stk,pci,3);
+ heartbeats = *getArgReference_int(stk,pci,3);
#ifdef DEBUG_CQUERY
fprintf(stderr, "#set the heartbeat of %s.%s to
%d\n",sch,fcn,beats);
#endif
} else{
- beats = *getArgReference_int(stk,pci,1);
+ heartbeats = *getArgReference_int(stk,pci,1);
#ifdef DEBUG_CQUERY
fprintf(stderr, "#set the heartbeat %d ms\n",beats);
#endif
}
- if(beats < 0 && beats != NO_HEARTBEAT){
+ if(heartbeats < 0){
msg = createException(SQL,"cquery.heartbeat",SQLSTATE(42000)
"The heartbeats value must be non negative\n");
goto finish;
}
+
for( ; idx < last; idx++){
- if( pnet[idx].baskets[0]){
- msg = createException(SQL,"cquery.heartbeat",
-
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
- break;
- }
- if(beats != NO_HEARTBEAT) {
- for(j=0; j < MAXSTREAMS && pnet[idx].baskets[j]; j++) {
- if(pnet[idx].inout[j] == STREAM_IN &&
baskets[pnet[idx].baskets[j]].window != DEFAULT_TABLE_WINDOW) {
- // can not set the beat due to stream
window constraint
- pnet[idx].beats =
SET_HEARTBEATS(NO_HEARTBEAT);
- goto finish;
- }
+ there_is_window_constraint = 0;
+ for(j=0; j < MAXSTREAMS && !there_is_window_constraint &&
pnet[idx].baskets[j]; j++) {
+ if(baskets[pnet[idx].baskets[j]].window !=
DEFAULT_TABLE_WINDOW) {
+ there_is_window_constraint = 1;
}
}
- pnet[idx].beats = SET_HEARTBEATS(beats);
+ if(heartbeats != NO_HEARTBEAT && there_is_window_constraint) {
+ msg = createException(SQL, "cquery.heartbeat",
+
SQLSTATE(42000) "Heartbeat ignored, a window constraint exists\n");
+ goto finish;
+ }
}
+
+ for( ; idx < last; idx++){
+ pnet[idx].beats = SET_HEARTBEATS(heartbeats);
+ }
+
finish:
MT_lock_unset(&ttrLock);
return msg;
@@ -1097,11 +1100,10 @@ CQdump(void *ret)
fprintf(stderr, "#scheduler status %s\n", statusname[pnstatus]);
for (i = 0; i < pnettop; i++) {
- fprintf(stderr, "#[%d]\t%s.%s %s ",
- i, pnet[i].mod, pnet[i].fcn,
statusname[pnet[i].status]);
- if ( pnet[i].beats != NO_HEARTBEAT)
- fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
-
+ fprintf(stderr, "#[%d]\t%s.%s %s ", i, pnet[i].mod,
pnet[i].fcn, statusname[pnet[i].status]);
+ fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
+ fprintf(stderr, "startat="LLFMT" ", pnet[i].startat);
+ fprintf(stderr, "cycles=%d ", pnet[i].cycles);
if( pnet[i].inout[0])
fprintf(stderr, " streams ");
for (k = 0; k < MAXSTREAMS && pnet[i].baskets[k]; k++)
@@ -1163,10 +1165,7 @@ CQexecute( Client cntxt, int idx)
static void
CQscheduler(void *dummy)
{
- int i, j;
- int k = -1;
- int pntasks;
- int delay = cycleDelay;
+ int i, j, k = -1, pntasks, delay = cycleDelay;
Client cntxt = (Client) dummy;
str msg = MAL_SUCCEED;
lng t, now;
@@ -1202,14 +1201,12 @@ CQscheduler(void *dummy)
if( pnet[i].beats == NO_HEARTBEAT && pnet[i].baskets[0]
== 0)
pnet[i].enabled = 0;
- if( pnet[i].enabled && pnet[i].beats > 0){
- if( pnet[i].run != lng_nil ) {
- pnet[i].enabled = now >= pnet[i].run +
pnet[i].beats;
+ if( pnet[i].enabled && (pnet[i].beats > 0 ||
pnet[i].startat != NO_STARTAT)){
+ pnet[i].enabled = now >= pnet[i].run +
pnet[i].beats + pnet[i].startat;
#ifdef DEBUG_CQUERY_SCHEDULER
- fprintf(stderr,"#beat %s.%s
"LLFMT"("LLFMT") %s\n", pnet[i].mod, pnet[i].fcn,
- pnet[i].run + pnet[i].beats,
now, (pnet[i].enabled? "enabled":"disabled"));
+ fprintf(stderr,"#beat %s.%s "LLFMT"("LLFMT")
%s\n", pnet[i].mod, pnet[i].fcn,
+ pnet[i].run + pnet[i].beats +
pnet[i].startat, now, (pnet[i].enabled? "enabled":"disabled"));
#endif
- }
}
/* check if all input baskets are available */
diff --git a/sql/backends/monet5/sql_cquery.h b/sql/backends/monet5/sql_cquery.h
--- a/sql/backends/monet5/sql_cquery.h
+++ b/sql/backends/monet5/sql_cquery.h
@@ -53,7 +53,8 @@ typedef struct {
int inout[MAXSTREAMS]; /* how the stream tables are used, needed for
locking */
int cycles; /* limit the number of invocations before dying
*/
- lng beats; /* heart beat stride for procedures activations
*/
+ lng beats; /* heart beat stride for procedures activations
-> must be in microseconds */
+ lng startat; /* start the CQ at that precise moment (UNIX
timestamp) -> must be in microseconds */
MT_Id tid; /* Thread responsible */
lng run; /* last executed relative to start of server */
diff --git a/sql/include/sql_catalog.h b/sql/include/sql_catalog.h
--- a/sql/include/sql_catalog.h
+++ b/sql/include/sql_catalog.h
@@ -109,8 +109,10 @@
#define isDeclaredSchema(s) (strcmp(s->base.name, dt_schema) == 0)
/* continuous queries default parameters */
+#define NO_STARTAT 0
#define NO_HEARTBEAT lng_nil
#define NO_CYCLES int_nil
+#define DEFAULT_CP_STARTAT NO_STARTAT /* no start time set */
#define DEFAULT_CP_HEARTBEAT NO_HEARTBEAT /* no heartbeat set */
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list