Changeset: cdecc63da1dc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cdecc63da1dc
Modified Files:
        monetdb5/optimizer/opt_iot.h
        sql/backends/monet5/iot/50_iot.sql
        sql/backends/monet5/iot/basket.c
        sql/backends/monet5/iot/iot.c
        sql/backends/monet5/iot/petrinet.c
        sql/backends/monet5/iot/petrinet.h
        sql/backends/monet5/iot/petrinet.mal
Branch: iot
Log Message:

Intermittent commit


diffs (267 lines):

diff --git a/monetdb5/optimizer/opt_iot.h b/monetdb5/optimizer/opt_iot.h
--- a/monetdb5/optimizer/opt_iot.h
+++ b/monetdb5/optimizer/opt_iot.h
@@ -23,7 +23,7 @@
 #include "opt_support.h"
 #include "opt_pipes.h"
 
-#define OPTDEBUGiot   if (1)
-//#define OPTDEBUGiot  if (optDebug & ((lng) 1 << DEBUG_OPT_DATACELL))
+//#define OPTDEBUGiot   if (1)
+#define OPTDEBUGiot  if (optDebug & ((lng) 1 << DEBUG_OPT_IOT))
 opt_export int OPTiotImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 #endif
diff --git a/sql/backends/monet5/iot/50_iot.sql 
b/sql/backends/monet5/iot/50_iot.sql
--- a/sql/backends/monet5/iot/50_iot.sql
+++ b/sql/backends/monet5/iot/50_iot.sql
@@ -59,6 +59,15 @@ create function iot.queries()
  returns table( "schema" string,  "function" string, status string, lastrun 
timestamp, cycles int, events int, time bigint, error string)
  external name iot.queries;
 
+-- next causes crash
+--create function iot.places2()
+-- returns table( "schema" string,  "table" string, "schema" string, "query" 
string)
+-- external name iot.places;
+
+create function iot.places()
+ returns table( "s" string,  "t" string, "sch" string, "qry" string)
+ external name iot.places;
+
 -- create function iot.errors()
 -- returns table( "schema" string,  "table" string, error string)
 -- external name iot.errors;
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -165,8 +165,13 @@ BSKTregister(Client cntxt, MalBlkPtr mb,
 
        if ( msg != MAL_SUCCEED)
                return msg;
-       sch = *getArgReference_str(stk, pci, 1);
-       tbl = *getArgReference_str(stk, pci, 2);
+       if( stk == 0){
+               sch = getVarConstant(mb, getArg(pci,1)).val.sval;
+               tbl = getVarConstant(mb, getArg(pci,2)).val.sval;
+       } else{
+               sch = *getArgReference_str(stk, pci, 1);
+               tbl = *getArgReference_str(stk, pci, 2);
+       }
 
        /* check double registration */
        if( BSKTlocate(sch, tbl) > 0)
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -135,7 +135,6 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal
        }
        if (msg == MAL_SUCCEED) {
                _DEBUG_IOT_ fprintf(stderr,"#iot: continuous query plan\n");
-               _DEBUG_IOT_ printFunction(cntxt->fdout, s->def, 0, 
LIST_MAL_ALL);
                msg = PNregisterInternal(cntxt, s->def);
        }
        return msg;
diff --git a/sql/backends/monet5/iot/petrinet.c 
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -50,8 +50,6 @@
 #define PNcontrolInfinit 1  /* infinit loop of PNController  */
 #define PNcontrolEnd 2      /* when all factories are disable PNController 
exits */
 
-#define _DEBUG_PETRINET_ 
-
 static str statusname[6] = { "<unknown>", "init", "paused", "running", "stop", 
"error" };
 
 /*static int controlRounds = PNcontrolInfinit;*/
@@ -179,16 +177,12 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
                if ( i == pnettop)
                        throw(SQL,"iot.pause","Continuous query not found");
                pnet[i].status = newstatus;
-#ifdef _DEBUG_PETRINET_
-       mnstr_printf(PNout, "#scheduler status %s.%s %s\n", modname,fcnname, 
statusname[newstatus]);
-#endif
+               _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s 
%s\n", modname,fcnname, statusname[newstatus]);
                return MAL_SUCCEED;
        }
        for ( i = 0; i < pnettop; i++){
                pnet[i].status = newstatus;
-#ifdef _DEBUG_PETRINET_
-               mnstr_printf(PNout, "#scheduler status %s\n", 
statusname[newstatus]);
-#endif
+               _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s\n", 
statusname[newstatus]);
        }
        MT_lock_unset(&iotLock);
        return MAL_SUCCEED;
@@ -210,9 +204,7 @@ PNstop(Client cntxt, MalBlkPtr mb, MalSt
 
 str
 PNcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-#ifdef _DEBUG_PETRINET_
-               mnstr_printf(PNout, "#scheduler cycles set \n");
-#endif
+       _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler cycles set \n");
        (void) cntxt;
        (void) mb;
        (void) stk;
@@ -257,34 +249,19 @@ str PNdump(void *ret)
 str
 PNanalysis(Client cntxt, MalBlkPtr mb)
 {
-       int i, j, k;
+       int i;
        InstrPtr p;
-       str tbl;
+       str msg= MAL_SUCCEED;
 
-       (void) cntxt;
-       /* first check for errors */
-       for (i = 0; i < mb->stop; i++) {
+       for (i = 0; msg== MAL_SUCCEED && i < mb->stop; i++) {
                p = getInstrPtr(mb, i);
-               if (getModuleId(p) == basketRef && getFunctionId(p) == 
registerRef) {
-                       tbl = getVarConstant(mb, getArg(p, p->argc - 
1)).val.sval;
-                       for (j = 0; j < pnettop; j++)
-                               for (k = 0; k < MAXBSKT && pnet[j].places[k]; 
k++)
-                                       if (strcmp(tbl, 
baskets[pnet[j].places[k]].table) == 0)
-                                               throw(MAL, "iot.register", 
"Duplicate use of continuous query input");
+               if (getModuleId(p) == basketRef && getFunctionId(p) == 
registerRef){
+                       msg =BSKTregister(cntxt,mb,0,p);
                }
        }
-       for (i = 0; i < mb->stop; i++) {
-               p = getInstrPtr(mb, i);
-               if (getModuleId(p) == basketRef && getFunctionId(p) == 
registerRef) {
-                       tbl = getVarConstant(mb, getArg(p, p->argc - 
1)).val.sval;
-               }
-               if (getModuleId(p) == basketRef && getFunctionId(p) == 
putName("pass", 4)) {
-                       tbl = getVarConstant(mb, getArg(p, p->retc)).val.sval;
-                       mnstr_printf(cntxt->fdout, "#output basket %s \n", tbl);
-               }
-       }
-       return MAL_SUCCEED;
+       return msg;
 }
+
 /*
  * The PetriNet controller lives in an separate thread.
  * It cycles through all transition nodes, hunting for paused queries that can 
fire.
@@ -298,9 +275,7 @@ PNexecute( void *n)
 {
        PNnode *node= (PNnode *) n;
        node->status = BSKTPAUSE;
-#ifdef _DEBUG_PETRINET_
-               mnstr_printf(PNout, "#petrinet.executed %s.%s\n",node->modname, 
node->fcnname);
-#endif
+       _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed 
%s.%s\n",node->modname, node->fcnname);
 }
 static void
 PNcontroller(void *dummy)
@@ -368,9 +343,7 @@ PNcontroller(void *dummy)
                for (m = 0; m < k; m++) {
                        i = enabled[m];
                        if (pnet[i].enabled ) {
-#ifdef _DEBUG_PETRINET_
-                               mnstr_printf(cntxt->fdout, "#Run transition %s 
\n", pnet[i].fcnname);
-#endif
+                               _DEBUG_PETRINET_ mnstr_printf(cntxt->fdout, 
"#Run transition %s \n", pnet[i].fcnname);
 
                                (void) 
MTIMEcurrent_timestamp(&baskets[idx].seen);
                                t = GDKusec();
@@ -410,11 +383,8 @@ PNstartScheduler(void)
 {
        MT_Id pid;
        int s;
-#ifdef _DEBUG_PETRINET_
-       PNdump(&s);
-#else
        (void) s;
-#endif
+       _DEBUG_PETRINET_ PNdump(&s);
 
        if (status== BSKTINIT && MT_create_thread(&pid, PNcontroller, &s, 
MT_THR_JOINABLE) != 0){
                GDKerror( "petrinet creation failed");
@@ -500,3 +470,52 @@ wrapup:
                BBPunfix(error->batCacheid);
        throw(MAL, "iot.queries", MAL_MALLOC_FAIL);
 }
+
+str PNplaces(bat *schemaId, bat *tableId, bat *modnameId, bat *fcnnameId)
+{
+       BAT *schema, *table, *modname = NULL, *fcnname = NULL;
+       int i,j;
+
+       schema = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+       if (schema == 0)
+               goto wrapup;
+       BATseqbase(schema, 0);
+
+       table = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+       if (table == 0)
+               goto wrapup;
+       BATseqbase(table, 0);
+
+       modname = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+       if (modname == 0)
+               goto wrapup;
+       BATseqbase(modname, 0);
+
+       fcnname = BATnew(TYPE_void, TYPE_str, BATTINY, TRANSIENT);
+       if (fcnname == 0)
+               goto wrapup;
+       BATseqbase(fcnname, 0);
+
+       for (i = 0; i < pnettop; i++) 
+       for( j =0; j < MAXBSKT && pnet[i].places[j]; j++){
+               BUNappend(schema, baskets[pnet[i].places[j]].schema, FALSE);
+               BUNappend(table, baskets[pnet[i].places[j]].table, FALSE);
+               BUNappend(modname, pnet[i].modname, FALSE);
+               BUNappend(fcnname, pnet[i].fcnname, FALSE);
+       }
+       BBPkeepref(*schemaId = schema->batCacheid);
+       BBPkeepref(*tableId = table->batCacheid);
+       BBPkeepref(*modnameId = modname->batCacheid);
+       BBPkeepref(*fcnnameId = fcnname->batCacheid);
+       return MAL_SUCCEED;
+wrapup:
+       if (schema)
+               BBPunfix(schema->batCacheid);
+       if (table)
+               BBPunfix(table->batCacheid);
+       if (modname)
+               BBPunfix(modname->batCacheid);
+       if (fcnname)
+               BBPunfix(fcnname->batCacheid);
+       throw(MAL, "iot.places", MAL_MALLOC_FAIL);
+}
diff --git a/sql/backends/monet5/iot/petrinet.h 
b/sql/backends/monet5/iot/petrinet.h
--- a/sql/backends/monet5/iot/petrinet.h
+++ b/sql/backends/monet5/iot/petrinet.h
@@ -23,7 +23,7 @@
 #include "sql_scenario.h"
 #include "basket.h"
 
-#define _DEBUG_PETRINET_ 
+#define _DEBUG_PETRINET_ if(1)
 
 #define PNout GDKout
 /*#define  _BASKET_SIZE_*/
@@ -50,5 +50,6 @@ iot_export str PNdump(void *ret);
 iot_export str PNanalysis(Client cntxt, MalBlkPtr mb);
 iot_export str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 iot_export str PNtable(bat *modnameId, bat *fcnnameId, bat *statusId, bat 
*seenId, bat *cyclesId, bat *eventsId, bat *timeId, bat * errorId);
+iot_export str PNplaces(bat *schemaId, bat *streamId, bat *modnameId, bat 
*fcnnameId);
 #endif
 
diff --git a/sql/backends/monet5/iot/petrinet.mal 
b/sql/backends/monet5/iot/petrinet.mal
--- a/sql/backends/monet5/iot/petrinet.mal
+++ b/sql/backends/monet5/iot/petrinet.mal
@@ -66,6 +66,10 @@ command iot.queries() (mod:bat[:str],fcn
 address PNtable
 comment "Inspect the iot queries";
 
+command iot.places() (sch:bat[:str], tbl:bat[:str],mod:bat[:str],fcn:bat[:str])
+address PNplaces
+comment "Inspect the destination of the places";
+
 command dump()
 address PNdump
 comment "Show the status of the Petri net";
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to