Changeset: 8e15764fb0ef for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8e15764fb0ef
Modified Files:
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/Tests/All
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/Tests/iot01.sql
sql/backends/monet5/iot/Tests/petrinet00.mal
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/basket.h
sql/backends/monet5/iot/basket.mal
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/iot.h
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
Branch: iot
Log Message:
Intermittent commit
diffs (291 lines):
diff --git a/sql/backends/monet5/iot/50_iot.sql
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -81,6 +81,8 @@ create procedure iot.window("schema" str
create procedure iot.window("schema" string, "table" string, elem int, slide
int)
external name iot.window;
+create procedure iot.wait() external name iot.wait;
+
-- Inspection tables
create function iot.baskets()
returns table( "schema" string, "table" string, "status" string, threshold
int, winsize int, winstride int, timeslice int, timestride int, beat int, seen
timestamp, events int)
diff --git a/sql/backends/monet5/iot/Tests/All
b/sql/backends/monet5/iot/Tests/All
--- a/sql/backends/monet5/iot/Tests/All
+++ b/sql/backends/monet5/iot/Tests/All
@@ -2,7 +2,12 @@ iot00
iot01
iot02
iot03
+#iot05
+iot10
+iot12
+iot15
+receptor00
receptor01
-cleaup
-petrinet00
+cleanup
+#petrinet00
webtest
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -20,6 +20,7 @@ select * from iot.outputplaces();
-- stop all continuous queries
call iot.deactivate();
+call iot.wait();
insert into stmp values('2005-09-23 12:34:26.736',1,12.34);
select * from stmp;
diff --git a/sql/backends/monet5/iot/Tests/iot01.sql
b/sql/backends/monet5/iot/Tests/iot01.sql
--- a/sql/backends/monet5/iot/Tests/iot01.sql
+++ b/sql/backends/monet5/iot/Tests/iot01.sql
@@ -14,7 +14,12 @@ insert into stmp values('2005-09-23 12:3
insert into stmp values('2005-09-23 12:34:27.000',1,12.0);
insert into stmp values('2005-09-23 12:34:28.000',1,13.0);
+-- deactivate all when streams are empty.
+call iot.deactivate();
+
+-- stream table should be empty now
select * from stmp;
+-- and result delivered
select * from result;
select * from iot.queries();
diff --git a/sql/backends/monet5/iot/Tests/petrinet00.mal
b/sql/backends/monet5/iot/Tests/petrinet00.mal
--- a/sql/backends/monet5/iot/Tests/petrinet00.mal
+++ b/sql/backends/monet5/iot/Tests/petrinet00.mal
@@ -5,5 +5,5 @@ end hello;
petrinet.register("user","hello");
-(mod:bat[:str],fcn:bat[:str],
status:bat[:str],lastrun:bat[:timestamp],cycles:bat[:int],
events:bat[:int],time:bat[:lng],error:bat[:str]):= petrinet.queries();
+(mod:bat[:str],fcn:bat[:str],
status:bat[:str],lastrun:bat[:timestamp],cycles:bat[:int],
events:bat[:int],time:bat[:lng],error:bat[:str]):= iot.queries();
io.print(mod,fcn, status,lastrun,cycles, events,time,error);
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -399,13 +399,9 @@ BSKTreset(void *ret)
/* collect the binary files and append them to what we have */
#define MAXLINE 4096
-str
-BSKTpushBasket(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+str
+BSKTimportInternal(int bskt)
{
- str sch = *getArgReference_str(stk, pci, 1);
- str tbl = *getArgReference_str(stk, pci, 2);
- str dir = *getArgReference_str(stk, pci, 3);
- int bskt;
char buf[PATHLENGTH];
node *n;
mvc *m = NULL;
@@ -416,14 +412,7 @@ BSKTpushBasket(Client cntxt, MalBlkPtr m
FILE *f;
long fsize;
char line[MAXLINE];
-
- msg= getSQLContext(cntxt,NULL, &m, NULL);
- if( msg != MAL_SUCCEED)
- return msg;
- BSKTregisterInternal(cntxt, mb, sch, tbl);
- bskt = BSKTlocate(sch,tbl);
- if (bskt == 0)
- throw(SQL, "iot.basket", "Could not find the basket
%s.%s",sch,tbl);
+ str dir = baskets[bskt].source;
// check access permission to directory first
if( access (dir , F_OK | R_OK)){
@@ -538,10 +527,30 @@ recover:
}
MT_lock_unset(&iotLock);
- (void) mb;
return msg;
}
+/*
+ * MAL entry point for iot.import(sch, tbl, dir).
+ *
+ * Arguments (taken from the MAL stack):
+ *   1: sch - schema name of the stream table
+ *   2: tbl - table name of the stream table
+ *   3: dir - directory holding the binary files to import
+ *
+ * The basket for sch.tbl is registered and looked up, a private copy of
+ * dir is stored in baskets[bskt].source, and the actual import is
+ * delegated to BSKTimportInternal(), which reads the directory from that
+ * field.
+ *
+ * Returns MAL_SUCCEED or an exception string (SQL context unavailable,
+ * basket not found, allocation failure, or whatever BSKTimportInternal()
+ * reports).
+ *
+ * NOTE(review): the result of BSKTregisterInternal() is not inspected here;
+ * confirm whether it can return an error message that should be propagated.
+ * NOTE(review): a previous value of baskets[bskt].source is overwritten
+ * without being released; confirm who owns that string before adding a free.
+ */
+str
+BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	str sch = *getArgReference_str(stk, pci, 1);
+	str tbl = *getArgReference_str(stk, pci, 2);
+	str dir = *getArgReference_str(stk, pci, 3);
+	int bskt;
+	str msg= MAL_SUCCEED;
+	mvc *m = NULL;
+
+	msg= getSQLContext(cntxt,NULL, &m, NULL);
+	if( msg != MAL_SUCCEED)
+		return msg;
+	BSKTregisterInternal(cntxt, mb, sch, tbl);
+	bskt = BSKTlocate(sch,tbl);
+	/* slot 0 is treated as "not found" */
+	if (bskt == 0)
+		throw(SQL, "iot.basket", "Could not find the basket %s.%s",sch,tbl);
+	baskets[bskt].source = GDKstrdup(dir);
+	/* BSKTimportInternal() hands source to access(); never pass it NULL */
+	if (baskets[bskt].source == NULL)
+		throw(SQL, "iot.import", "Could not allocate the source path for basket %s.%s",sch,tbl);
+	return BSKTimportInternal(bskt);
+}
+
/* remove tuples from a basket according to the sliding policy */
#define ColumnShift(B,TPE, STRIDE) { \
TPE *first= (TPE*) Tloc(B, BUNfirst(B));\
diff --git a/sql/backends/monet5/iot/basket.h b/sql/backends/monet5/iot/basket.h
--- a/sql/backends/monet5/iot/basket.h
+++ b/sql/backends/monet5/iot/basket.h
@@ -25,16 +25,6 @@
#include "mal_interpreter.h"
#include "sql.h"
-#ifdef WIN32
-#ifndef LIBIOT
-#define iot_export extern __declspec(dllimport)
-#else
-#define iot_export extern __declspec(dllexport)
-#endif
-#else
-#define iot_export extern
-#endif
-
/* #define _DEBUG_DATACELL debug this module */
#define BSKTout GDKout
#define MAXBSKT 64
@@ -102,8 +92,9 @@ iot_export str BSKTappend(Client cntxt,
iot_export str BSKTdelete(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTclear(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTcommit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
-iot_export str BSKTpushBasket(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+iot_export str BSKTimport(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTupdate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
iot_export str BSKTerror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+iot_export str BSKTimportInternal(int bskt);
#endif
diff --git a/sql/backends/monet5/iot/basket.mal
b/sql/backends/monet5/iot/basket.mal
--- a/sql/backends/monet5/iot/basket.mal
+++ b/sql/backends/monet5/iot/basket.mal
@@ -84,9 +84,9 @@ command reset():void
address BSKTreset
comment "Remove all baskets";
-pattern iot.basket(sch:str, tbl:str, dir:str):void
-address BSKTpushBasket
-comment "Push a directory with the binary files";
+pattern iot.import(sch:str, tbl:str, dir:str):void
+address BSKTimport
+comment "Import a single directory with the binary files for a stream table";
pattern iot.baskets()(sch:bat[:str],nme:bat[:str], status:bat[:str],
threshold:bat[:int], winsize:bat[:int], winstride:bat[:int],
timeslice:bat[:int],
timestride:bat[:int], beat:bat[:int], seen:bat[:timestamp], events:bat[:int])
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -191,6 +191,7 @@ IOTreceptorThread(void *dummy)
baskets[idx].table_name,
baskets[idx].source);
/* continously scan the container for baskets */
+ BSKTimportInternal(idx);
}
str
diff --git a/sql/backends/monet5/iot/iot.h b/sql/backends/monet5/iot/iot.h
--- a/sql/backends/monet5/iot/iot.h
+++ b/sql/backends/monet5/iot/iot.h
@@ -27,6 +27,16 @@
#include "basket.h"
#include "petrinet.h"
+#ifdef WIN32
+#ifndef LIBIOT
+#define iot_export extern __declspec(dllimport)
+#else
+#define iot_export extern __declspec(dllexport)
+#endif
+#else
+#define iot_export extern
+#endif
+
#define _DEBUG_IOT_ if(0)
iot_export MT_Lock iotLock;
diff --git a/sql/backends/monet5/iot/petrinet.c
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -385,6 +385,22 @@ PNexecute( void *n)
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all
unlocked\n",node->modname, node->fcnname);
}
+/*
+ * Number of queries found enabled during the most recent scheduler pass.
+ * PNscheduler() resets it to 0 at the start of a pass and adds
+ * pnet[i].enabled for every query it inspects; PNwait() polls it.
+ * Declared volatile so that every poll performs a real load.
+ * NOTE(review): volatile is not a replacement for proper synchronisation;
+ * if the scheduler runs on its own thread, consider the project's atomic
+ * or locking primitives.
+ */
+static volatile int PNtasks;
+
+/*
+ * MAL pattern behind iot.wait(): block the caller until PNtasks is zero,
+ * sleeping cycleDelay milliseconds between polls.
+ *
+ * mb, stk and pci are unused; cntxt is only used for the debug trace.
+ * Always returns MAL_SUCCEED.
+ *
+ * NOTE(review): PNtasks is zeroed at the start of every scheduler pass, so
+ * a poll that lands between the reset and the re-count can see 0 while
+ * queries are about to be enabled; confirm this early return is acceptable.
+ */
+str
+PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	(void) mb;
+	(void) stk;
+	(void) pci;
+	while (PNtasks) {	/* the last scheduler pass still had enabled queries */
+		_DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, "#petrinet.controller %d outstanding tasks\n", PNtasks);
+		MT_sleep_ms(cycleDelay);
+	}
+	return MAL_SUCCEED;
+}
+
static void
PNscheduler(void *dummy)
{
@@ -419,6 +435,7 @@ PNscheduler(void *dummy)
non empty. You can only trigger on empty baskets using a
heartbeat */
memset((void*) claimed, 0, MAXBSKT);
now = GDKusec();
+ PNtasks=0;
for (k = i = 0; i < pnettop; i++)
if ( pnet[i].status == PNREADY ){
pnet[i].enabled = 1;
@@ -464,8 +481,11 @@ PNscheduler(void *dummy)
enabled[k++] = i;
_DEBUG_PETRINET_ mnstr_printf(PNout,
"#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname);
}
+ PNtasks += pnet[i].enabled;
}
analysis = GDKusec() - now;
+ if( PNtasks)
+ _DEBUG_PETRINET_ mnstr_printf(PNout, "#Run %d
queries\n", PNtasks);
/* Execute each enabled transformation */
/* Tricky part is here a single stream used by multiple
transitions */
diff --git a/sql/backends/monet5/iot/petrinet.h
b/sql/backends/monet5/iot/petrinet.h
--- a/sql/backends/monet5/iot/petrinet.h
+++ b/sql/backends/monet5/iot/petrinet.h
@@ -26,7 +26,6 @@
#define _DEBUG_PETRINET_ if(1)
#define PNout mal_clients[1].fdout
-/*#define _BASKET_SIZE_*/
#ifdef WIN32
#ifndef LIBDATACELL
@@ -53,6 +52,7 @@ iot_export str PNcycles(Client cntxt, Ma
iot_export str PNdump(void *ret);
iot_export str PNperiod(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
+iot_export str PNwait(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
iot_export str PNanalysis(Client cntxt, MalBlkPtr mb, int pn);
iot_export str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
diff --git a/sql/backends/monet5/iot/petrinet.mal
b/sql/backends/monet5/iot/petrinet.mal
--- a/sql/backends/monet5/iot/petrinet.mal
+++ b/sql/backends/monet5/iot/petrinet.mal
@@ -58,6 +58,10 @@ pattern analyse(mod:str, fcn:str)
address PNanalyseWrapper
comment "Check the input/output relationship";
+pattern iot.wait()
+address PNwait
+comment "Wait until there is no task enabled";
+
command iot.queries() (mod:bat[:str],fcn:bat[:str],
status:bat[:str],lastrun:bat[:timestamp],cycles:bat[:int],
events:bat[:int],time:bat[:lng],error:bat[:str])
address PNtable
comment "Inspect the iot queries";
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list