Changeset: fd1982bb1698 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fd1982bb1698
Modified Files:
sql/backends/monet5/datacell/Tests/scenario00.sql
sql/backends/monet5/datacell/basket.c
sql/backends/monet5/datacell/datacell.c
sql/backends/monet5/datacell/emitter.c
sql/backends/monet5/datacell/petrinet.c
sql/backends/monet5/datacell/receptor.c
Branch: default
Log Message:
Clean up state control.
Receptors, emitters, and queries can now be selectively paused/restarted.
diffs (truncated from 420 to 300 lines):
diff --git a/sql/backends/monet5/datacell/Tests/scenario00.sql b/sql/backends/monet5/datacell/Tests/scenario00.sql
--- a/sql/backends/monet5/datacell/Tests/scenario00.sql
+++ b/sql/backends/monet5/datacell/Tests/scenario00.sql
@@ -18,9 +18,10 @@ call datacell.emitter('datacell.bsktout'
call datacell.query('datacell.pass', 'insert into datacell.bsktout select * from datacell.bsktin;');
+select * from datacell.receptors(); select * from datacell.emitters(); select
* from datacell.queries(); select * from datacell.baskets();
+
call datacell.resume();
call datacell.dump();
-select * from datacell.receptors(); select * from datacell.emitters(); select
* from datacell.queries(); select * from datacell.baskets();
-- externally, activate the sensor
-- sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1
--trace
diff --git a/sql/backends/monet5/datacell/basket.c b/sql/backends/monet5/datacell/basket.c
--- a/sql/backends/monet5/datacell/basket.c
+++ b/sql/backends/monet5/datacell/basket.c
@@ -406,7 +406,7 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, Mal
ret = (int *) getArgReference(stk, pci, i);
b = baskets[bskt].primary[i];
bn = BATcopy(b, b->htype, b->ttype, TRUE);
- cnt = (int) BATcount(bn);
+ cnt = (int) BATcount(b);
BATclear(b, FALSE);
*ret = bn->batCacheid;
BBPkeepref(*ret);
@@ -632,6 +632,7 @@ BSKTtable(int *nameId, int *thresholdId,
BUNappend(winstride, &baskets[i].winstride, FALSE);
BUNappend(beat, &baskets[i].beat, FALSE);
BUNappend(seen, &baskets[i].seen, FALSE);
+ baskets[i].events = (int) BATcount(
baskets[i].primary[0]);
BUNappend(events, &baskets[i].events, FALSE);
BUNappend(timeslice, &baskets[i].timeslice, FALSE);
BUNappend(timestride, &baskets[i].timestride, FALSE);
diff --git a/sql/backends/monet5/datacell/datacell.c b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
+++ b/sql/backends/monet5/datacell/datacell.c
@@ -323,6 +323,8 @@ DCpauseScheduler(Client cntxt, MalBlkPtr
(void) mb;
(void) stk;
(void) pci;
+ RCpause(&ret);
+ EMpause(&ret);
return PNpauseScheduler(&ret);
}
diff --git a/sql/backends/monet5/datacell/emitter.c b/sql/backends/monet5/datacell/emitter.c
--- a/sql/backends/monet5/datacell/emitter.c
+++ b/sql/backends/monet5/datacell/emitter.c
@@ -170,7 +170,7 @@ EMemitterStartInternal(int *ret, str *tb
em->table.format[j - 1].sep = GDKstrdup("\n");
em->table.format[j - 1].seplen = (int) strlen(em->table.format[j -
1].sep);
em->table.nr_attrs = j;
- em->status = BSKTINIT;
+ em->status = BSKTPAUSE;
(void) ret;
#ifdef _DEBUG_EMITTER_
@@ -193,8 +193,6 @@ str EMemitterPause(int *ret, str *nme)
em = EMfind(*nme);
if (em == NULL)
throw(MAL, "emitter.pause", "Emitter not defined");
- if (em->status != BSKTRUNNING)
- throw(MAL, "emitter.pause", "Emitter not running");
em->status = BSKTPAUSE;
@@ -226,7 +224,8 @@ EMpause(int *ret)
Emitter em;
str msg= MAL_SUCCEED;
for (em = emAnchor; em && msg == MAL_SUCCEED; em = em->nxt)
- if (em->status != BSKTINIT) msg = EMemitterPause(ret,
&em->name);
+ if (em->status == BSKTRUNNING)
+ msg = EMemitterPause(ret, &em->name);
return msg;
}
@@ -236,7 +235,7 @@ EMresume(int *ret)
Emitter em;
str msg= MAL_SUCCEED;
for (em = emAnchor; em && msg == MAL_SUCCEED; em = em->nxt)
- if (em->status == BSKTINIT)
+ if (em->status == BSKTPAUSE)
msg= EMemitterResume(ret, &em->name);
return msg;
}
@@ -325,13 +324,16 @@ bodyRestart:
* Consume each event and store the result.
* If the thread is suspended, we sleep for at least one second.
*/
- em->status = BSKTRUNNING;
for (ret = 1; ret >= 0;) {
- while (em->status == BSKTPAUSE) {
+ if ( em->status == BSKTPAUSE){
#ifdef _DEBUG_EMITTER_
- mnstr_printf(EMout, "#Pause emitter\n");
+ mnstr_printf(EMout, "#Pause emitter %s\n",em->name);
#endif
- MT_sleep_ms(em->delay);
+ while (em->status == BSKTPAUSE)
+ MT_sleep_ms(em->delay);
+#ifdef _DEBUG_EMITTER_
+ mnstr_printf(EMout, "#Pause emitter %s ended\n",em->name);
+#endif
}
if (em->status == BSKTSTOP) {
/* request to finalize the emitter*/
@@ -363,8 +365,6 @@ bodyRestart:
if ((cnt = BATcount(em->table.format[0].c[0]))) {
MTIMEcurrent_timestamp(&baskets[em->bskt].seen);
em->cycles++;
- if (em->status != BSKTRUNNING)
- break;
cnt = BATcount(em->table.format[1].c[0]);
#ifdef _DEBUG_EMITTER_
diff --git a/sql/backends/monet5/datacell/petrinet.c b/sql/backends/monet5/datacell/petrinet.c
--- a/sql/backends/monet5/datacell/petrinet.c
+++ b/sql/backends/monet5/datacell/petrinet.c
@@ -118,15 +118,7 @@ int pnettop = 0;
int *enabled; /*array that contains the id's of all queries that are
enable to fire*/
-#define PNINIT 3
-#define PNRUNNING 2
-#define PNPAUSE 1
-#define PNSTOPPED 0
-
-#define PNWAITING 4
-static char *statusnames[7] = { "stopped", "paused", "running", "initialize",
"waiting"};
-
-static int status = PNINIT;
+static int status = BSKTINIT;
static int cycleDelay = 10; /* be careful, it affects response/throughput
timings */
str PNstartThread(int *ret);
@@ -192,7 +184,7 @@ str PNregister(Client cntxt, MalBlkPtr m
else
pnet[pnettop].def = GDKstrdup("");
- pnet[pnettop].status = PNWAITING;
+ pnet[pnettop].status = BSKTPAUSE;
pnet[pnettop].cycles = 0;
pnet[pnettop].seen = *timestamp_nil;
/* all the rest is zero */
@@ -201,7 +193,7 @@ str PNregister(Client cntxt, MalBlkPtr m
pnettop++;
msg = PNanalysis(cntxt, s->def);
/* start the scheduler if analysis does not show errors */
- if ( msg == MAL_SUCCEED && status == PNINIT)
+ if ( msg == MAL_SUCCEED && status == BSKTINIT)
return PNstartThread(ret);
return msg;
}
@@ -217,14 +209,14 @@ PNpauseQuery(Client cntxt, MalBlkPtr mb,
for ( i = 0; i < pnettop; i++)
if ( strcmp(qry, pnet[i].name) == 0){
/* stop the query first */
- pnet[i].status = PNPAUSE;
+ pnet[i].status = BSKTPAUSE;
return MAL_SUCCEED;
}
snprintf(buf,BUFSIZ,"datacell.%s", qry);
for ( i = 0; i < pnettop; i++)
if ( strcmp(buf, pnet[i].name) == 0){
/* stop the query first */
- pnet[i].status = PNPAUSE;
+ pnet[i].status = BSKTPAUSE;
return MAL_SUCCEED;
}
if (pnettop)
@@ -243,14 +235,14 @@ PNresumeQuery(Client cntxt, MalBlkPtr mb
for ( i = 0; i < pnettop; i++)
if ( strcmp(qry, pnet[i].name) == 0){
/* stop the query first */
- pnet[i].status = PNWAITING;
+ pnet[i].status = BSKTRUNNING;
return MAL_SUCCEED;
}
snprintf(buf,BUFSIZ,"datacell.%s", qry);
for ( i = 0; i < pnettop; i++)
if ( strcmp(buf, pnet[i].name) == 0){
/* stop the query first */
- pnet[i].status = PNWAITING;
+ pnet[i].status = BSKTRUNNING;
return MAL_SUCCEED;
}
throw(SQL,"datacell.pause","Basket or query not found");
@@ -271,7 +263,7 @@ PNremove(Client cntxt, MalBlkPtr mb, Mal
BSKTtolower(modnme);
BSKTtolower(fcnnme);
- if (status != PNPAUSE)
+ if (status != BSKTPAUSE)
throw(MAL, "datacell.remove", "Scheduler should be paused first");
(void) mb;
/* check for a continous query */
@@ -280,7 +272,7 @@ PNremove(Client cntxt, MalBlkPtr mb, Mal
s = findSymbolInModule(scope, putName(fcnnme, (int)
strlen(fcnnme)));
if (s == NULL)
throw(SQL, "datacell.remove", "Continuous query found");
- PNPAUSEScheduler(&ret);
+ BSKTPAUSEScheduler(&ret);
for (i = j = 0; i < pnettop; i++)
if (strcmp(nme, pnet[i].name) == 0) {} else
pnet[j++] = pnet[i];
@@ -295,13 +287,13 @@ str PNstopScheduler(int *ret)
int i = 0, j = pnettop;
pnettop = 0; /* don't look at it anymore */
MT_lock_set(&petriLock, "pncontroller");
- status = PNSTOPPED;
+ status = BSKTSTOP;
MT_lock_unset(&petriLock, "pncontroller");
i = 0;
do {
MT_sleep_ms(cycleDelay + 1); /* delay to make it more
tractable */
i++;
- } while (i < 100 && status != PNINIT);
+ } while (i < 100 && status != BSKTINIT);
if (i == 100)
throw(MAL, "datacell.stop", "reset scheduler time out");
for (j--; j >= 0; j--) {
@@ -313,23 +305,22 @@ str PNstopScheduler(int *ret)
str PNresumeScheduler(int *ret)
{
- if (status == PNRUNNING)
- return MAL_SUCCEED;
- if (status == PNPAUSE)
- status = PNRUNNING;
- if (status == PNINIT)
- return PNstartThread(ret);
+ int i;
+
+ for( i =0; i< pnettop; i++)
+ pnet[i].status = BSKTRUNNING;
+ status = BSKTRUNNING;
+ (void) ret;
return MAL_SUCCEED;
}
str PNpauseScheduler(int *ret)
{
- if (status == PNPAUSE)
- return MAL_SUCCEED;
- status = PNPAUSE;
- do
- MT_sleep_ms(cycleDelay); /* delay to make it more tractable */
- while (status == PNRUNNING);
+ int i;
+
+ for( i =0; i< pnettop; i++)
+ pnet[i].status = BSKTPAUSE;
+ status = BSKTPAUSE;
(void) ret;
return MAL_SUCCEED;
}
@@ -337,10 +328,10 @@ str PNpauseScheduler(int *ret)
str PNdump(int *ret)
{
int i, k;
- mnstr_printf(PNout, "#scheduler status %s\n", statusnames[status]);
+ mnstr_printf(PNout, "#scheduler status %s\n", statusname[status]);
for (i = 0; i < pnettop; i++) {
mnstr_printf(PNout, "#[%d]\t%s %s delay %d cycles %d events %d
time " LLFMT " ms\n",
- i, pnet[i].name, statusnames[pnet[i].status],
pnet[i].delay, pnet[i].cycles, pnet[i].events, pnet[i].time / 1000);
+ i, pnet[i].name, statusname[pnet[i].status],
pnet[i].delay, pnet[i].cycles, pnet[i].events, pnet[i].time / 1000);
if (pnet[i].error)
mnstr_printf(PNout, "#%s\n", pnet[i].error);
for (k = 0; k < pnet[i].srctop; k++)
@@ -519,23 +510,22 @@ PNcontroller(void *dummy)
#ifdef _DEBUG_PETRINET_
printFunction(cntxt->fdout, mb, 0, LIST_MAL_ALL);
#endif
- status = PNRUNNING;
do {
if (cycleDelay)
MT_sleep_ms(cycleDelay); /* delay to make it more
tractable */
- while (status == PNPAUSE)
+ while (status == BSKTPAUSE)
;
MT_lock_set(&petriLock, "pncontroller");
- if (status != PNSTOPPED)
+ if (status != BSKTSTOP)
/* collect latest statistics, note that we don't need a
lock here,
because the count need not be accurate to the usec.
It will simply
come back. We also only have to check the sources
that are marked
empty. */
- status = PNRUNNING;
+ status = BSKTRUNNING;
MT_lock_unset(&petriLock, "pncontroller");
now = GDKusec();
- for (k = i = 0; status == PNRUNNING && i < pnettop; i++)
- if ( pnet[i].status != PNPAUSE ){
+ for (k = i = 0; status == BSKTRUNNING && i < pnettop; i++)
+ if ( pnet[i].status != BSKTPAUSE ){
pnet[i].available = 0;
pnet[i].enabled = 0;
for (j = 0; j < pnet[i].srctop; j++) {
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list