Changeset: 1a2308c76615 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1a2308c76615
Added Files:
sql/backends/monet5/Tests/cquery20.sql
Removed Files:
sql/backends/monet5/Tests/cq00.sql
Modified Files:
sql/backends/monet5/Tests/All
sql/backends/monet5/Tests/cqstream00.sql
sql/backends/monet5/Tests/cqstream01.sql
sql/backends/monet5/Tests/cqstream02.sql
sql/backends/monet5/Tests/cqstream03.sql
sql/backends/monet5/Tests/cquery00.sql
sql/backends/monet5/Tests/cquery05.sql
sql/backends/monet5/Tests/cquery10.sql
sql/backends/monet5/cquery.mal
sql/backends/monet5/sql_cquery.c
sql/backends/monet5/sql_cquery.h
sql/backends/monet5/sql_execute.c
sql/scripts/50_cquery.sql
Branch: trails
Log Message:
Basic control flow working
The start/pause/resume/stop continuous processing seems to work.
However, the transactions are not yet properly committed to get
the data into persistent tables.
diffs (truncated from 732 to 300 lines):
diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -82,12 +82,10 @@ basket00
epoch
# use the continuous primitives
-cq00
-
-# direct script procedure interface
cquery00
cquery05
cquery10
+cquery20
cqstream00
cqstream01
diff --git a/sql/backends/monet5/Tests/cqstream00.sql
b/sql/backends/monet5/Tests/cqstream00.sql
--- a/sql/backends/monet5/Tests/cqstream00.sql
+++ b/sql/backends/monet5/Tests/cqstream00.sql
@@ -18,28 +18,26 @@ begin
insert into result1 select * from stmp2 where val <12;
insert into result2 select * from stmp2 where val >12;
end;
-call cquery.register('sys','cq_splitter');
-select * from cquery.status();
--- START cq_splitter;
-call cquery.resume('sys','cq_splitter');
+start continuous sys.cq_splitter();
-- wait for a few seconds for scheduler to do work
call cquery.wait(1000);
-- STOP cq_splitter;
-call cquery.pause('sys','cq_splitter');
+pause continuous sys.cq_splitter();
+
+select * from cquery.status();
+--select * from cquery.status();
+--select * from cquery.log();
select 'RESULT';
select * from stmp2;
select val from result1;
select val from result2;
---select * from cquery.status();
---select * from cquery.log();
-
-- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_splitter');
+stop continuous sys.cq_splitter();
drop procedure cq_splitter;
drop table stmp2;
diff --git a/sql/backends/monet5/Tests/cqstream01.sql
b/sql/backends/monet5/Tests/cqstream01.sql
--- a/sql/backends/monet5/Tests/cqstream01.sql
+++ b/sql/backends/monet5/Tests/cqstream01.sql
@@ -8,23 +8,21 @@ insert into stmp2 values('2005-09-23 12:
insert into stmp2 values('2005-09-23 12:34:28.000',1,15.0);
-- CREATE CONTINUOUS QUERY cq_window
-create procedure cq_window()
+create continuous procedure cq_window()
begin
-- The window ensures a maximal number of tuples to consider
-- Could be considered a property of the stream table
call cquery.window('sys','stmp2',2);
insert into result2 select * from stmp2 where val >12;
end;
-call cquery.register('sys','cq_window');
--- START cq_window;
-call cquery.resume('sys','cq_window');
+start continuous sys.cq_window();
-- wait for a few seconds for scheduler to do its swork
call cquery.wait(1000);
-- STOP cq_window;
-call cquery.pause('sys','cq_window');
+pause continuous sys.cq_window();
select 'RESULT';
select val from stmp2;
@@ -33,7 +31,7 @@ select val from result2;
--select * from cquery.log();
-- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_window');
+stop continuous sys.cq_window();
drop procedure cq_window;
drop table stmp2;
diff --git a/sql/backends/monet5/Tests/cqstream02.sql
b/sql/backends/monet5/Tests/cqstream02.sql
--- a/sql/backends/monet5/Tests/cqstream02.sql
+++ b/sql/backends/monet5/Tests/cqstream02.sql
@@ -11,7 +11,7 @@ begin
set tmp_total = tmp_total + (select sum(val) from sys.stmp10),
tmp_count = tmp_count + (select count(*) from sys.stmp10);
end;
-call cquery.register('sys','cq_collector');
+start continuous sys.cq_collector();
insert into stmp10 values('2005-09-23 12:34:26.000',1,9.0);
insert into stmp10 values('2005-09-23 12:34:27.000',1,11.0);
@@ -20,12 +20,11 @@ insert into stmp10 values('2005-09-23 12
-- Run the query a few times
call cquery.cycles(3);
-call cquery.resume('sys','cq_collector');
call cquery.wait(1000);
-call cquery.pause('sys','cq_collector');
-call cquery.deregister('sys','cq_collector');
+pause continuous sys.cq_collector();
+stop continuous sys.cq_collector();
select * from tmp_aggregate;
diff --git a/sql/backends/monet5/Tests/cqstream03.sql
b/sql/backends/monet5/Tests/cqstream03.sql
--- a/sql/backends/monet5/Tests/cqstream03.sql
+++ b/sql/backends/monet5/Tests/cqstream03.sql
@@ -15,12 +15,12 @@ begin
insert into agenda13 select count(*), 'full batch' from tmp13;
end if;
end;
-call cquery.register('sys','cq_agenda');
+start continuous sys.cq_agenda();
call cquery.heartbeat('sys','cq_agenda',1000);
select * from cquery.status();
-call cquery.deregister('sys','cq_agenda');
+stop continuous sys.cq_agenda();
drop procedure cq_agenda;
drop table tmp13;
drop table agenda13;
diff --git a/sql/backends/monet5/Tests/cquery00.sql
b/sql/backends/monet5/Tests/cquery00.sql
--- a/sql/backends/monet5/Tests/cquery00.sql
+++ b/sql/backends/monet5/Tests/cquery00.sql
@@ -1,20 +1,21 @@
+-- test ordinary procedure call to update a stream
create stream table testing (a int);
insert into testing values(123);
create table results (a int);
-create continuous procedure myfirstcq()
+create continuous procedure myproc()
begin
insert into results select a from sys.testing;
END;
-- a continuous procedure can be called like any other procedure
-call myfirstcq();
+call myproc();
select * from results;
-select * from functions wherE name = 'myfirstcq';
+select * from functions where name = 'myproc';
-drop procedure myfirstcq;
+drop procedure myproc;
drop table results;
drop table testing;
diff --git a/sql/backends/monet5/Tests/cquery05.sql
b/sql/backends/monet5/Tests/cquery05.sql
--- a/sql/backends/monet5/Tests/cquery05.sql
+++ b/sql/backends/monet5/Tests/cquery05.sql
@@ -1,5 +1,5 @@
--- A simple continuous query over non-stream relations
--- controlled by a heartbeat.
+-- A simple continuous procedure over non-stream relations
+-- The procedure will not be executed before a heartbeat is set.
create table cqresult05(i integer);
create procedure cq_basic()
@@ -7,25 +7,23 @@ begin
insert into cqresult05 (select count(*) from cqresult05);
end;
--- register the CQ
-call cquery.register('sys','cq_basic');
+start continuous sys.cq_basic();
--- The scheduler executes this CQ every 1000 milliseconds
call cquery.heartbeat('sys','cq_basic',1000);
--- reactivate this continuous query
-call cquery.resume('sys','cq_basic');
call cquery.wait(2100);
-call cquery.pause('sys','cq_basic');
+
+pause continuous sys.cq_basic();
+
+--select * from cquery.status();
+--select * from cquery.summary();
+--select * from cquery.log();
+
+stop continuous sys.cq_basic();
select 'RESULT';
select * from cqresult05;
---select * from cquery.summary();
---select * from cquery.log();
-
--- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_basic');
drop procedure cq_basic;
drop table cqresult05;
diff --git a/sql/backends/monet5/Tests/cquery10.sql
b/sql/backends/monet5/Tests/cquery10.sql
--- a/sql/backends/monet5/Tests/cquery10.sql
+++ b/sql/backends/monet5/Tests/cquery10.sql
@@ -1,6 +1,6 @@
-- A simple continuous query over non-stream relations
-- controlled by a cycle count
-create table result(i integer);
+create stream table result(i integer);
create procedure cq_cycles()
begin
@@ -8,18 +8,18 @@ begin
end;
-- register the CQ
-call cquery.register('sys','cq_cycles');
+start continuous sys.cq_cycles();
-- The scheduler interval is 1 sec
-call cquery.heartbeat('sys','cq_cycles',1000);
+--call cquery.heartbeat('sys','cq_cycles',1000);
-- The scheduler executes all CQ at most 5 rounds
call cquery.cycles('sys','cq_cycles',3);
-- reactivate all continuous queries
-call cquery.resume();
+
call cquery.wait(4000);
-call cquery.pause();
+pause continuous sys.cq_cycles();
select 'RESULT';
select * from result;
@@ -28,7 +28,7 @@ select * from result;
--select * from cquery.log();
-- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_cycles');
+stop continuous sys.cq_cycles();
drop procedure cq_cycles;
drop table result;
diff --git a/sql/backends/monet5/Tests/cq00.sql
b/sql/backends/monet5/Tests/cquery20.sql
rename from sql/backends/monet5/Tests/cq00.sql
rename to sql/backends/monet5/Tests/cquery20.sql
--- a/sql/backends/monet5/Tests/cq00.sql
+++ b/sql/backends/monet5/Tests/cquery20.sql
@@ -1,6 +1,6 @@
-- create a continuous query and find it
-create table cqtbl(i integer);
+create stream table cqtbl(i integer);
-- the hello example
create continuous procedure cqfoo(v integer)
@@ -18,6 +18,7 @@ select * from cqtbl;
start continuous cqfoo(321);
select * from cqtbl;
+stop continuous cqfoo(321);
--stop continuous cqfoo;
select * from cqtbl;
diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal
--- a/sql/backends/monet5/cquery.mal
+++ b/sql/backends/monet5/cquery.mal
@@ -17,7 +17,12 @@
module cquery;
+pattern registersql(mod:str, fcn:str, arg:any...)
+address CQprocedure
+comment "Add a continuous SQL procedure to the Petri-net scheduler. It will
analyse
+the MAL block to determine the input/output dependencies and firing
conditions.";
pattern registersql(mod:str, fcn:str)
+
address CQprocedure
comment "Add a continuous SQL procedure to the Petri-net scheduler. It will
analyse
the MAL block to determine the input/output dependencies and firing
conditions.";
@@ -72,7 +77,7 @@ address CQerror
comment "Remember the error seen";
# continuous query status analysis
-pattern status()
(tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str],state:bat[:str],error:bat[:str])
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list