Changeset: 8e15764fb0ef for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8e15764fb0ef
Modified Files:
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/Tests/All
        sql/backends/monet5/iot/Tests/iot00.sql
        sql/backends/monet5/iot/Tests/iot01.sql
        sql/backends/monet5/iot/Tests/petrinet00.mal
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/basket.h
        sql/backends/monet5/iot/basket.mal
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/iot.h
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/iot/petrinet.mal
Branch: iot
Log Message:

Intermittent commit


diffs (291 lines):

diff --git a/sql/backends/monet5/iot/50_iot.sql b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -81,6 +81,8 @@ create procedure iot.window("schema" str
 create procedure iot.window("schema" string, "table" string, elem int, slide int)
        external name iot.window;
 
+create procedure iot.wait() external name iot.wait;
+
 -- Inspection tables
 create function iot.baskets()
 returns table( "schema" string, "table" string, "status" string, threshold int, winsize int, winstride int, timeslice int, timestride int, beat int, seen timestamp, events int)
diff --git a/sql/backends/monet5/iot/Tests/All b/sql/backends/monet5/iot/Tests/All
--- a/sql/backends/monet5/iot/Tests/All
+++ b/sql/backends/monet5/iot/Tests/All
@@ -2,7 +2,12 @@ iot00
 iot01
 iot02
 iot03
+#iot05
+iot10
+iot12
+iot15
+receptor00
 receptor01
-cleaup
-petrinet00
+cleanup
+#petrinet00
 webtest
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -20,6 +20,7 @@ select * from  iot.outputplaces();
 
 -- stop all continuous queries
 call iot.deactivate();
+call iot.wait();
 
 insert into stmp values('2005-09-23 12:34:26.736',1,12.34);
 select * from stmp;
diff --git a/sql/backends/monet5/iot/Tests/iot01.sql b/sql/backends/monet5/iot/Tests/iot01.sql
--- a/sql/backends/monet5/iot/Tests/iot01.sql
+++ b/sql/backends/monet5/iot/Tests/iot01.sql
@@ -14,7 +14,12 @@ insert into stmp values('2005-09-23 12:3
 insert into stmp values('2005-09-23 12:34:27.000',1,12.0);
 insert into stmp values('2005-09-23 12:34:28.000',1,13.0);
 
+-- deactivate all when streams are empty.
+call iot.deactivate();
+
+-- stream table should be empty now
 select * from stmp;
+-- and result delivered
 select * from result;
 
 select * from  iot.queries();
diff --git a/sql/backends/monet5/iot/Tests/petrinet00.mal b/sql/backends/monet5/iot/Tests/petrinet00.mal
--- a/sql/backends/monet5/iot/Tests/petrinet00.mal
+++ b/sql/backends/monet5/iot/Tests/petrinet00.mal
@@ -5,5 +5,5 @@ end hello;
 
 petrinet.register("user","hello");
 
-(mod:bat[:str],fcn:bat[:str], status:bat[:str],lastrun:bat[:timestamp],cycles:bat[:int], events:bat[:int],time:bat[:lng],error:bat[:str]):= petrinet.queries();
+(mod:bat[:str],fcn:bat[:str], status:bat[:str],lastrun:bat[:timestamp],cycles:bat[:int], events:bat[:int],time:bat[:lng],error:bat[:str]):= iot.queries();
 io.print(mod,fcn, status,lastrun,cycles, events,time,error);
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -399,13 +399,9 @@ BSKTreset(void *ret)
 
 /* collect the binary files and append them to what we have */
 #define MAXLINE 4096
-str 
-BSKTpushBasket(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+str
+BSKTimportInternal(int bskt)
 {
-    str sch = *getArgReference_str(stk, pci, 1);
-    str tbl = *getArgReference_str(stk, pci, 2);
-    str dir = *getArgReference_str(stk, pci, 3);
-    int bskt;
        char buf[PATHLENGTH];
        node *n;
        mvc *m = NULL;
@@ -416,14 +412,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
        FILE *f;
        long fsize;
        char line[MAXLINE];
-
-       msg= getSQLContext(cntxt,NULL, &m, NULL);
-       if( msg != MAL_SUCCEED)
-               return msg;
-       BSKTregisterInternal(cntxt, mb, sch, tbl);
-    bskt = BSKTlocate(sch,tbl);
-       if (bskt == 0)
-               throw(SQL, "iot.basket", "Could not find the basket %s.%s",sch,tbl);
+       str dir = baskets[bskt].source;
 
        // check access permission to directory first
        if( access (dir , F_OK | R_OK)){
@@ -538,10 +527,30 @@ recover:
        }
 
        MT_lock_unset(&iotLock);
-    (void) mb;
     return msg;
 }
 
+/* iot.import(sch,tbl,dir): record dir as the basket's source, then load its binary files */
+str
+BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       str sch = *getArgReference_str(stk, pci, 1);
+       str tbl = *getArgReference_str(stk, pci, 2);
+       str dir = *getArgReference_str(stk, pci, 3);
+       int bskt;
+       str msg;
+       mvc *m = NULL;
+
+       if ((msg = getSQLContext(cntxt, NULL, &m, NULL)) != MAL_SUCCEED)
+               return msg;
+       BSKTregisterInternal(cntxt, mb, sch, tbl);
+       if ((bskt = BSKTlocate(sch, tbl)) == 0)
+               throw(SQL, "iot.basket", "Could not find the basket %s.%s",sch,tbl);
+       if ((baskets[bskt].source = GDKstrdup(dir)) == NULL) /* never hand a NULL path to the importer */
+               throw(SQL, "iot.import", "Could not allocate space for the source directory");
+       return BSKTimportInternal(bskt);
+}
+
 /* remove tuples from a basket according to the sliding policy */
 #define ColumnShift(B,TPE, STRIDE) { \
        TPE *first= (TPE*) Tloc(B, BUNfirst(B));\
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -25,16 +25,6 @@
 #include "mal_interpreter.h"
 #include "sql.h"
 
-#ifdef WIN32
-#ifndef LIBIOT
-#define iot_export extern __declspec(dllimport)
-#else
-#define iot_export extern __declspec(dllexport)
-#endif
-#else
-#define iot_export extern
-#endif
-
 /* #define _DEBUG_DATACELL     debug this module */
 #define BSKTout GDKout
 #define MAXBSKT 64
@@ -102,8 +92,9 @@ iot_export str BSKTappend(Client cntxt, 
 iot_export str BSKTdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 iot_export str BSKTclear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 iot_export str BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
-iot_export str BSKTpushBasket(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 iot_export str BSKTerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+iot_export str BSKTimportInternal(int bskt);
 
 #endif
diff --git a/sql/backends/monet5/iot/basket.mal b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -84,9 +84,9 @@ command reset():void
 address BSKTreset
 comment "Remove all baskets";
 
-pattern iot.basket(sch:str, tbl:str, dir:str):void
-address BSKTpushBasket
-comment "Push a directory with the binary files";
+pattern iot.import(sch:str, tbl:str, dir:str):void
+address BSKTimport
+comment "Import a single directory with the binary files for a stream table";
 
 pattern iot.baskets()(sch:bat[:str],nme:bat[:str], status:bat[:str], threshold:bat[:int], winsize:bat[:int], winstride:bat[:int], timeslice:bat[:int], 
 timestride:bat[:int], beat:bat[:int], seen:bat[:timestamp], events:bat[:int])
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -191,6 +191,7 @@ IOTreceptorThread(void *dummy)
                baskets[idx].table_name, 
                baskets[idx].source);
        /* continously scan the container for baskets */
+               BSKTimportInternal(idx);
 }
 
 str
diff --git a/sql/backends/monet5/iot/iot.h b/sql/backends/monet5/iot/iot.h
--- a/sql/backends/monet5/iot/iot.h
+++ b/sql/backends/monet5/iot/iot.h
@@ -27,6 +27,16 @@
 #include "basket.h"
 #include "petrinet.h"
 
+#ifdef WIN32
+#ifndef LIBIOT
+#define iot_export extern __declspec(dllimport)
+#else
+#define iot_export extern __declspec(dllexport)
+#endif
+#else
+#define iot_export extern
+#endif
+
 #define _DEBUG_IOT_ if(0)
 
 iot_export MT_Lock iotLock;
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -385,6 +385,22 @@ PNexecute( void *n)
        _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all unlocked\n",node->modname, node->fcnname);
 }
 
+/* Number of transitions enabled in the scheduler's latest analysis round.
+ * volatile: PNscheduler updates it while PNwait polls it in a sleep loop. */
+static volatile int PNtasks;
+
+/* MAL pattern iot.wait(): sleep in cycleDelay steps until PNtasks reads zero. */
+str
+PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void) mb; (void) stk; (void) pci;
+       while (PNtasks) { /* queries are still enabled, keep waiting */
+               _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#petrinet.controller %d outstanding tasks\n", PNtasks);
+               MT_sleep_ms(cycleDelay);
+       }
+       return MAL_SUCCEED;
+}
+
 static void
 PNscheduler(void *dummy)
 {
@@ -419,6 +435,7 @@ PNscheduler(void *dummy)
                   non empty. You can only trigger on empty baskets using a heartbeat */
                memset((void*) claimed, 0, MAXBSKT);
                now = GDKusec();
+               PNtasks=0;
                for (k = i = 0; i < pnettop; i++) 
                if ( pnet[i].status == PNREADY ){
                        pnet[i].enabled = 1;
@@ -464,8 +481,11 @@ PNscheduler(void *dummy)
                                enabled[k++] = i;
                                _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname);
                        } 
+                       PNtasks += pnet[i].enabled;
                }
                analysis = GDKusec() - now;
+               if( PNtasks)
+                       _DEBUG_PETRINET_ mnstr_printf(PNout, "#Run %d queries\n", PNtasks);
 
                /* Execute each enabled transformation */
                /* Tricky part is here a single stream used by multiple transitions */
diff --git a/sql/backends/monet5/iot/petrinet.h b/sql/backends/monet5/iot/petrinet.h
--- a/sql/backends/monet5/iot/petrinet.h
+++ b/sql/backends/monet5/iot/petrinet.h
@@ -26,7 +26,6 @@
 #define _DEBUG_PETRINET_ if(1)
 
 #define PNout mal_clients[1].fdout
-/*#define  _BASKET_SIZE_*/
 
 #ifdef WIN32
 #ifndef LIBDATACELL
@@ -53,6 +52,7 @@ iot_export str PNcycles(Client cntxt, Ma
 iot_export str PNdump(void *ret);
 
 iot_export str PNperiod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+iot_export str PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 
 iot_export str PNanalysis(Client cntxt, MalBlkPtr mb, int pn);
 iot_export str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
diff --git a/sql/backends/monet5/iot/petrinet.mal b/sql/backends/monet5/iot/petrinet.mal
--- a/sql/backends/monet5/iot/petrinet.mal
+++ b/sql/backends/monet5/iot/petrinet.mal
@@ -58,6 +58,10 @@ pattern analyse(mod:str, fcn:str)
 address PNanalyseWrapper
 comment "Check the input/output relationship";
 
+pattern iot.wait()
+address PNwait
+comment "Wait until there is no task enabled";
+
 command iot.queries() (mod:bat[:str],fcn:bat[:str], status:bat[:str],lastrun:bat[:timestamp],cycles:bat[:int], events:bat[:int],time:bat[:lng],error:bat[:str])
 address PNtable
 comment "Inspect the iot queries";
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to