Changeset: 5688f2bc2b5d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=5688f2bc2b5d
Modified Files:
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/iot/petrinet.mal
Branch: iot
Log Message:

Add window and heartbeat primitives


diffs (truncated from 385 to 300 lines):

diff --git a/sql/backends/monet5/iot/50_iot.sql 
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -40,10 +40,29 @@ create procedure iot.deactivate()
 create procedure iot.cycles(n integer)
        external name iot.cycles;
 
+-- set the scheduler periodic delay
+create procedure iot.period(n integer)
+       external name iot.period;
+
 -- deliver a new basket with tuples
 create procedure iot.basket("schema" string, "table" string, dirpath string)
        external name iot.basket;
 
+-- triggering conditions for each stream
+create procedure iot.threshold("schema" string, "table" string, elem int)
+       external name iot.threshold;
+
+create procedure iot.beat("schema" string, "table" string, msec int)
+       external name iot.beat;
+
+-- cleaup activities 
+create procedure iot.window("schema" string, "table" string, elem int)
+       external name iot.window;
+
+create procedure iot.window("schema" string, "table" string, elem int, slide 
int)
+       external name iot.window;
+
+
 -- Inspection tables
 create function iot.baskets()
 returns table( "schema" string,  "table" string, "status" string,  threshold 
int, winsize int, winstride int,  timeslice int, timestride int, beat int, seen 
timestamp, events int)
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -205,13 +205,13 @@ BSKTactivate(Client cntxt, MalBlkPtr mb,
                        throw(SQL,"basket.activate","Stream table %s.%s not 
accessible to activate\n",sch,tbl);
                if( baskets[idx].status == BSKTPAUSED){
                        MT_lock_set(&iotLock);
-                       baskets[idx].status = BSKTRUNNING;
+                       baskets[idx].status = BSKTAVAILABLE;
                        MT_lock_unset(&iotLock);
                }
        } else {
                MT_lock_set(&iotLock);
                for( idx =1; idx <bsktTop;  idx++)
-                       baskets[idx].status = BSKTRUNNING;
+                       baskets[idx].status = BSKTAVAILABLE;
                MT_lock_unset(&iotLock);
        }
        return MAL_SUCCEED;
@@ -233,7 +233,7 @@ BSKTdeactivate(Client cntxt, MalBlkPtr m
                idx = BSKTlocate(sch, tbl);
                if( idx == 0)
                        throw(SQL,"basket.activate","Stream table %s.%s not 
accessible to deactivate\n",sch,tbl);
-               if( baskets[idx].status == BSKTRUNNING){
+               if( baskets[idx].status == BSKTAVAILABLE ){
                        MT_lock_set(&iotLock);
                        baskets[idx].status = BSKTPAUSED;
                        MT_lock_unset(&iotLock);
@@ -247,6 +247,68 @@ BSKTdeactivate(Client cntxt, MalBlkPtr m
        return MAL_SUCCEED;
 }
 
+str
+BSKTthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch = *getArgReference_str(stk,pci,1);
+       str tbl = *getArgReference_str(stk,pci,2);
+       int elm = *getArgReference_int(stk,pci,3);
+       int idx;
+
+       (void) cntxt;
+       (void) mb;
+
+       if( elm < 0)
+               throw(SQL,"basket.beat","Positive number of elements 
expected]n");
+       idx = BSKTlocate(sch, tbl);
+       if( idx == 0)
+               throw(SQL,"basket.threshold","Stream table %s.%s not accessible 
to deactivate\n",sch,tbl);
+       baskets[idx].threshold = elm;
+       return MAL_SUCCEED;
+}
+
+str
+BSKTbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch = *getArgReference_str(stk,pci,1);
+       str tbl = *getArgReference_str(stk,pci,2);
+       int ticks = *getArgReference_int(stk,pci,3);
+       int idx;
+
+       (void) cntxt;
+       (void) mb;
+
+       if( ticks < 0)
+               throw(SQL,"basket.beat","Positive beat expected]n");
+       idx = BSKTlocate(sch, tbl);
+       if( idx == 0)
+               throw(SQL,"basket.beat","Stream table %s.%s not accessible to 
deactivate\n",sch,tbl);
+       baskets[idx].beat = ticks;
+       return MAL_SUCCEED;
+}
+
+str
+BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch = *getArgReference_str(stk,pci,1);
+       str tbl = *getArgReference_str(stk,pci,2);
+       int elm = *getArgReference_int(stk,pci,3);
+       int idx;
+
+       (void) cntxt;
+       (void) mb;
+       if( elm < 0)
+               throw(SQL,"basket.window","Positive beat expected]n");
+       idx = BSKTlocate(sch, tbl);
+       if( idx == 0)
+               throw(SQL,"basket.window","Stream table %s.%s not accessible to 
deactivate\n",sch,tbl);
+       baskets[idx].winsize = elm;
+       baskets[idx].winstride = elm;
+       if ( pci->argc == 5)
+               baskets[idx].winstride = *getArgReference_int(stk,pci,4);
+       return MAL_SUCCEED;
+}
+
 static BAT *
 BSKTbindColumn(Client cntxt, str sch, str tbl, str col)
 {
@@ -272,6 +334,7 @@ BSKTbindColumn(Client cntxt, str sch, st
                b = store_funcs.bind_col(m->session->tr,c,RDONLY);
        return b;
 }
+
 str
 BSKTbind(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -63,7 +63,7 @@ typedef struct{
 
 
 /* individual streams can be paused and restarted */
-#define BSKTRUNNING   1      
+#define BSKTAVAILABLE   1      
 #define BSKTPAUSED    2
 #define BSKTLOCKED    3
 
@@ -78,6 +78,10 @@ iot_export str BSKTdump(void *ret);
 iot_export str BSKTactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 iot_export str BSKTdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 
+iot_export str BSKTthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+iot_export str BSKTbeat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+iot_export str BSKTwindow(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
+
 iot_export str BSKTtable( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 iot_export str BSKTtableerrors(bat *nmeId, bat *errorId);
 iot_export str BSKTerror(void  *ret, str *sch, str *fcn, str *msg);
diff --git a/sql/backends/monet5/iot/basket.mal 
b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -61,22 +61,22 @@ command drop(sch:str,tbl:str):void
 address BSKTdrop
 comment "Remove the basket";
 
-command threshold(sch:str,tbl:str,N:int):bit
+command iot.threshold(sch:str,tbl:str,N:int):bit
 address BSKTthreshold
 comment "Set an acceptance threshold of N events before inspecting";
 
-command window(sch:str,tbl:str,N:lng, S:lng):void
-address BSKTwindow
-comment "Use a window of N event and slide S afterwards";
-
-command timewindow(sch:str,tbl:str, N:lng, S:lng):void
-address BSKTtimewindow
-comment "Use a window of N milliseconds and slide S milliseconds afterwards";
-
-command beat(sch:str,tbl:str,N:lng):bit
+command iot.beat(sch:str,tbl:str,N:lng):bit
 address BSKTbeat
 comment "Set an delay to N milliseconds";
 
+command iot.window(sch:str,tbl:str,N:lng):void
+address BSKTwindow
+comment "Use a window of precisely N events and slide afterwards";
+
+command iot.window(sch:str,tbl:str,N:lng, S:lng):void
+address BSKTwindow
+comment "Use a window of precisely N events and slide S events afterwards";
+
 command reset():void
 address BSKTreset
 comment "Remove all baskets";
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -81,6 +81,19 @@ int enabled[MAXPN];     /*array that con
 static int status = PNINIT;
 static int cycleDelay = 1000; /* be careful, it affects response/throughput 
timings */
 
+str PNperiod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       int period = *getArgReference_int(stk, pci, 1);
+       
+       (void) cntxt;
+       (void) mb;
+
+       if ( period < 0)
+               throw(MAL,"iot.period","Period should >= 0");
+       cycleDelay = period;
+       return MAL_SUCCEED;
+}
+
 str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        Module scope;
@@ -164,7 +177,7 @@ PNregisterInternal(Client cntxt, MalBlkP
        pnet[pnettop].mb = nmb;
        pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
 
-       pnet[pnettop].status = PNPAUSED;
+       pnet[pnettop].status = PNREADY;
        pnet[pnettop].cycles = 0;
        pnet[pnettop].seen = *timestamp_nil;
        /* all the rest is zero */
@@ -216,7 +229,7 @@ PNactivate(Client cntxt, MalBlkPtr mb, M
 
 str
 PNdeactivate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       return PNstatus(cntxt, mb, stk, pci, PNPAUSED);
+       return PNstatus(cntxt, mb, stk, pci, PNREADY);
 }
 
 str
@@ -317,14 +330,14 @@ PNexecute( void *n)
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all 
locked\n",node->modname, node->fcnname);
 
        msg = runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
-       node->status = PNPAUSED;
+       node->status = PNREADY;
 
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s 
transition done:%s\n",node->modname, node->fcnname, (msg != 
MAL_SUCCEED?msg:""));
 
        MT_lock_set(&iotLock);
        for ( i=0; i< j &&  node->enabled && node->places[i]; i++) {
                idx = node->places[i];
-               baskets[idx].status = BSKTRUNNING;
+               baskets[idx].status = BSKTAVAILABLE;
        }
        MT_lock_unset(&iotLock);
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all 
unlocked\n",node->modname, node->fcnname);
@@ -339,7 +352,7 @@ PNscheduler(void *dummy)
        Client cntxt;
        str msg = MAL_SUCCEED;
        lng t, analysis, now;
-       int claimed[MAXBSKT];
+       char claimed[MAXBSKT];
        timestamp ts, tn;
 
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n");
@@ -347,36 +360,35 @@ PNscheduler(void *dummy)
         if( strcmp(cntxt->scenario, "sql") )
                 SQLinitEnvironment(cntxt, NULL, NULL, NULL);
 
-       status = PNRUNNING;
+       status = PNRUNNING; // global state 
 
        while( pnettop > 0){
                if (cycleDelay)
                        MT_sleep_ms(cycleDelay);  /* delay to make it more 
tractable */
-               while (status == PNPAUSED)      { /* scheduler is paused */
+               while (status == PNREADY)       { /* scheduler is paused */
                        MT_sleep_ms(cycleDelay);  
                        _DEBUG_PETRINET_ mnstr_printf(PNout, 
"#petrinet.controller paused\n");
                }
 
-               /* collect latest statistics, note that we don't need a lock 
here,
+               /* Determine which continuous query are eligble to run
+                  Collect latest statistics, note that we don't need a lock 
here,
                   because the count need not be accurate to the usec. It will 
simply
                   come back. We also only have to check the places that are 
marked
-                  non empty. */
-               for(i=0; i< MAXBSKT; i++)
-                       claimed[i]=0;
+                  non empty. You can only trigger on empty baskets using a 
heartbeat */
+               memset((void*) claimed, 0, MAXBSKT);
                now = GDKusec();
                for (k = i = 0; i < pnettop; i++) 
-               if ( pnet[i].status == PNRUNNING ){
+               if ( pnet[i].status == PNREADY ){
                        pnet[i].enabled = 1;
 
                        // check if all baskets are available and non-empty
                        for (j = 0; j < MAXBSKT &&  pnet[i].enabled && 
pnet[i].places[j]; j++) {
                                idx = pnet[i].places[j];
-                               if (baskets[idx].status == BSKTRUNNING && 
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to