Changeset: fd1982bb1698 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=fd1982bb1698
Modified Files:
        sql/backends/monet5/datacell/Tests/scenario00.sql
        sql/backends/monet5/datacell/basket.c
        sql/backends/monet5/datacell/datacell.c
        sql/backends/monet5/datacell/emitter.c
        sql/backends/monet5/datacell/petrinet.c
        sql/backends/monet5/datacell/receptor.c
Branch: default
Log Message:

Cleanup state control
receptors, emitters, and queries can be selectively paused/restarted


diffs (truncated from 420 to 300 lines):

diff --git a/sql/backends/monet5/datacell/Tests/scenario00.sql 
b/sql/backends/monet5/datacell/Tests/scenario00.sql
--- a/sql/backends/monet5/datacell/Tests/scenario00.sql
+++ b/sql/backends/monet5/datacell/Tests/scenario00.sql
@@ -18,9 +18,10 @@ call datacell.emitter('datacell.bsktout'
 
 call datacell.query('datacell.pass', 'insert into datacell.bsktout select * 
from datacell.bsktin;');
 
+select * from datacell.receptors(); select * from datacell.emitters(); select 
* from datacell.queries(); select * from datacell.baskets();
+
 call datacell.resume();
 call datacell.dump();
-select * from datacell.receptors(); select * from datacell.emitters(); select 
* from datacell.queries(); select * from datacell.baskets();
 
 -- externally, activate the sensor 
 -- sensor --host=localhost --port=50500 --events=100 --columns=3 --delay=1 
--trace
diff --git a/sql/backends/monet5/datacell/basket.c 
b/sql/backends/monet5/datacell/basket.c
--- a/sql/backends/monet5/datacell/basket.c
+++ b/sql/backends/monet5/datacell/basket.c
@@ -406,7 +406,7 @@ BSKTgrab(Client cntxt, MalBlkPtr mb, Mal
                        ret = (int *) getArgReference(stk, pci, i);
                        b = baskets[bskt].primary[i];
                        bn = BATcopy(b, b->htype, b->ttype, TRUE);
-                       cnt = (int) BATcount(bn);
+                       cnt = (int) BATcount(b);
                        BATclear(b, FALSE);
                        *ret = bn->batCacheid;
                        BBPkeepref(*ret);
@@ -632,6 +632,7 @@ BSKTtable(int *nameId, int *thresholdId,
                        BUNappend(winstride, &baskets[i].winstride, FALSE);
                        BUNappend(beat, &baskets[i].beat, FALSE);
                        BUNappend(seen, &baskets[i].seen, FALSE);
+                       baskets[i].events = (int) BATcount( 
baskets[i].primary[0]);
                        BUNappend(events, &baskets[i].events, FALSE);
                        BUNappend(timeslice, &baskets[i].timeslice, FALSE);
                        BUNappend(timestride, &baskets[i].timestride, FALSE);
diff --git a/sql/backends/monet5/datacell/datacell.c 
b/sql/backends/monet5/datacell/datacell.c
--- a/sql/backends/monet5/datacell/datacell.c
+++ b/sql/backends/monet5/datacell/datacell.c
@@ -323,6 +323,8 @@ DCpauseScheduler(Client cntxt, MalBlkPtr
        (void) mb;
        (void) stk;
        (void) pci;
+       RCpause(&ret);
+       EMpause(&ret);
        return PNpauseScheduler(&ret);
 }
 
diff --git a/sql/backends/monet5/datacell/emitter.c 
b/sql/backends/monet5/datacell/emitter.c
--- a/sql/backends/monet5/datacell/emitter.c
+++ b/sql/backends/monet5/datacell/emitter.c
@@ -170,7 +170,7 @@ EMemitterStartInternal(int *ret, str *tb
        em->table.format[j - 1].sep = GDKstrdup("\n");
        em->table.format[j - 1].seplen = (int) strlen(em->table.format[j - 
1].sep);
        em->table.nr_attrs = j;
-       em->status = BSKTINIT;
+       em->status = BSKTPAUSE;
 
        (void) ret;
 #ifdef _DEBUG_EMITTER_
@@ -193,8 +193,6 @@ str EMemitterPause(int *ret, str *nme)
        em = EMfind(*nme);
        if (em == NULL)
                throw(MAL, "emitter.pause", "Emitter not defined");
-       if (em->status != BSKTRUNNING)
-               throw(MAL, "emitter.pause", "Emitter not running");
 
        em->status = BSKTPAUSE;
 
@@ -226,7 +224,8 @@ EMpause(int *ret)
        Emitter em;
        str msg= MAL_SUCCEED;
        for (em = emAnchor; em && msg == MAL_SUCCEED; em = em->nxt)
-               if (em->status != BSKTINIT) msg = EMemitterPause(ret, 
&em->name);
+               if (em->status == BSKTRUNNING) 
+                       msg = EMemitterPause(ret, &em->name);
        return msg;
 }
 
@@ -236,7 +235,7 @@ EMresume(int *ret)
        Emitter em;
        str msg= MAL_SUCCEED;
        for (em = emAnchor; em && msg == MAL_SUCCEED; em = em->nxt)
-               if (em->status == BSKTINIT)
+               if (em->status == BSKTPAUSE)
                        msg= EMemitterResume(ret, &em->name);
        return msg;
 }
@@ -325,13 +324,16 @@ bodyRestart:
         * Consume each event and store the result.
         * If the thread is suspended, we sleep for at least one second.
         */
-       em->status = BSKTRUNNING;
        for (ret = 1; ret >= 0;) {
-               while (em->status == BSKTPAUSE) {
+               if ( em->status == BSKTPAUSE){
 #ifdef _DEBUG_EMITTER_
-                       mnstr_printf(EMout, "#Pause emitter\n");
+                       mnstr_printf(EMout, "#Pause emitter %s\n",em->name);
 #endif
-                       MT_sleep_ms(em->delay);
+                       while (em->status == BSKTPAUSE)
+                               MT_sleep_ms(em->delay);
+#ifdef _DEBUG_EMITTER_
+                       mnstr_printf(EMout, "#Pause emitter %s 
ended\n",em->name);
+#endif
                }
                if (em->status == BSKTSTOP) {
                        /* request to finalize the emitter*/
@@ -363,8 +365,6 @@ bodyRestart:
                if ((cnt = BATcount(em->table.format[0].c[0]))) {
                        MTIMEcurrent_timestamp(&baskets[em->bskt].seen);
                        em->cycles++;
-                       if (em->status != BSKTRUNNING)
-                               break;
 
                        cnt = BATcount(em->table.format[1].c[0]);
 #ifdef _DEBUG_EMITTER_
diff --git a/sql/backends/monet5/datacell/petrinet.c 
b/sql/backends/monet5/datacell/petrinet.c
--- a/sql/backends/monet5/datacell/petrinet.c
+++ b/sql/backends/monet5/datacell/petrinet.c
@@ -118,15 +118,7 @@ int pnettop = 0;
 
 int *enabled;     /*array that contains the id's of all queries that are 
enable to fire*/
 
-#define PNINIT 3
-#define PNRUNNING 2
-#define PNPAUSE 1
-#define PNSTOPPED 0
-
-#define PNWAITING 4
-static char *statusnames[7] = { "stopped", "paused", "running", "initialize", 
"waiting"};
-
-static int status = PNINIT;
+static int status = BSKTINIT;
 static int cycleDelay = 10; /* be careful, it affects response/throughput 
timings */
 
 str PNstartThread(int *ret);
@@ -192,7 +184,7 @@ str PNregister(Client cntxt, MalBlkPtr m
        else
                pnet[pnettop].def = GDKstrdup("");
 
-       pnet[pnettop].status = PNWAITING;
+       pnet[pnettop].status = BSKTPAUSE;
        pnet[pnettop].cycles = 0;
        pnet[pnettop].seen = *timestamp_nil;
        /* all the rest is zero */
@@ -201,7 +193,7 @@ str PNregister(Client cntxt, MalBlkPtr m
        pnettop++;
        msg = PNanalysis(cntxt, s->def);
        /* start the scheduler if analysis does not show errors */
-       if ( msg == MAL_SUCCEED && status == PNINIT)
+       if ( msg == MAL_SUCCEED && status == BSKTINIT)
                return PNstartThread(ret);
        return msg;
 }
@@ -217,14 +209,14 @@ PNpauseQuery(Client cntxt, MalBlkPtr mb,
        for ( i = 0; i < pnettop; i++)
        if ( strcmp(qry, pnet[i].name) == 0){
                /* stop the query first */
-               pnet[i].status = PNPAUSE;
+               pnet[i].status = BSKTPAUSE;
                return MAL_SUCCEED;
        }
        snprintf(buf,BUFSIZ,"datacell.%s", qry);
        for ( i = 0; i < pnettop; i++)
        if ( strcmp(buf, pnet[i].name) == 0){
                /* stop the query first */
-               pnet[i].status = PNPAUSE;
+               pnet[i].status = BSKTPAUSE;
                return MAL_SUCCEED;
        }
        if (pnettop)
@@ -243,14 +235,14 @@ PNresumeQuery(Client cntxt, MalBlkPtr mb
        for ( i = 0; i < pnettop; i++)
        if ( strcmp(qry, pnet[i].name) == 0){
                /* stop the query first */
-               pnet[i].status = PNWAITING;
+               pnet[i].status = BSKTRUNNING;
                return MAL_SUCCEED;
        }
        snprintf(buf,BUFSIZ,"datacell.%s", qry);
        for ( i = 0; i < pnettop; i++)
        if ( strcmp(buf, pnet[i].name) == 0){
                /* stop the query first */
-               pnet[i].status = PNWAITING;
+               pnet[i].status = BSKTRUNNING;
                return MAL_SUCCEED;
        }
        throw(SQL,"datacell.pause","Basket or query not found");
@@ -271,7 +263,7 @@ PNremove(Client cntxt, MalBlkPtr mb, Mal
        BSKTtolower(modnme);
        BSKTtolower(fcnnme);
 
-       if (status != PNPAUSE)
+       if (status != BSKTPAUSE)
                throw(MAL, "datacell.remove", "Scheduler should be paused 
first");
        (void) mb;
        /* check for a continous query */
@@ -280,7 +272,7 @@ PNremove(Client cntxt, MalBlkPtr mb, Mal
                s = findSymbolInModule(scope, putName(fcnnme, (int) 
strlen(fcnnme)));
        if (s == NULL)
                throw(SQL, "datacell.remove", "Continuous query found");
-       PNPAUSEScheduler(&ret);
+       BSKTPAUSEScheduler(&ret);
        for (i = j = 0; i < pnettop; i++)
                if (strcmp(nme, pnet[i].name) == 0) {} else
                        pnet[j++] = pnet[i];
@@ -295,13 +287,13 @@ str PNstopScheduler(int *ret)
        int i = 0, j = pnettop;
        pnettop = 0;    /* don't look at it anymore */
        MT_lock_set(&petriLock, "pncontroller");
-       status = PNSTOPPED;
+       status = BSKTSTOP;
        MT_lock_unset(&petriLock, "pncontroller");
        i = 0;
        do {
                MT_sleep_ms(cycleDelay + 1);  /* delay to make it more 
tractable */
                i++;
-       } while (i < 100 && status != PNINIT);
+       } while (i < 100 && status != BSKTINIT);
        if (i == 100)
                throw(MAL, "datacell.stop", "reset scheduler time out");
        for (j--; j >= 0; j--) {
@@ -313,23 +305,22 @@ str PNstopScheduler(int *ret)
 
 str PNresumeScheduler(int *ret)
 {
-       if (status == PNRUNNING)
-               return MAL_SUCCEED;
-       if (status == PNPAUSE)
-               status = PNRUNNING;
-       if (status == PNINIT)
-               return PNstartThread(ret);
+       int i;
+
+       for( i =0; i< pnettop; i++)
+               pnet[i].status = BSKTRUNNING;
+       status = BSKTRUNNING;
+       (void) ret;
        return MAL_SUCCEED;
 }
 
 str PNpauseScheduler(int *ret)
 {
-       if (status == PNPAUSE)
-               return MAL_SUCCEED;
-       status = PNPAUSE;
-       do
-               MT_sleep_ms(cycleDelay);  /* delay to make it more tractable */
-       while (status == PNRUNNING);
+       int i;
+
+       for( i =0; i< pnettop; i++)
+               pnet[i].status = BSKTPAUSE;
+       status = BSKTPAUSE;
        (void) ret;
        return MAL_SUCCEED;
 }
@@ -337,10 +328,10 @@ str PNpauseScheduler(int *ret)
 str PNdump(int *ret)
 {
        int i, k;
-       mnstr_printf(PNout, "#scheduler status %s\n", statusnames[status]);
+       mnstr_printf(PNout, "#scheduler status %s\n", statusname[status]);
        for (i = 0; i < pnettop; i++) {
                mnstr_printf(PNout, "#[%d]\t%s %s delay %d cycles %d events %d 
time " LLFMT " ms\n",
-                               i, pnet[i].name, statusnames[pnet[i].status], 
pnet[i].delay, pnet[i].cycles, pnet[i].events, pnet[i].time / 1000);
+                               i, pnet[i].name, statusname[pnet[i].status], 
pnet[i].delay, pnet[i].cycles, pnet[i].events, pnet[i].time / 1000);
                if (pnet[i].error)
                        mnstr_printf(PNout, "#%s\n", pnet[i].error);
                for (k = 0; k < pnet[i].srctop; k++)
@@ -519,23 +510,22 @@ PNcontroller(void *dummy)
 #ifdef _DEBUG_PETRINET_
        printFunction(cntxt->fdout, mb, 0, LIST_MAL_ALL);
 #endif
-       status = PNRUNNING;
        do {
                if (cycleDelay)
                        MT_sleep_ms(cycleDelay);  /* delay to make it more 
tractable */
-               while (status == PNPAUSE)
+               while (status == BSKTPAUSE)
                        ;
                MT_lock_set(&petriLock, "pncontroller");
-               if (status != PNSTOPPED)
+               if (status != BSKTSTOP)
                        /* collect latest statistics, note that we don't need a 
lock here,
                           because the count need not be accurate to the usec. 
It will simply
                           come back. We also only have to check the sources 
that are marked
                           empty. */
-                       status = PNRUNNING;
+                       status = BSKTRUNNING;
                MT_lock_unset(&petriLock, "pncontroller");
                now = GDKusec();
-               for (k = i = 0; status == PNRUNNING && i < pnettop; i++) 
-               if ( pnet[i].status != PNPAUSE ){
+               for (k = i = 0; status == BSKTRUNNING && i < pnettop; i++) 
+               if ( pnet[i].status != BSKTPAUSE ){
                        pnet[i].available = 0;
                        pnet[i].enabled = 0;
                        for (j = 0; j < pnet[i].srctop; j++) {
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to