Changeset: 1a2308c76615 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1a2308c76615
Added Files:
        sql/backends/monet5/Tests/cquery20.sql
Removed Files:
        sql/backends/monet5/Tests/cq00.sql
Modified Files:
        sql/backends/monet5/Tests/All
        sql/backends/monet5/Tests/cqstream00.sql
        sql/backends/monet5/Tests/cqstream01.sql
        sql/backends/monet5/Tests/cqstream02.sql
        sql/backends/monet5/Tests/cqstream03.sql
        sql/backends/monet5/Tests/cquery00.sql
        sql/backends/monet5/Tests/cquery05.sql
        sql/backends/monet5/Tests/cquery10.sql
        sql/backends/monet5/cquery.mal
        sql/backends/monet5/sql_cquery.c
        sql/backends/monet5/sql_cquery.h
        sql/backends/monet5/sql_execute.c
        sql/scripts/50_cquery.sql
Branch: trails
Log Message:

Basic control flow working
The start/pause/resume/stop continuous processing seems to work.
However, the transactions are not yet properly committed to get
the data into persistent tables.


diffs (truncated from 732 to 300 lines):

diff --git a/sql/backends/monet5/Tests/All b/sql/backends/monet5/Tests/All
--- a/sql/backends/monet5/Tests/All
+++ b/sql/backends/monet5/Tests/All
@@ -82,12 +82,10 @@ basket00
 epoch
 
 # use the continuous primitives
-cq00
-
-# direct script procedure interface 
 cquery00
 cquery05
 cquery10
+cquery20
 
 cqstream00
 cqstream01
diff --git a/sql/backends/monet5/Tests/cqstream00.sql b/sql/backends/monet5/Tests/cqstream00.sql
--- a/sql/backends/monet5/Tests/cqstream00.sql
+++ b/sql/backends/monet5/Tests/cqstream00.sql
@@ -18,28 +18,26 @@ begin
     insert into result1 select * from stmp2 where val <12;
     insert into result2 select * from stmp2 where val >12;
 end;
-call cquery.register('sys','cq_splitter');
-select * from cquery.status();
 
--- START cq_splitter;
-call cquery.resume('sys','cq_splitter');
+start continuous sys.cq_splitter();
 
 -- wait for a few seconds for scheduler to do work
 call cquery.wait(1000);
 
 -- STOP cq_splitter;
-call cquery.pause('sys','cq_splitter');
+pause continuous sys.cq_splitter();
+
+select * from cquery.status();
+--select * from cquery.status();
+--select * from cquery.log();
 
 select 'RESULT';
 select * from stmp2;
 select val from result1;
 select val from result2;
 
---select * from cquery.status();
---select * from cquery.log();
-
 -- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_splitter');
+stop continuous sys.cq_splitter();
 
 drop procedure cq_splitter;
 drop table stmp2;
diff --git a/sql/backends/monet5/Tests/cqstream01.sql b/sql/backends/monet5/Tests/cqstream01.sql
--- a/sql/backends/monet5/Tests/cqstream01.sql
+++ b/sql/backends/monet5/Tests/cqstream01.sql
@@ -8,23 +8,21 @@ insert into stmp2 values('2005-09-23 12:
 insert into stmp2 values('2005-09-23 12:34:28.000',1,15.0);
 
 -- CREATE CONTINUOUS QUERY cq_window
-create procedure cq_window()
+create continuous procedure cq_window()
 begin
        -- The window ensures a maximal number of tuples to consider
        -- Could be considered a property of the stream table
     call cquery.window('sys','stmp2',2);
     insert into result2 select * from stmp2 where val >12;
 end;
-call cquery.register('sys','cq_window');
 
--- START cq_window;
-call cquery.resume('sys','cq_window');
+start continuous sys.cq_window();
 
 -- wait for a few seconds for scheduler to do its swork
 call cquery.wait(1000);
 
 -- STOP cq_window;
-call cquery.pause('sys','cq_window');
+pause continuous sys.cq_window();
 
 select 'RESULT';
 select val from stmp2;
@@ -33,7 +31,7 @@ select val from result2;
 --select * from cquery.log();
 
 -- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_window');
+stop continuous sys.cq_window();
 
 drop procedure cq_window;
 drop table stmp2;
diff --git a/sql/backends/monet5/Tests/cqstream02.sql b/sql/backends/monet5/Tests/cqstream02.sql
--- a/sql/backends/monet5/Tests/cqstream02.sql
+++ b/sql/backends/monet5/Tests/cqstream02.sql
@@ -11,7 +11,7 @@ begin
         set tmp_total = tmp_total + (select sum(val) from sys.stmp10),
             tmp_count = tmp_count + (select count(*) from sys.stmp10);
 end;
-call cquery.register('sys','cq_collector');
+start continuous sys.cq_collector();
 
 insert into stmp10 values('2005-09-23 12:34:26.000',1,9.0);
 insert into stmp10 values('2005-09-23 12:34:27.000',1,11.0);
@@ -20,12 +20,11 @@ insert into stmp10 values('2005-09-23 12
 
 -- Run the query a few times
 call cquery.cycles(3);
-call cquery.resume('sys','cq_collector');
 
 call cquery.wait(1000);
 
-call cquery.pause('sys','cq_collector');
-call cquery.deregister('sys','cq_collector');
+pause continuous sys.cq_collector();
+stop continuous sys.cq_collector();
 
 select * from tmp_aggregate;
 
diff --git a/sql/backends/monet5/Tests/cqstream03.sql b/sql/backends/monet5/Tests/cqstream03.sql
--- a/sql/backends/monet5/Tests/cqstream03.sql
+++ b/sql/backends/monet5/Tests/cqstream03.sql
@@ -15,12 +15,12 @@ begin
         insert into agenda13 select count(*), 'full batch' from tmp13;
     end if;
 end;
-call cquery.register('sys','cq_agenda');
+start continuous sys.cq_agenda();
 call cquery.heartbeat('sys','cq_agenda',1000);
 
 select * from cquery.status();
 
-call cquery.deregister('sys','cq_agenda');
+stop continuous sys.cq_agenda();
 drop procedure cq_agenda;
 drop table tmp13;
 drop table agenda13;
diff --git a/sql/backends/monet5/Tests/cquery00.sql b/sql/backends/monet5/Tests/cquery00.sql
--- a/sql/backends/monet5/Tests/cquery00.sql
+++ b/sql/backends/monet5/Tests/cquery00.sql
@@ -1,20 +1,21 @@
+-- test ordinary procedure call to update a stream
 create stream table testing (a int);
 insert into testing values(123);
 
 create table results (a int);
 
-create continuous procedure myfirstcq() 
+create continuous procedure myproc() 
 begin 
        insert into results select a from sys.testing; 
 END;
 
 -- a continuous procedure can be called like any other procedure
-call myfirstcq();
+call myproc();
 
 select * from results;
 
-select * from functions wherE name = 'myfirstcq';
+select * from functions where name = 'myproc';
 
-drop procedure myfirstcq;
+drop procedure myproc;
 drop table results;
 drop table testing;
diff --git a/sql/backends/monet5/Tests/cquery05.sql b/sql/backends/monet5/Tests/cquery05.sql
--- a/sql/backends/monet5/Tests/cquery05.sql
+++ b/sql/backends/monet5/Tests/cquery05.sql
@@ -1,5 +1,5 @@
--- A simple continuous query over non-stream relations
--- controlled by a heartbeat.
+-- A simple continuous procedure over non-stream relations
+-- The procedure will not be executed before a heartbeat is set.
 create table cqresult05(i integer);
 
 create procedure cq_basic()
@@ -7,25 +7,23 @@ begin
        insert into cqresult05 (select count(*) from cqresult05);
 end;
 
--- register the CQ
-call cquery.register('sys','cq_basic');
+start continuous sys.cq_basic();
 
--- The scheduler executes this CQ every 1000 milliseconds
 call cquery.heartbeat('sys','cq_basic',1000);
 
--- reactivate this continuous query
-call cquery.resume('sys','cq_basic');
 call cquery.wait(2100);
-call cquery.pause('sys','cq_basic');
+
+pause continuous sys.cq_basic();
+
+--select * from cquery.status();
+--select * from cquery.summary();
+--select * from cquery.log();
+
+stop continuous sys.cq_basic();
 
 select 'RESULT';
 select * from cqresult05;
 
---select * from cquery.summary();
---select * from cquery.log();
-
--- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_basic');
 
 drop procedure cq_basic;
 drop table cqresult05;
diff --git a/sql/backends/monet5/Tests/cquery10.sql b/sql/backends/monet5/Tests/cquery10.sql
--- a/sql/backends/monet5/Tests/cquery10.sql
+++ b/sql/backends/monet5/Tests/cquery10.sql
@@ -1,6 +1,6 @@
 -- A simple continuous query over non-stream relations
 -- controlled by a cycle count
-create table result(i integer);
+create stream table result(i integer);
 
 create procedure cq_cycles()
 begin
@@ -8,18 +8,18 @@ begin
 end;
 
 -- register the CQ
-call cquery.register('sys','cq_cycles');
+start continuous sys.cq_cycles();
 
 -- The scheduler interval is 1 sec 
-call cquery.heartbeat('sys','cq_cycles',1000);
+--call cquery.heartbeat('sys','cq_cycles',1000);
 
 -- The scheduler executes all CQ at most 5 rounds
 call cquery.cycles('sys','cq_cycles',3);
 
 -- reactivate all continuous queries
-call cquery.resume();
+
 call cquery.wait(4000);
-call cquery.pause();
+pause continuous sys.cq_cycles();
 
 select 'RESULT';
 select * from result;
@@ -28,7 +28,7 @@ select * from result;
 --select * from cquery.log();
 
 -- ideally auto remove upon dropping the procedure
-call cquery.deregister('sys','cq_cycles');
+stop continuous sys.cq_cycles();
 
 drop procedure cq_cycles;
 drop table result;
diff --git a/sql/backends/monet5/Tests/cq00.sql b/sql/backends/monet5/Tests/cquery20.sql
rename from sql/backends/monet5/Tests/cq00.sql
rename to sql/backends/monet5/Tests/cquery20.sql
--- a/sql/backends/monet5/Tests/cq00.sql
+++ b/sql/backends/monet5/Tests/cquery20.sql
@@ -1,6 +1,6 @@
 -- create a continuous query and find it
 
-create table cqtbl(i integer);
+create stream table cqtbl(i integer);
 
 -- the hello example
 create continuous procedure cqfoo(v integer)
@@ -18,6 +18,7 @@ select * from cqtbl;
 start continuous cqfoo(321);
 select * from cqtbl;
 
+stop continuous cqfoo(321);
 --stop continuous cqfoo;
 select * from cqtbl;
 
diff --git a/sql/backends/monet5/cquery.mal b/sql/backends/monet5/cquery.mal
--- a/sql/backends/monet5/cquery.mal
+++ b/sql/backends/monet5/cquery.mal
@@ -17,7 +17,12 @@
 
 module cquery;
 
+pattern registersql(mod:str, fcn:str, arg:any...)
+address CQprocedure
+comment "Add a continuous SQL procedure to the Petri-net scheduler. It will analyse
+the MAL block to determine the input/output dependencies and firing conditions.";
 pattern registersql(mod:str, fcn:str)
+
 address CQprocedure
 comment "Add a continuous SQL procedure to the Petri-net scheduler. It will analyse
 the MAL block to determine the input/output dependencies and firing conditions.";
@@ -72,7 +77,7 @@ address CQerror
 comment "Remember the error seen";
 
 # continuous query status analysis
-pattern status() (tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str],state:bat[:str],error:bat[:str])
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to