Changeset: 5688f2bc2b5d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5688f2bc2b5d
Modified Files:
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
Branch: iot
Log Message:
Add window and heartbeat primitives
diffs (truncated from 385 to 300 lines):
diff --git a/sql/backends/monet5/iot/50_iot.sql
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -40,10 +40,29 @@ create procedure iot.deactivate()
create procedure iot.cycles(n integer)
external name iot.cycles;
+-- set the scheduler periodic delay
+create procedure iot.period(n integer)
+ external name iot.period;
+
-- deliver a new basket with tuples
create procedure iot.basket("schema" string, "table" string, dirpath string)
external name iot.basket;
+-- triggering conditions for each stream
+create procedure iot.threshold("schema" string, "table" string, elem int)
+ external name iot.threshold;
+
+create procedure iot.beat("schema" string, "table" string, msec int)
+ external name iot.beat;
+
+-- cleaup activities
+create procedure iot.window("schema" string, "table" string, elem int)
+ external name iot.window;
+
+create procedure iot.window("schema" string, "table" string, elem int, slide
int)
+ external name iot.window;
+
+
-- Inspection tables
create function iot.baskets()
returns table( "schema" string, "table" string, "status" string, threshold
int, winsize int, winstride int, timeslice int, timestride int, beat int, seen
timestamp, events int)
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -205,13 +205,13 @@ BSKTactivate(Client cntxt, MalBlkPtr mb,
throw(SQL,"basket.activate","Stream table %s.%s not
accessible to activate\n",sch,tbl);
if( baskets[idx].status == BSKTPAUSED){
MT_lock_set(&iotLock);
- baskets[idx].status = BSKTRUNNING;
+ baskets[idx].status = BSKTAVAILABLE;
MT_lock_unset(&iotLock);
}
} else {
MT_lock_set(&iotLock);
for( idx =1; idx <bsktTop; idx++)
- baskets[idx].status = BSKTRUNNING;
+ baskets[idx].status = BSKTAVAILABLE;
MT_lock_unset(&iotLock);
}
return MAL_SUCCEED;
@@ -233,7 +233,7 @@ BSKTdeactivate(Client cntxt, MalBlkPtr m
idx = BSKTlocate(sch, tbl);
if( idx == 0)
throw(SQL,"basket.activate","Stream table %s.%s not
accessible to deactivate\n",sch,tbl);
- if( baskets[idx].status == BSKTRUNNING){
+ if( baskets[idx].status == BSKTAVAILABLE ){
MT_lock_set(&iotLock);
baskets[idx].status = BSKTPAUSED;
MT_lock_unset(&iotLock);
@@ -247,6 +247,68 @@ BSKTdeactivate(Client cntxt, MalBlkPtr m
return MAL_SUCCEED;
}
+str
+BSKTthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch = *getArgReference_str(stk,pci,1);
+ str tbl = *getArgReference_str(stk,pci,2);
+ int elm = *getArgReference_int(stk,pci,3);
+ int idx;
+
+ (void) cntxt;
+ (void) mb;
+
+ if( elm < 0)
+ throw(SQL,"basket.beat","Positive number of elements
expected]n");
+ idx = BSKTlocate(sch, tbl);
+ if( idx == 0)
+ throw(SQL,"basket.threshold","Stream table %s.%s not accessible
to deactivate\n",sch,tbl);
+ baskets[idx].threshold = elm;
+ return MAL_SUCCEED;
+}
+
+str
+BSKTbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch = *getArgReference_str(stk,pci,1);
+ str tbl = *getArgReference_str(stk,pci,2);
+ int ticks = *getArgReference_int(stk,pci,3);
+ int idx;
+
+ (void) cntxt;
+ (void) mb;
+
+ if( ticks < 0)
+ throw(SQL,"basket.beat","Positive beat expected]n");
+ idx = BSKTlocate(sch, tbl);
+ if( idx == 0)
+ throw(SQL,"basket.beat","Stream table %s.%s not accessible to
deactivate\n",sch,tbl);
+ baskets[idx].beat = ticks;
+ return MAL_SUCCEED;
+}
+
+str
+BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ str sch = *getArgReference_str(stk,pci,1);
+ str tbl = *getArgReference_str(stk,pci,2);
+ int elm = *getArgReference_int(stk,pci,3);
+ int idx;
+
+ (void) cntxt;
+ (void) mb;
+ if( elm < 0)
+ throw(SQL,"basket.window","Positive beat expected]n");
+ idx = BSKTlocate(sch, tbl);
+ if( idx == 0)
+ throw(SQL,"basket.window","Stream table %s.%s not accessible to
deactivate\n",sch,tbl);
+ baskets[idx].winsize = elm;
+ baskets[idx].winstride = elm;
+ if ( pci->argc == 5)
+ baskets[idx].winstride = *getArgReference_int(stk,pci,4);
+ return MAL_SUCCEED;
+}
+
static BAT *
BSKTbindColumn(Client cntxt, str sch, str tbl, str col)
{
@@ -272,6 +334,7 @@ BSKTbindColumn(Client cntxt, str sch, st
b = store_funcs.bind_col(m->session->tr,c,RDONLY);
return b;
}
+
str
BSKTbind(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -63,7 +63,7 @@ typedef struct{
/* individual streams can be paused and restarted */
-#define BSKTRUNNING 1
+#define BSKTAVAILABLE 1
#define BSKTPAUSED 2
#define BSKTLOCKED 3
@@ -78,6 +78,10 @@ iot_export str BSKTdump(void *ret);
iot_export str BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
iot_export str BSKTdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+iot_export str BSKTthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+iot_export str BSKTbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+iot_export str BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+
iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTtableerrors(bat *nmeId, bat *errorId);
iot_export str BSKTerror(void *ret, str *sch, str *fcn, str *msg);
diff --git a/sql/backends/monet5/iot/basket.mal
b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -61,22 +61,22 @@ command drop(sch:str,tbl:str):void
address BSKTdrop
comment "Remove the basket";
-command threshold(sch:str,tbl:str,N:int):bit
+command iot.threshold(sch:str,tbl:str,N:int):bit
address BSKTthreshold
comment "Set an acceptance threshold of N events before inspecting";
-command window(sch:str,tbl:str,N:lng, S:lng):void
-address BSKTwindow
-comment "Use a window of N event and slide S afterwards";
-
-command timewindow(sch:str,tbl:str, N:lng, S:lng):void
-address BSKTtimewindow
-comment "Use a window of N milliseconds and slide S milliseconds afterwards";
-
-command beat(sch:str,tbl:str,N:lng):bit
+command iot.beat(sch:str,tbl:str,N:lng):bit
address BSKTbeat
comment "Set an delay to N milliseconds";
+command iot.window(sch:str,tbl:str,N:lng):void
+address BSKTwindow
+comment "Use a window of precisely N events and slide afterwards";
+
+command iot.window(sch:str,tbl:str,N:lng, S:lng):void
+address BSKTwindow
+comment "Use a window of precisely N events and slide S events afterwards";
+
command reset():void
address BSKTreset
comment "Remove all baskets";
diff --git a/sql/backends/monet5/iot/petrinet.c
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -81,6 +81,19 @@ int enabled[MAXPN]; /*array that con
static int status = PNINIT;
static int cycleDelay = 1000; /* be careful, it affects response/throughput
timings */
+str PNperiod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ int period = *getArgReference_int(stk, pci, 1);
+
+ (void) cntxt;
+ (void) mb;
+
+ if ( period < 0)
+ throw(MAL,"iot.period","Period should >= 0");
+ cycleDelay = period;
+ return MAL_SUCCEED;
+}
+
str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
Module scope;
@@ -164,7 +177,7 @@ PNregisterInternal(Client cntxt, MalBlkP
pnet[pnettop].mb = nmb;
pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
- pnet[pnettop].status = PNPAUSED;
+ pnet[pnettop].status = PNREADY;
pnet[pnettop].cycles = 0;
pnet[pnettop].seen = *timestamp_nil;
/* all the rest is zero */
@@ -216,7 +229,7 @@ PNactivate(Client cntxt, MalBlkPtr mb, M
str
PNdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- return PNstatus(cntxt, mb, stk, pci, PNPAUSED);
+ return PNstatus(cntxt, mb, stk, pci, PNREADY);
}
str
@@ -317,14 +330,14 @@ PNexecute( void *n)
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all
locked\n",node->modname, node->fcnname);
msg = runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
- node->status = PNPAUSED;
+ node->status = PNREADY;
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s
transition done:%s\n",node->modname, node->fcnname, (msg !=
MAL_SUCCEED?msg:""));
MT_lock_set(&iotLock);
for ( i=0; i< j && node->enabled && node->places[i]; i++) {
idx = node->places[i];
- baskets[idx].status = BSKTRUNNING;
+ baskets[idx].status = BSKTAVAILABLE;
}
MT_lock_unset(&iotLock);
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all
unlocked\n",node->modname, node->fcnname);
@@ -339,7 +352,7 @@ PNscheduler(void *dummy)
Client cntxt;
str msg = MAL_SUCCEED;
lng t, analysis, now;
- int claimed[MAXBSKT];
+ char claimed[MAXBSKT];
timestamp ts, tn;
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n");
@@ -347,36 +360,35 @@ PNscheduler(void *dummy)
if( strcmp(cntxt->scenario, "sql") )
SQLinitEnvironment(cntxt, NULL, NULL, NULL);
- status = PNRUNNING;
+ status = PNRUNNING; // global state
while( pnettop > 0){
if (cycleDelay)
MT_sleep_ms(cycleDelay); /* delay to make it more
tractable */
- while (status == PNPAUSED) { /* scheduler is paused */
+ while (status == PNREADY) { /* scheduler is paused */
MT_sleep_ms(cycleDelay);
_DEBUG_PETRINET_ mnstr_printf(PNout,
"#petrinet.controller paused\n");
}
- /* collect latest statistics, note that we don't need a lock
here,
+ /* Determine which continuous query are eligble to run
+ Collect latest statistics, note that we don't need a lock
here,
because the count need not be accurate to the usec. It will
simply
come back. We also only have to check the places that are
marked
- non empty. */
- for(i=0; i< MAXBSKT; i++)
- claimed[i]=0;
+ non empty. You can only trigger on empty baskets using a
heartbeat */
+ memset((void*) claimed, 0, MAXBSKT);
now = GDKusec();
for (k = i = 0; i < pnettop; i++)
- if ( pnet[i].status == PNRUNNING ){
+ if ( pnet[i].status == PNREADY ){
pnet[i].enabled = 1;
// check if all baskets are available and non-empty
for (j = 0; j < MAXBSKT && pnet[i].enabled &&
pnet[i].places[j]; j++) {
idx = pnet[i].places[j];
- if (baskets[idx].status == BSKTRUNNING &&
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list