Changeset: 717c2f042c77 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=717c2f042c77
Modified Files:
sql/backends/monet5/iot/Tests/iot00.sql
sql/backends/monet5/iot/iot.c
sql/backends/monet5/iot/petrinet.c
Branch: iot
Log Message:
Intermittent commit
Still cleaning up.
diffs (122 lines):
diff --git a/sql/backends/monet5/iot/Tests/iot00.sql
b/sql/backends/monet5/iot/Tests/iot00.sql
--- a/sql/backends/monet5/iot/Tests/iot00.sql
+++ b/sql/backends/monet5/iot/Tests/iot00.sql
@@ -10,7 +10,7 @@ begin
end;
call iot.query('iot','cq00');
-call iot.query('insert into result select min(t), count(*), avg(val) from
stream_tmp;');
+call iot.query('insert into iot.result select min(t), count(*), avg(val) from
iot.stream_tmp;');
select * from iot.baskets();
select * from iot.queries();
diff --git a/sql/backends/monet5/iot/iot.c b/sql/backends/monet5/iot/iot.c
--- a/sql/backends/monet5/iot/iot.c
+++ b/sql/backends/monet5/iot/iot.c
@@ -91,28 +91,28 @@ IOTquery(Client cntxt, MalBlkPtr mb, Mal
nme = *getArgReference_str(stk, pci, 2);
/* check existing of the pre-compiled function */
_DEBUG_IOT_ fprintf(stderr,"#iot: locate a SQL procedure
%s.%s()\n",sch,nme);
+ msg = IOTprocedureStmt(cntxt, mb, sch, nme);
+ if (msg)
+ return msg;
+ s = findSymbolInModule(cntxt->nspace, putName(nme,
strlen(nme)));
+ if (s == NULL)
+ throw(SQL, "iot.query", "Definition missing");
+ qry = s->def;
} else if (pci->argc == 2){
// pre-create the new procedure
- sch = "iot";
+ sch = "user";
snprintf(name, IDLENGTH,"cquery_%d",iotquerycnt++);
def = *getArgReference_str(stk, pci, 1);
// package it as a procedure in the current schema [todo]
- snprintf(buf,BUFSIZ,"create procedure %s() begin %s;
end",name,def);
+ snprintf(buf,BUFSIZ,"create procedure %s.%s() begin %s;
end",sch,name,def);
_DEBUG_IOT_ fprintf(stderr,"#iot.compile: %s\n",buf);
nme = name;
msg = SQLstatementIntern(cntxt, &def, nme, 1, 0, 0);
if (msg)
return msg;
+ qry = cntxt->curprg->def;
}
- msg = IOTprocedureStmt(cntxt, mb, sch, nme);
- if (msg)
- return msg;
- s = findSymbolInModule(cntxt->nspace, putName(nme, strlen(nme)));
- if (s == NULL)
- throw(SQL, "iot.query", "Definition missing");
- qry = s->def;
-
_DEBUG_IOT_ fprintf(stderr,"#iot: bake a new continuous query plan\n");
scope = findModule(cntxt->nspace, putName(sch, strlen(sch)));
s = newFunction(putName(sch, strlen(sch)), putName(nme, strlen(nme)),
FUNCTIONsymbol);
diff --git a/sql/backends/monet5/iot/petrinet.c
b/sql/backends/monet5/iot/petrinet.c
--- a/sql/backends/monet5/iot/petrinet.c
+++ b/sql/backends/monet5/iot/petrinet.c
@@ -82,7 +82,7 @@ int pnettop = 0;
int enabled[MAXPN]; /*array that contains the id's of all queries that are
enable to fire*/
static int status = BSKTINIT;
-static int cycleDelay = 1; /* be careful, it affects response/throughput
timings */
+static int cycleDelay = 1000; /* be careful, it affects response/throughput
timings */
str PNanalyseWrapper(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
@@ -129,6 +129,7 @@ PNlocate(str modname, str fcnname)
break;
return i;
}
+
/* A transition is only allowed when all inputs are privately used */
str
PNregisterInternal(Client cntxt, MalBlkPtr mb)
@@ -148,6 +149,8 @@ PNregisterInternal(Client cntxt, MalBlkP
pnet[pnettop].modname = GDKstrdup(getModuleId(sig));
pnet[pnettop].fcnname = GDKstrdup(getFunctionId(sig));
+ pnet[pnettop].mb = mb;
+ pnet[pnettop].stk = prepareMALstack(mb, mb->vsize);
pnet[pnettop].status = BSKTPAUSE;
pnet[pnettop].cycles = 0;
@@ -283,7 +286,7 @@ PNexecute( void *n)
{
PNnode *node= (PNnode *) n;
_DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.executed
%s.%s\n",node->modname, node->fcnname);
- runMAL(mal_clients, node->mb, 0,0);
+ runMALsequence(mal_clients, node->mb, 1, 0, node->stk, 0, 0);
node->status = BSKTPAUSE;
}
@@ -306,10 +309,13 @@ PNcontroller(void *dummy)
status = BSKTRUNNING;
while( status != BSKTSTOP && pnettop > 0){
+ _DEBUG_PETRINET_ mnstr_printf(PNout, "#petrinet.controller
step\n");
if (cycleDelay)
MT_sleep_ms(cycleDelay); /* delay to make it more
tractable */
- while (status == BSKTPAUSE) /* scheduler is paused */
+ while (status == BSKTPAUSE) { /* scheduler is paused */
MT_sleep_ms(cycleDelay);
+ _DEBUG_PETRINET_ mnstr_printf(PNout,
"#petrinet.controller paused\n");
+ }
/* collect latest statistics, note that we don't need a lock
here,
because the count need not be accurate to the usec. It will
simply
@@ -393,10 +399,12 @@ PNcontroller(void *dummy)
}
}
}
+ /* after one sweep all threads should be released */
+ for (m = 0; m < k; m++) {
+ MT_join_thread(pnet[i].tid);
+ }
}
- MT_lock_set(&iotLock);
status = BSKTINIT;
- MT_lock_unset(&iotLock);
_DEBUG_PETRINET_ mnstr_flush(PNout);
(void) dummy;
}
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list