Changeset: cdecc63da1dc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cdecc63da1dc
Modified Files:
monetdb5/optimizer/opt_iot.h
sql/backends/monet5/iot/50_iot.sql
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
sql/backends/monet5/iot/petrinet.mal
Branch: iot
Log Message:
Intermittent commit
diffs (267 lines):
diff --git a/monetdb5/optimizer/opt_iot.h b/monetdb5/optimizer/opt_iot.h
--- a/monetdb5/optimizer/opt_iot.h
+++ b/monetdb5/optimizer/opt_iot.h
@@ -23,7 +23,7 @@
#include "opt_support.h"
#include "opt_pipes.h"
-#define OPTDEBUGiot if (1)
-//#define OPTDEBUGiot if (optDebug & ((lng) 1 << DEBUG_OPT_DATACELL))
+//#define OPTDEBUGiot if (1)
+#define OPTDEBUGiot if (optDebug & ((lng) 1 << DEBUG_OPT_IOT))
opt_export int OPTiotImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
#endif
diff --git a/sql/backends/monet5/iot/50_iot.sql
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -59,6 +59,15 @@ create function iot.queries()
returns table( "schema" string, "function" string, status string, lastrun
timestamp, cycles int, events int, time bigint, error string)
external name iot.queries;
+-- next causes crash
+--create function iot.places2()
+-- returns table( "schema" string, "table" string, "schema" string, "query"
string)
+-- external name iot.places;
+
+create function iot.places()
+ returns table( "s" string, "t" string, "sch" string, "qry" string)
+ external name iot.places;
+
-- create function iot.errors()
-- returns table( "schema" string, "table" string, error string)
-- external name iot.errors;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -165,8 +165,13 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
if ( msg != MAL_SUCCEED)
return msg;
- sch = *getArgReference_str(stk, pci, 1);
- tbl = *getArgReference_str(stk, pci, 2);
+ if( stk == 0){
+ sch = getVarConstant(mb, getArg(pci,1)).val.sval;
+ tbl = getVarConstant(mb, getArg(pci,2)).val.sval;
+ } else{
+ sch = *getArgReference_str(stk, pci, 1);
+ tbl = *getArgReference_str(stk, pci, 2);
+ }
/* check double registration */
if( BSKTlocate(sch, tbl) > 0)
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -135,7 +135,6 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal
}
if (msg == MAL_SUCCEED) {
_DEBUG_IOT_ fprintf(stderr,"#iot: continuous query plan\n");
- _DEBUG_IOT_ printFunction(cntxt->fdout, s->def, 0,
LIST_MAL_ALL);
msg = PNregisterInternal(cntxt, s->def);
}
return msg;
diff --git a/sql/backends/monet5/iot/petrinet.c
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -50,8 +50,6 @@
#define PNcontrolInfinit 1 /* infinit loop of PNController */
#define PNcontrolEnd 2 /* when all factories are disable PNController
exits */
-#define _DEBUG_PETRINET_
-
static str statusname[6] = { "<unknown>", "init", "paused", "running", "stop",
"error" };
/*static int controlRounds = PNcontrolInfinit;*/
@@ -179,16 +177,12 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
if ( i == pnettop)
throw(SQL,"iot.pause","Continuous query not found");
pnet[i].status = newstatus;
-#ifdef _DEBUG_PETRINET_
- mnstr_printf(PNout, "#scheduler status %s.%s %s\n", modname,fcnname,
statusname[newstatus]);
-#endif
+ _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s
%s\n", modname,fcnname, statusname[newstatus]);
return MAL_SUCCEED;
}
for ( i = 0; i < pnettop; i++){
pnet[i].status = newstatus;
-#ifdef _DEBUG_PETRINET_
- mnstr_printf(PNout, "#scheduler status %s\n",
statusname[newstatus]);
-#endif
+ _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s\n",
statusname[newstatus]);
}
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
@@ -210,9 +204,7 @@ PNstop(Client cntxt, MalBlkPtr mb, MalSt
str
PNcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-#ifdef _DEBUG_PETRINET_
- mnstr_printf(PNout, "#scheduler cycles set \n");
-#endif
+ _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler cycles set \n");
(void) cntxt;
(void) mb;
(void) stk;
@@ -257,34 +249,19 @@ str PNdump(void *ret)
str
PNanalysis(Client cntxt, MalBlkPtr mb)
{
- int i, j, k;
+ int i;
InstrPtr p;
- str tbl;
+ str msg= MAL_SUCCEED;
- (void) cntxt;
- /* first check for errors */
- for (i = 0; i < mb->stop; i++) {
+ for (i = 0; msg== MAL_SUCCEED && i < mb->stop; i++) {
p = getInstrPtr(mb, i);
- if (getModuleId(p) == basketRef && getFunctionId(p) ==
registerRef) {
- tbl = getVarConstant(mb, getArg(p, p->argc -
1)).val.sval;
- for (j = 0; j < pnettop; j++)
- for (k = 0; k < MAXBSKT && pnet[j].places[k];
k++)
- if (strcmp(tbl,
baskets[pnet[j].places[k]].table) == 0)
- throw(MAL, "iot.register",
"Duplicate use of continuous query input");
+ if (getModuleId(p) == basketRef && getFunctionId(p) ==
registerRef){
+ msg =BSKTregister(cntxt,mb,0,p);
}
}
- for (i = 0; i < mb->stop; i++) {
- p = getInstrPtr(mb, i);
- if (getModuleId(p) == basketRef && getFunctionId(p) ==
registerRef) {
- tbl = getVarConstant(mb, getArg(p, p->argc -
1)).val.sval;
- }
- if (getModuleId(p) == basketRef && getFunctionId(p) ==
putName("pass", 4)) {
- tbl = getVarConstant(mb, getArg(p, p->retc)).val.sval;
- mnstr_printf(cntxt->fdout, "#output basket %s \n", tbl);
- }
- }
- return MAL_SUCCEED;
+ return msg;
}
+
/*
* The PetriNet controller lives in an separate thread.
* It cycles through all transition nodes, hunting for paused queries that can
fire.
@@ -298,9 +275,7 @@ PNexecute( void *n)
{
PNnode *node= (PNnode *) n;
node->status = BSKTPAUSE;
-#ifdef _DEBUG_PETRINET_
- mnstr_printf(PNout, "#petrinet.executed %s.%s\n",node->modname,
node->fcnname);
-#endif
+ _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed
%s.%s\n",node->modname, node->fcnname);
}
static void
PNcontroller(void *dummy)
@@ -368,9 +343,7 @@ PNcontroller(void *dummy)
for (m = 0; m < k; m++) {
i = enabled[m];
if (pnet[i].enabled ) {
-#ifdef _DEBUG_PETRINET_
- mnstr_printf(cntxt->fdout, "#Run transition %s
\n", pnet[i].fcnname);
-#endif
+ _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout,
"#Run transition %s \n", pnet[i].fcnname);
(void)
MTIMEcurrent_timestamp(&baskets[idx].seen);
t = GDKusec();
@@ -410,11 +383,8 @@ PNstartScheduler(void)
{
MT_Id pid;
int s;
-#ifdef _DEBUG_PETRINET_
- PNdump(&s);
-#else
(void) s;
-#endif
+ _DEBUG_PETRINET_ PNdump(&s);
if (status== BSKTINIT && MT_create_thread(&pid, PNcontroller, &s,
MT_THR_JOINABLE) != 0){
GDKerror( "petrinet creation failed");
@@ -500,3 +470,52 @@ wrapup:
BBPunfix(error->batCacheid);
throw(MAL, "iot.queries", MAL_MALLOC_FAIL);
}
+
+str PNplaces(bat *schemaId, bat *tableId, bat *modnameId, bat *fcnnameId)
+{
+ BAT *schema, *table, *modname = NULL, *fcnname = NULL;
+ int i,j;
+
+ schema = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+ if (schema == 0)
+ goto wrapup;
+ BATseqbase(schema, 0);
+
+ table = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+ if (table == 0)
+ goto wrapup;
+ BATseqbase(table, 0);
+
+ modname = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+ if (modname == 0)
+ goto wrapup;
+ BATseqbase(modname, 0);
+
+ fcnname = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+ if (fcnname == 0)
+ goto wrapup;
+ BATseqbase(fcnname, 0);
+
+ for (i = 0; i < pnettop; i++)
+ for( j =0; j < MAXBSKT && pnet[i].places[j]; j++){
+ BUNappend(schema, baskets[pnet[i].places[j]].schema, FALSE);
+ BUNappend(table, baskets[pnet[i].places[j]].table, FALSE);
+ BUNappend(modname, pnet[i].modname, FALSE);
+ BUNappend(fcnname, pnet[i].fcnname, FALSE);
+ }
+ BBPkeepref(*schemaId = schema->batCacheid);
+ BBPkeepref(*tableId = table->batCacheid);
+ BBPkeepref(*modnameId = modname->batCacheid);
+ BBPkeepref(*fcnnameId = fcnname->batCacheid);
+ return MAL_SUCCEED;
+wrapup:
+ if (schema)
+ BBPunfix(schema->batCacheid);
+ if (table)
+ BBPunfix(table->batCacheid);
+ if (modname)
+ BBPunfix(modname->batCacheid);
+ if (fcnname)
+ BBPunfix(fcnname->batCacheid);
+ throw(MAL, "iot.places", MAL_MALLOC_FAIL);
+}
diff --git a/sql/backends/monet5/iot/petrinet.h
b/sql/backends/monet5/iot/petrinet.h
--- a/sql/backends/monet5/iot/petrinet.h
+++ b/sql/backends/monet5/iot/petrinet.h
@@ -23,7 +23,7 @@
#include "sql_scenario.h"
#include "basket.h"
-#define _DEBUG_PETRINET_
+#define _DEBUG_PETRINET_ if(1)
#define PNout GDKout
/*#define _BASKET_SIZE_*/
@@ -50,5 +50,6 @@ iot_export str PNdump(void *ret);
iot_export str PNanalysis(Client cntxt, MalBlkPtr mb);
iot_export str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
iot_export str PNtable(bat *modnameId, bat *fcnnameId, bat *statusId, bat
*seenId, bat *cyclesId, bat *eventsId, bat *timeId, bat * errorId);
+iot_export str PNplaces(bat *schemaId, bat *streamId, bat *modnameId, bat
*fcnnameId);
#endif
diff --git a/sql/backends/monet5/iot/petrinet.mal
b/sql/backends/monet5/iot/petrinet.mal
--- a/sql/backends/monet5/iot/petrinet.mal
+++ b/sql/backends/monet5/iot/petrinet.mal
@@ -66,6 +66,10 @@ command iot.queries() (mod:bat[:str],fcn
address PNtable
comment "Inspect the iot queries";
+command iot.places() (sch:bat[:str], tbl:bat[:str],mod:bat[:str],fcn:bat[:str])
+address PNplaces
+comment "Inspect the destination of the places";
+
command dump()
address PNdump
comment "Show the status of the Petri net";
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list