Changeset: eb1c483489cb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=eb1c483489cb
Modified Files:
sql/backends/monet5/iot/basket.c
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/petrinet.c
sql/backends/monet5/iot/petrinet.h
Branch: iot
Log Message:
Ensure BAT properties are derived properly (BATderiveProps) and use the global debugging stream (GDKout)
diffs (truncated from 334 to 300 lines):
diff --git a/sql/backends/monet5/iot/basket.c b/sql/backends/monet5/iot/basket.c
--- a/sql/backends/monet5/iot/basket.c
+++ b/sql/backends/monet5/iot/basket.c
@@ -122,7 +122,7 @@ BSKTnewbasket(mvc *m, sql_schema *s, sql
baskets[idx].schema_name = GDKstrdup(s->base.name);
baskets[idx].table_name = GDKstrdup(t->base.name);
- baskets[idx].seen = * timestamp_nil;
+ (void) MTIMEcurrent_timestamp(&baskets[idx].seen);
baskets[idx].status = BSKTWAIT;
baskets[idx].count = 0;
@@ -537,6 +537,7 @@ BSKTtumbleInternal(Client cntxt, str sch
if( BATcount(b) == 0){
baskets[bskt].status = BSKTWAIT;
}
+ BATderiveProps(b, FALSE);
}
return MAL_SUCCEED;
}
@@ -667,7 +668,7 @@ BSKTappend(Client cntxt, MalBlkPtr mb, M
else
BUNappend(bn, value, TRUE);
cnt = BATcount(bn);
- //BATderiveProps(bn, FALSE);
+ BATderiveProps(bn, FALSE);
} else throw(SQL, "basket.append", "Cannot access target column
%s.%s.%s",sname,tname,cname);
if(cnt){
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -29,7 +29,6 @@
#include "petrinet.h"
MT_Lock iotLock MT_LOCK_INITIALIZER("iotLock");
-#define IOTout mal_clients[1].fdout
// locate the SQL procedure in the catalog
static str
@@ -176,13 +175,13 @@ IOTreceptorThread(void *dummy)
if( cntxt == NULL)
return;
//SQLinitClient(cntxt);
- _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s started for %s\n",
+ _DEBUG_IOT_ mnstr_printf(GDKout, "#iot.receptor %s.%s started for %s\n",
baskets[idx].schema_name,
baskets[idx].table_name,
baskets[idx].source);
/* continously scan the container for baskets */
BSKTimportInternal(cntxt, idx);
- _DEBUG_IOT_ mnstr_printf(IOTout, "#iot.receptor %s.%s imported the
file\n",
+ _DEBUG_IOT_ mnstr_printf(GDKout, "#iot.receptor %s.%s imported the
file\n",
baskets[idx].schema_name,
baskets[idx].table_name);
}
diff --git a/sql/backends/monet5/iot/petrinet.c b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -78,7 +78,7 @@ typedef struct {
lng time; /* total time spent for all invocations */
} PNnode;
-PNnode pnet[MAXPN];
+PNnode pnet[MAXPN]={0};
int pnettop = 0;
int enabled[MAXPN]; /*array that contains the id's of all queries that are
enable to fire*/
@@ -138,7 +138,7 @@ PNregisterInternal(Client cntxt, MalBlkP
Symbol s;
char buf[IDLENGTH];
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#registerInternal status %d\n",
init);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#registerInternal status %d\n",
init);
if (pnettop == MAXPN)
GDKerror("petrinet.register:Too many transitions");
@@ -204,13 +204,13 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
throw(SQL,"iot.pause","Continuous query not found");
}
pnet[i].status = newstatus;
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s
%s\n", modname,fcnname, statusname[newstatus]);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s
%s\n", modname,fcnname, statusname[newstatus]);
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
}
for ( i = 0; i < pnettop; i++){
pnet[i].status = newstatus;
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler status %s.%s
%s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler status %s.%s
%s\n", pnet[i].modname, pnet[i].fcnname, statusname[newstatus]);
}
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
@@ -218,13 +218,13 @@ PNstatus( Client cntxt, MalBlkPtr mb, Ma
str
PNresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#resume scheduler \n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#resume scheduler \n");
return PNstatus(cntxt, mb, stk, pci, PNWAIT);
}
str
PNpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#pause scheduler \n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#pause scheduler \n");
return PNstatus(cntxt, mb, stk, pci, PNPAUSED);
}
@@ -244,7 +244,7 @@ PNwait(Client cntxt, MalBlkPtr mb, MalSt
str
PNstop(void){
int i,cnt;
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler being stopped\n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler being stopped\n");
pnstatus = PNSTOP; // avoid starting new continuous queries
for(cnt=0, i = 0; i < pnettop; i++)
@@ -257,7 +257,7 @@ PNstop(void){
cnt += pnet[i].status != PNWAIT;
} while(cnt);
BSKTclean(0);
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#all queries stopped \n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#all queries stopped \n");
return MAL_SUCCEED;
}
@@ -287,7 +287,7 @@ PNderegister(Client cntxt, MalBlkPtr mb,
pnet[i]= pnet[i+1];
memset((void*) (pnet+i), 0, sizeof(PNnode));
pnettop--;
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler deregistered
%s.%s\n", modname,fcnname);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered
%s.%s\n", modname,fcnname);
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
}
@@ -298,14 +298,14 @@ PNderegister(Client cntxt, MalBlkPtr mb,
memset((void*) (pnet+i), 0, sizeof(PNnode));
}
pnettop = 0;
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler deregistered all\n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler deregistered all\n");
MT_lock_unset(&iotLock);
return MAL_SUCCEED;
}
str
PNcycles(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#scheduler cycles set \n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#scheduler cycles set \n");
(void) cntxt;
(void) mb;
(void) stk;
@@ -318,22 +318,22 @@ PNcycles(Client cntxt, MalBlkPtr mb, Mal
str PNdump(void *ret)
{
int i, k, idx;
- mnstr_printf(PNout, "#scheduler status %s\n", statusname[pnstatus]);
+ mnstr_printf(GDKout, "#scheduler status %s\n", statusname[pnstatus]);
for (i = 0; i < pnettop; i++) {
- mnstr_printf(PNout, "#[%d]\t%s.%s %s delay %d cycles %d events
%d time " LLFMT " ms\n",
+ mnstr_printf(GDKout, "#[%d]\t%s.%s %s delay %d cycles %d events
%d time " LLFMT " ms\n",
i, pnet[i].modname, pnet[i].fcnname,
statusname[pnet[i].status], pnet[i].delay, pnet[i].cycles, pnet[i].events,
pnet[i].time / 1000);
if (pnet[i].error)
- mnstr_printf(PNout, "#%s\n", pnet[i].error);
+ mnstr_printf(GDKout, "#%s\n", pnet[i].error);
for (k = 0; k < MAXBSKT && pnet[i].inputs[k]; k++){
idx = pnet[i].inputs[k];
- mnstr_printf(PNout, "#<--\t%s basket "BUNFMT" %d\n",
+ mnstr_printf(GDKout, "#<--\t%s basket "BUNFMT" %d\n",
baskets[idx].table_name,
baskets[idx].count,
baskets[idx].status);
}
for (k = 0; k <MAXBSKT && pnet[i].outputs[k]; k++){
idx = pnet[i].outputs[k];
- mnstr_printf(PNout, "#-->\t%s basket "BUNFMT" %d\n",
+ mnstr_printf(GDKout, "#-->\t%s basket "BUNFMT" %d\n",
baskets[idx].table_name,
baskets[idx].count,
baskets[idx].status);
@@ -399,19 +399,19 @@ PNexecute( void *n)
str msg= MAL_SUCCEED;
lng t = GDKusec();
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute
%s.%s\n",node->modname, node->fcnname);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute
%s.%s\n",node->modname, node->fcnname);
// first grab exclusive access to all streams.
for (j = 0; j < MAXBSKT && node->inputs[j]; j++)
MT_lock_set(&baskets[node->inputs[j]].lock);
for (j = 0; j < MAXBSKT && node->outputs[j]; j++)
MT_lock_set(&baskets[node->outputs[j]].lock);
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all
locked\n",node->modname, node->fcnname);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s all
locked\n",node->modname, node->fcnname);
msg = runMALsequence(node->client, node->mb, 1, 0, node->stk, 0, 0);
_DEBUG_PETRINET_
- mnstr_printf(PNout, "#petrinet.execute %s.%s transition
done:%s\n",
+ mnstr_printf(GDKout, "#petrinet.execute %s.%s transition
done:%s\n",
node->modname, node->fcnname, (msg != MAL_SUCCEED?msg:""));
// empty the baskets according to their policy
@@ -421,7 +421,7 @@ PNexecute( void *n)
MT_lock_unset(&baskets[node->outputs[j]].lock);
pnet[node->inputs[0]].time += GDKusec() - t; /* keep around in
microseconds */
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.execute %s.%s all
unlocked\n",node->modname, node->fcnname);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.execute %s.%s all
unlocked\n",node->modname, node->fcnname);
node->status = PNWAIT;
}
@@ -437,7 +437,7 @@ PNscheduler(void *dummy)
char claimed[MAXBSKT];
timestamp ts, tn;
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller started\n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#petrinet.controller started\n");
cntxt = MCinitClient(0,0,0); /* run as admin in SQL mode*/
if( cntxt){
if( SQLinitClient(cntxt) != MAL_SUCCEED)
@@ -469,7 +469,7 @@ PNscheduler(void *dummy)
// check if all input baskets are available and
non-empty
for (j = 0; j < MAXBSKT && pnet[i].enabled &&
pnet[i].inputs[j]; j++) {
idx = pnet[i].inputs[j];
- if (baskets[idx].status != BSKTFILLED ){
+ if (baskets[idx].status != BSKTFILLED &&
baskets[idx].heartbeat == 0 ){
pnet[i].enabled = 0;
break;
}
@@ -479,6 +479,7 @@ PNscheduler(void *dummy)
(void) MTIMEtimestamp_add(&tn,
&baskets[idx].seen, &baskets[idx].heartbeat);
if (tn.days < ts.days || (tn.days ==
ts.days && tn.msecs < ts.msecs)) {
pnet[i].enabled = 0;
+ PNcycle--; // it does not count
as a valid cycle.
break;
}
} else
@@ -493,13 +494,13 @@ PNscheduler(void *dummy)
/* a basket can be used in at most one
continuous query at a time */
for (j = 0; j < MAXBSKT && pnet[i].enabled &&
pnet[i].inputs[j]; j++)
if( claimed[pnet[i].inputs[j]]){
- _DEBUG_PETRINET_
mnstr_printf(PNout, "#petrinet: %s.%s enabled twice,disgarded \n",
pnet[i].modname, pnet[i].fcnname);
+ _DEBUG_PETRINET_
mnstr_printf(GDKout, "#petrinet: %s.%s enabled twice,disgarded \n",
pnet[i].modname, pnet[i].fcnname);
pnet[i].enabled = 0;
break;
}
for (j = 0; j < MAXBSKT && pnet[i].enabled &&
pnet[i].outputs[j]; j++)
if( claimed[pnet[i].outputs[j]]){
- _DEBUG_PETRINET_
mnstr_printf(PNout, "#petrinet: %s.%s enabled twice,disgarded \n",
pnet[i].modname, pnet[i].fcnname);
+ _DEBUG_PETRINET_
mnstr_printf(GDKout, "#petrinet: %s.%s enabled twice,disgarded \n",
pnet[i].modname, pnet[i].fcnname);
pnet[i].enabled = 0;
break;
}
@@ -514,7 +515,7 @@ PNscheduler(void *dummy)
/*save the ids of all continuous queries that
can be executed */
enabled[k++] = i;
- _DEBUG_PETRINET_ mnstr_printf(PNout,
"#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout,
"#petrinet: %s.%s enabled \n", pnet[i].modname, pnet[i].fcnname);
}
pntasks += pnet[i].enabled;
}
@@ -525,7 +526,7 @@ PNscheduler(void *dummy)
for (m = 0; m < k; m++) {
i = enabled[m];
if (pnet[i].enabled ) {
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#Run
transition %s \n", pnet[i].fcnname);
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#Run
transition %s \n", pnet[i].fcnname);
t = GDKusec();
// Fork MAL execution thread
@@ -553,22 +554,22 @@ PNscheduler(void *dummy)
}
}
/* after one sweep all threads should be released */
- for (m = 0; m < k; m++) {
- _DEBUG_PETRINET_ mnstr_printf(PNout, "#Terminate query
thread %s \n", pnet[i].fcnname);
- MT_join_thread(pnet[i].tid);
+ for (m = 0; m < k; m++)
+ if(pnet[enabled[m]].tid){
+ _DEBUG_PETRINET_ mnstr_printf(GDKout, "#Terminate query
thread %s \n", pnet[enabled[m]].fcnname);
+ MT_join_thread(pnet[enabled[m]].tid);
}
_DEBUG_PETRINET_ if (pnstatus == PNRUNNING && cycleDelay)
MT_sleep_ms(cycleDelay); /* delay to make it more tractable */
MT_sleep_ms(2);
while (pnstatus == PNPAUSED) { /* scheduler is paused */
MT_sleep_ms(cycleDelay);
- _DEBUG_PETRINET_ mnstr_printf(PNout,
"#petrinet.controller paused\n");
+ _DEBUG_PETRINET_ mnstr_printf(GDKout,
"#petrinet.controller paused\n");
}
}
MCcloseClient(cntxt);
pnstatus = PNINIT;
- _DEBUG_PETRINET_ mnstr_flush(PNout);
(void) dummy;
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list