Changeset: 77eb40fb1e56 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=77eb40fb1e56
Modified Files:
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/Tests/iot05.sql
sql/backends/monet5/iot/Tests/iot10.sql
sql/backends/monet5/iot/Tests/iot99.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/iot.h
sql/backends/monet5/iot/iot.mal
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
sql/backends/monet5/sql_optimizer.c
Branch: iot
Log Message:
Intermittent commit
diffs (truncated from 411 to 300 lines):
diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -17,49 +17,47 @@
create schema iot;
+-- register and start a continuous query
create procedure iot.query(qry string)
external name iot.query;
create procedure iot.query("schema" string, name string)
external name iot.query;
+-- pause the processing of a continuous query
+create procedure iot.pause ("schema" string, name string)
+ external name iot.pause;
+
create procedure iot.pause ()
external name iot.pause;
-create procedure iot.pause ("schema" string, name string)
- external name iot.pause;
+-- resume the processing of a continuous query
+create procedure iot.resume ("schema" string, name string)
+ external name iot.resume;
create procedure iot.resume ()
external name iot.resume;
-create procedure iot.resume ("schema" string, name string)
- external name iot.resume;
+-- resume with a limited number of scheduler cycles before the next pause
+create procedure iot.cycles(n integer)
+ external name iot.cycles;
+
+-- stop and remove a continuous query
+create procedure iot.stop ("schema" string, name string)
+ external name iot.stop;
create procedure iot.stop ()
external name iot.stop;
-create procedure iot."drop" ()
- external name iot."drop";
-
-create procedure iot.stop ("schema" string, name string)
- external name iot.stop;
-
-create procedure iot.dump()
- external name iot.dump;
-
-create procedure iot.petrinet()
- external name iot.petrinet;
-
-
-- Inspection tables
--- create function iot.baskets()
--- returns table( "schema" string, "table" string, threshold int, winsize
int, winstride int, timeslice int, timestride int, beat int, seen timestamp,
events int)
--- external name iot.baskets;
---
--- create function iot.queries()
--- returns table( "schema" string, "function" string, status string, lastrun
timestamp, cycles int, events int, time bigint, error string)
--- external name petrinet.queries;
+create function iot.baskets()
+returns table( "schema" string, "table" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int)
+external name iot.baskets;
+
+create function iot.queries()
+ returns table( "schema" string, "function" string, status string, lastrun timestamp, cycles int, events int, time bigint, error string)
+ external name iot.queries;
-- create function iot.errors()
-- returns table( "schema" string, "table" string, error string)
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -16,6 +16,4 @@ call iot.query('insert into result selec
call iot.baskets();
call iot.petrinet();
-call iot.dump();
-call iot.drop();
-call iot.dump();
+call iot.stop();
diff --git a/sql/backends/monet5/iot/Tests/iot05.sql b/sql/backends/monet5/iot/Tests/iot05.sql
--- a/sql/backends/monet5/iot/Tests/iot05.sql
+++ b/sql/backends/monet5/iot/Tests/iot05.sql
@@ -34,7 +34,4 @@ call iot.query('select 1;');
call iot.baskets();
call iot.scheduler();
-call iot.resume();
call iot.stop();
-call iot.drop();
-call iot.dump();
diff --git a/sql/backends/monet5/iot/Tests/iot10.sql b/sql/backends/monet5/iot/Tests/iot10.sql
--- a/sql/backends/monet5/iot/Tests/iot10.sql
+++ b/sql/backends/monet5/iot/Tests/iot10.sql
@@ -13,9 +13,7 @@ begin
tmp_count = tmp_total + (select count(*) from
iot.stream_tmp);
end;
--- alternative is a simple query
-iot.pause();
iot.query('iot','collector');
-iot.dump();
-iot.resume();
+iot.baskets();
+iot.queries();
iot.stop();
diff --git a/sql/backends/monet5/iot/Tests/iot99.sql b/sql/backends/monet5/iot/Tests/iot99.sql
--- a/sql/backends/monet5/iot/Tests/iot99.sql
+++ b/sql/backends/monet5/iot/Tests/iot99.sql
@@ -2,8 +2,8 @@
set schema iot;
set optimizer='iot_pipe';
-call iot.receptor();
drop procedure clk1;
drop procedure clk3;
drop procedure collector;
+drop table result;
drop table stream_tmp;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -203,8 +203,8 @@ BSKTbind(Client cntxt, MalBlkPtr mb, Mal
*ret = 0;
idx= BSKTlocate(sch,tbl);
- if (idx <0)
- throw(SQL,"iot.bind","Stream table not registered");
+ if (idx <= 0)
+ throw(SQL,"iot.bind","Stream table '%s.%s' not
registered",sch,tbl);
for(i=0; i < baskets[idx].count; i++)
if( strcmp(baskets[idx].cols[i], col)== 0 ){
@@ -225,7 +225,7 @@ str BSKTlock(void *ret, str *sch, str *t
int bskt;
bskt = BSKTlocate(*sch, *tbl);
- if (bskt == 0)
+ if (bskt <= 0)
throw(SQL, "basket.lock", "Could not find the basket
%s.%s",*sch,*tbl);
#ifdef _DEBUG_BASKET
stream_printf(BSKTout, "lock group %s.%s\n", *sch, *tbl);
diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -69,7 +69,7 @@ command reset():void
address BSKTreset
comment "Remove all baskets";
-command baskets()(sch:bat[:str],nme:bat[:str], threshold:bat[:int],
winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int],
+command iot.baskets()(sch:bat[:str],nme:bat[:str], threshold:bat[:int],
winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int],
timestride:bat[:int], beat:bat[:int], seen:bat[:timestamp], events:bat[:int])
address BSKTtable
comment "Inspect the iot baskets";
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -76,7 +76,7 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal
InstrPtr p;
Module scope;
lng clk = GDKusec();
- char buf[BUFSIZ];
+ char buf[BUFSIZ], name[IDLENGTH];
static int iotquerycnt=0;
@@ -91,26 +91,28 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal
nme = *getArgReference_str(stk, pci, 2);
/* check existing of the pre-compiled function */
_DEBUG_IOT_ fprintf(stderr,"#iot: locate a SQL procedure
%s.%s()\n",sch,nme);
- msg = IOTprocedureStmt(cntxt, mb, sch, nme);
- if (msg)
- return msg;
- s = findSymbolInModule(cntxt->nspace, putName(nme,
strlen(nme)));
- if (s == NULL)
- throw(SQL, "iot.query", "Definition missing");
- qry = s->def;
} else if (pci->argc == 2){
+ // pre-create the new procedure
sch = "iot";
- snprintf(buf,BUFSIZ,"iot_%d",iotquerycnt++);
- nme = buf;
+ snprintf(name, IDLENGTH,"cquery_%d",iotquerycnt++);
def = *getArgReference_str(stk, pci, 1);
- _DEBUG_IOT_ fprintf(stderr,"#iot: compile a compound expression
%s()\n",def);
// package it as a procedure in the current schema [todo]
+ snprintf(buf,BUFSIZ,"create procedure %s() begin %s;
end",name,def);
+ _DEBUG_IOT_ fprintf(stderr,"#iot.compile: %s\n",buf);
+ nme = name;
msg = SQLstatementIntern(cntxt, &def, nme, 1, 0, 0);
if (msg)
return msg;
- qry = cntxt->curprg->def;
}
+ msg = IOTprocedureStmt(cntxt, mb, sch, nme);
+ if (msg)
+ return msg;
+ s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme)));
+ if (s == NULL)
+ throw(SQL, "iot.query", "Definition missing");
+ qry = s->def;
+
_DEBUG_IOT_ fprintf(stderr,"#iot: bake a new continuous query plan\n");
scope = findModule(cntxt->nspace, putName(sch, strlen(sch)));
s = newFunction(putName(sch, strlen(sch)), putName(nme, strlen(nme)),
FUNCTIONsymbol);
@@ -156,15 +158,16 @@ IOTstop(Client cntxt, MalBlkPtr mb, MalS
}
str
-IOTstep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+IOTcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
- return PNstep(cntxt,mb,stk,pci);
+ return PNcycles(cntxt,mb,stk,pci);
}
-
str
-IOTdump(void *ret)
+IOTgrab(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
- BSKTdump(ret);
- return PNdump(ret);
+ (void) cntxt;
+ (void) mb;
+ (void) stk;
+ (void) pci;
+ return MAL_SUCCEED;
}
-
diff --git a/sql/backends/monet5/iot/iot.h b/sql/backends/monet5/iot/iot.h
--- a/sql/backends/monet5/iot/iot.h
+++ b/sql/backends/monet5/iot/iot.h
@@ -44,6 +44,7 @@ iot_export str IOTquery(Client cntxt, Ma
iot_export str IOTpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str IOTresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str IOTstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
-iot_export str IOTstep(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+iot_export str IOTcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+iot_export str IOTgrab(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str IOTdump(void *ret);
#endif
diff --git a/sql/backends/monet5/iot/iot.mal b/sql/backends/monet5/iot/iot.mal
--- a/sql/backends/monet5/iot/iot.mal
+++ b/sql/backends/monet5/iot/iot.mal
@@ -25,25 +25,25 @@ pattern query(name:str,def:str)
address IOTquery
comment "Add a new continuous query.";
-pattern pause(nme:str):void
+pattern pause(sch:str,nme:str):void
address IOTpause
comment "Pause a specific query";
-pattern resume(nme:str):void
+pattern resume(sch:str,nme:str):void
address IOTresume
comment "(Re)start a query";
-pattern stop(nme:str):void
+pattern stop(sch:str,nme:str):void
address IOTstop
-comment "Remove a query";
+comment "Remove a continuous query";
pattern pause()
address IOTpause
-comment "Pause all queries";
+comment "Pause all continuous queries";
pattern resume()
address IOTresume
-comment "Resume all queries.";
+comment "Resume all continuous queries.";
pattern stop():void
address IOTstop
@@ -57,15 +57,7 @@ pattern grab(dir:str):void
address IOTgrab
comment "Grab a directory with the binary files";
-command dump()
-address IOTdump
-comment "Dump iot status ";
-
-command queries()(nme:bat[:str], status:bat[:str], seen:bat[:timestamp],
cycles:bat[:int], events:bat[:int], time:bat[:lng], error:bat[:str],
def:bat[:str])
-address PNtable
-comment "Return a table with queries registered";
-
command errors()(nme:bat[:str],error:bat[:str])
address BSKTtableerrors
-comment "Return a table the erroneous events";
+comment "Return a table the erroneous events found during query processing";
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list