Changeset: ba3272475f41 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ba3272475f41
Modified Files:
        sql/backends/monet5/50_cquery.mal
        sql/backends/monet5/Tests/cquery05.sql
        sql/backends/monet5/sql_cquery.c
Branch: timetrails
Log Message:

First scheduler round working


diffs (truncated from 489 to 300 lines):

diff --git a/sql/backends/monet5/50_cquery.mal b/sql/backends/monet5/50_cquery.mal
--- a/sql/backends/monet5/50_cquery.mal
+++ b/sql/backends/monet5/50_cquery.mal
@@ -37,10 +37,10 @@ address CQpause
 comment "Deactivate all continuous queries";
 
 pattern deregister(mod:str, fcn:str)
-address CQrelease
+address CQderegister
 comment "Remove a continuous query";
 pattern deregister()
-address CQrelease
+address CQderegister
 comment "Remove all continuous queries";
 
 pattern wait(cnt:int)
diff --git a/sql/backends/monet5/Tests/cquery05.sql b/sql/backends/monet5/Tests/cquery05.sql
--- a/sql/backends/monet5/Tests/cquery05.sql
+++ b/sql/backends/monet5/Tests/cquery05.sql
@@ -1,10 +1,10 @@
 -- A simple continuous query over non-stream relations
 -- controlled by a heartbeat.
-create table tmp.result(i integer);
+create table cqresult05(i integer);
 
 create procedure cq_basic()
 begin
-       insert into tmp.result (select count(*) from tmp.result);
+       insert into cqresult05 (select count(*) from cqresult05);
 end;
 
 -- register the CQ
@@ -13,13 +13,13 @@ call cquery.register('sys','cq_basic');
 -- The scheduler executes this CQ every 50 milliseconds
 call cquery.heartbeat('sys','cq_basic',50);
 
--- reactivate all continuous queries
-call cquery.resume();
+-- reactivate this continuous query
+call cquery.resume('sys','cq_basic');
 call cquery.wait(2000);
-call cquery.pause();
+call cquery.pause('sys','cq_basic');
 
 select 'RESULT';
-select * from tmp.result;
+select * from cqresult05;
 
 select * from cquery.summary();
 select * from cquery.log();
@@ -28,4 +28,4 @@ select * from cquery.log();
 call cquery.deregister('sys','cq_basic');
 
 drop procedure cq_basic;
-drop table tmp.result;
+drop table cqresult05;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -51,7 +51,6 @@
 
 #define MAXCQ 200           /* it is the minimum, if we need more space 
GDKrealloc */
 #define MAXSTREAMS 128         /* limit the number of stream columns to be 
looked after per query*/
-#define PNDELAY 20                     /* forced delay between PN scheduler 
cycles */
 
 static str statusname[7] = { "init", "register", "readytorun", "running", 
"waiting", "paused","stopping"};
 
@@ -71,6 +70,7 @@ static BAT *CQ_id_error = 0;
 #define STREAM_OUT     4
 
 typedef struct {
+       str mod,fcn;    /* The SQL command to be used */
        MalBlkPtr mb;   /* The wrapped query block call in a transaction */
        MalStkPtr stk;  /* Needed for execution */
 
@@ -89,7 +89,8 @@ typedef struct {
        lng beats;              /* heart beat stride for procedures activations 
*/
 
        MT_Id   tid;    /* Thread responsible */
-       timestamp seen; /* last executed */
+       lng             run;    /* last executed relative to start of server */
+       timestamp seen;
        str error;
        lng time;
 } CQnode;
@@ -98,7 +99,7 @@ CQnode pnet[MAXCQ];
 int pnettop = 0;
 
 static int pnstatus = CQINIT;
-static int cycleDelay = 50; /* be careful, it affects response/throughput 
timings */
+static int cycleDelay = 200; /* be careful, it affects response/throughput 
timings */
 static MT_Lock ttrLock MT_LOCK_INITIALIZER("cqueryLock");
 
 static void
@@ -113,7 +114,11 @@ CQfree(int idx)
                GDKfree(pnet[idx].schema[j]);
                GDKfree(pnet[idx].tables[j]);
                GDKfree(pnet[idx].column[j]);
+               if( pnet[idx].bats[j])
+                       BBPunfix(pnet[idx].bats[j]->batCacheid);
        }
+       GDKfree(pnet[idx].mod);
+       GDKfree(pnet[idx].fcn);
        memset((void*) (pnet+idx), 0, sizeof(CQnode));
 }
 
@@ -216,11 +221,9 @@ static int
 CQlocate(str modname, str fcnname)
 {
        int i;
-       InstrPtr sig;
 
        for (i = 0; i < pnettop; i++){
-               sig = getInstrPtr(pnet[i].mb,0);
-               if (strcmp(getModuleId(sig), modname) == 0 && 
strcmp(getFunctionId(sig), fcnname) == 0)
+               if (strcmp(pnet[i].mod, modname) == 0 && strcmp(pnet[i].fcn, 
fcnname) == 0)
                        return i;
        }
        return i;
@@ -363,6 +366,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
 
 #ifdef DEBUG_CQUERY
        fprintf(stderr, "#cquery register %s.%s\n", 
getModuleId(sig),getFunctionId(sig));
+       fprintFunction(stderr,mb,0,LIST_MAL_ALL);
 #endif
        memset((void*) (pnet+pnettop), 0, sizeof(CQnode));
 
@@ -382,11 +386,14 @@ CQregister(Client cntxt, MalBlkPtr mb, M
 
        msg = CQanalysis(cntxt, mb, pnettop);
        if(msg == MAL_SUCCEED) {
+               pnet[pnettop].mod = GDKstrdup(modnme);
+               pnet[pnettop].fcn = GDKstrdup(fcnnme);
                pnet[pnettop].mb = nmb;
                pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
 
                pnet[pnettop].cycles = int_nil; 
                pnet[pnettop].beats = lng_nil;
+               pnet[pnettop].run  = lng_nil;
                pnet[pnettop].seen = *timestamp_nil;
                pnet[pnettop].status = CQPAUSE;
                pnettop++;
@@ -421,15 +428,12 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
 #endif
        MT_lock_set(&ttrLock);
        for( ; idx < last; idx++)
-       {
                pnet[idx].status = CQWAIT;
-       }
        MT_lock_unset(&ttrLock);
 
        /* start the scheduler if needed */
-       if(CQinit) {
+       if(CQinit == 0) {
                msg = CQstartScheduler();
-               CQinit =1;
        }
        return msg;
 }
@@ -455,9 +459,7 @@ CQpause(Client cntxt, MalBlkPtr mb, MalS
 #endif
        MT_lock_set(&ttrLock);
        for( ; idx < last; idx++)
-       {
                pnet[idx].status = CQPAUSE;
-       }
        MT_lock_unset(&ttrLock);
        return MAL_SUCCEED;
 }
@@ -485,9 +487,7 @@ CQcycles(Client cntxt, MalBlkPtr mb, Mal
 #endif
        MT_lock_set(&ttrLock);
        for( ; idx < last; idx++)
-       {
                pnet[idx].cycles = cycles;
-       }
        MT_lock_unset(&ttrLock);
        return MAL_SUCCEED;
 }
@@ -497,6 +497,7 @@ CQheartbeat(Client cntxt, MalBlkPtr mb, 
 {
        str sch, fcn;
        int beats, idx=0, last= pnettop;
+       str msg = MAL_SUCCEED;
        (void) cntxt;
        (void) mb;
 
@@ -514,16 +515,18 @@ CQheartbeat(Client cntxt, MalBlkPtr mb, 
        } else{
                beats = *getArgReference_int(stk,pci,1);
 #ifdef DEBUG_CQUERY
-               fprintf(stderr, "#set the heartbeat %d\n",beats);
+               fprintf(stderr, "#set the heartbeat %d ms\n",beats);
 #endif
        }
        MT_lock_set(&ttrLock);
        for( ; idx < last; idx++)
-       {
-               pnet[idx].beats = beats;
+               pnet[idx].beats = beats * 1000; /* minimal 1 ms */
+       MT_lock_unset(&ttrLock);
+       /* start the scheduler if needed */
+       if(CQinit == 0) {
+               msg = CQstartScheduler();
        }
-       MT_lock_unset(&ttrLock);
-       return MAL_SUCCEED;
+       return msg;
 }
 
 str
@@ -622,24 +625,27 @@ str
 CQdump(void *ret)
 {
        int i, k;
-       InstrPtr sig;
 
        fprintf(stderr, "#scheduler status %s\n", statusname[pnstatus]);
        for (i = 0; i < pnettop; i++) {
-               sig = getInstrPtr(pnet[i].mb,0);
-               fprintf(stderr, "#[%d]\t%s.%s %s\n",
-                               i, getModuleId(sig), getFunctionId(sig), 
statusname[pnet[i].status]);
-               if (pnet[i].error)
-                       fprintf(stderr, "#%s\n", pnet[i].error);
-               fprintf(stderr, "#streams ");
+               fprintf(stderr, "#[%d]\t%s.%s %s ",
+                               i, pnet[i].mod, pnet[i].fcn, 
statusname[pnet[i].status]);
+               if ( pnet[i].beats != lng_nil)
+                       fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
+                       
+               if( pnet[i].inout[0])
+                       fprintf(stderr, " streams ");
                for (k = 0; k < MAXSTREAMS && pnet[i].schema[k]; k++)
                if( pnet[i].inout[k] == STREAM_IN)
                        fprintf(stderr, "%s.%s ", pnet[i].schema[k], 
pnet[i].tables[k]);
-               fprintf(stderr, " --> ");
+               if( pnet[i].inout[0])
+                       fprintf(stderr, " --> ");
                for (k = 0; k < MAXSTREAMS && pnet[i].schema[k]; k++)
                if( pnet[i].inout[k] == STREAM_OUT)
                        fprintf(stderr, "%s.%s ", pnet[i].schema[k], 
pnet[i].tables[k]);
-               fprintf(stderr, "#\n ");
+               if (pnet[i].error)
+                       fprintf(stderr, " errors:%s", pnet[i].error);
+               fprintf(stderr, "\n");
        }
        (void) ret;
        return MAL_SUCCEED;
@@ -660,34 +666,24 @@ static void
 CQexecute( Client cntxt, int idx)
 {
        CQnode *node= pnet+ idx;
-       InstrPtr sig;
+       str msg;
 
        if( pnstatus != CQRUNNING)
                return;
-       sig = getInstrPtr(node->mb,0);
-#ifdef DEBUG_CQUERY
-       fprintf(stderr, "#cquery.execute %s.%s\n", getModuleId(sig), 
getFunctionId(sig));
-#endif
        // first grab exclusive access to all streams.
 
 #ifdef DEBUG_CQUERY
-       fprintf(stderr, "#cquery.execute %s.%s all locked\n",getModuleId(sig), 
getFunctionId(sig));
+       fprintf(stderr, "#cquery.execute %s.%s locked\n",node->mod, node->fcn);
        fprintFunction(stderr, node->mb, 0, LIST_MAL_NAME | LIST_MAL_VALUE  | 
LIST_MAL_MAPI);
 #endif
 
-       (void)runMALsequence(cntxt, node->mb, 1, 0, node->stk, 0, 0);
-
-#ifdef DEBUG_CQUERY
-       fprintf(stderr, "#cquery.execute %s.%s transition done:\n", 
getModuleId(sig), getFunctionId(sig));
-#endif
+       msg = runMALsequence(cntxt, node->mb, 1, 0, node->stk, 0, 0);
+       if( msg != MAL_SUCCEED)
+               pnet[idx].error = msg;
 
-       // remember the time last accessed
-       (void) MTIMEcurrent_timestamp(&node->seen);
-       
        // release all locks held
-
 #ifdef DEBUG_CQUERY
-       fprintf(stderr, "#cquery.execute %s.%s finished\n", getModuleId(sig), 
getFunctionId(sig));
+       fprintf(stderr, "#cquery.execute %s.%s finished\n", node->mod, 
node->fcn);
 #endif
        MT_lock_set(&ttrLock);
        if( node->status != CQPAUSE)
@@ -701,19 +697,18 @@ CQscheduler(void *dummy)
        int i, j;
        int k = -1;
        int pntasks;
+       int delay = cycleDelay;
        Client cntxt = (Client) dummy;
        str msg = MAL_SUCCEED;
-       lng t, analysis, now;
+       lng t, now;
        BAT *claimed[MAXSTREAMS];
-       timestamp ts, tn;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to