Changeset: ba3272475f41 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ba3272475f41
Modified Files:
sql/backends/monet5/50_cquery.mal
sql/backends/monet5/Tests/cquery05.sql
sql/backends/monet5/sql_cquery.c
Branch: timetrails
Log Message:
First scheduler round working
diffs (truncated from 489 to 300 lines):
diff --git a/sql/backends/monet5/50_cquery.mal
b/sql/backends/monet5/50_cquery.mal
--- a/sql/backends/monet5/50_cquery.mal
+++ b/sql/backends/monet5/50_cquery.mal
@@ -37,10 +37,10 @@ address CQpause
comment "Deactivate all continuous queries";
pattern deregister(mod:str, fcn:str)
-address CQrelease
+address CQderegister
comment "Remove a continuous query";
pattern deregister()
-address CQrelease
+address CQderegister
comment "Remove all continuous queries";
pattern wait(cnt:int)
diff --git a/sql/backends/monet5/Tests/cquery05.sql
b/sql/backends/monet5/Tests/cquery05.sql
--- a/sql/backends/monet5/Tests/cquery05.sql
+++ b/sql/backends/monet5/Tests/cquery05.sql
@@ -1,10 +1,10 @@
-- A simple continuous query over non-stream relations
-- controlled by a heartbeat.
-create table tmp.result(i integer);
+create table cqresult05(i integer);
create procedure cq_basic()
begin
- insert into tmp.result (select count(*) from tmp.result);
+ insert into cqresult05 (select count(*) from cqresult05);
end;
-- register the CQ
@@ -13,13 +13,13 @@ call cquery.register('sys','cq_basic');
-- The scheduler executes this CQ every 50 milliseconds
call cquery.heartbeat('sys','cq_basic',50);
--- reactivate all continuous queries
-call cquery.resume();
+-- reactivate this continuous query
+call cquery.resume('sys','cq_basic');
call cquery.wait(2000);
-call cquery.pause();
+call cquery.pause('sys','cq_basic');
select 'RESULT';
-select * from tmp.result;
+select * from cqresult05;
select * from cquery.summary();
select * from cquery.log();
@@ -28,4 +28,4 @@ select * from cquery.log();
call cquery.deregister('sys','cq_basic');
drop procedure cq_basic;
-drop table tmp.result;
+drop table cqresult05;
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -51,7 +51,6 @@
#define MAXCQ 200 /* it is the minimum, if we need more space
GDKrealloc */
#define MAXSTREAMS 128 /* limit the number of stream columns to be
looked after per query*/
-#define PNDELAY 20 /* forced delay between PN scheduler
cycles */
static str statusname[7] = { "init", "register", "readytorun", "running",
"waiting", "paused","stopping"};
@@ -71,6 +70,7 @@ static BAT *CQ_id_error = 0;
#define STREAM_OUT 4
typedef struct {
+ str mod,fcn; /* The SQL command to be used */
MalBlkPtr mb; /* The wrapped query block call in a transaction */
MalStkPtr stk; /* Needed for execution */
@@ -89,7 +89,8 @@ typedef struct {
lng beats; /* heart beat stride for procedures activations
*/
MT_Id tid; /* Thread responsible */
- timestamp seen; /* last executed */
+ lng run; /* last executed relative to start of server */
+ timestamp seen;
str error;
lng time;
} CQnode;
@@ -98,7 +99,7 @@ CQnode pnet[MAXCQ];
int pnettop = 0;
static int pnstatus = CQINIT;
-static int cycleDelay = 50; /* be careful, it affects response/throughput
timings */
+static int cycleDelay = 200; /* be careful, it affects response/throughput
timings */
static MT_Lock ttrLock MT_LOCK_INITIALIZER("cqueryLock");
static void
@@ -113,7 +114,11 @@ CQfree(int idx)
GDKfree(pnet[idx].schema[j]);
GDKfree(pnet[idx].tables[j]);
GDKfree(pnet[idx].column[j]);
+ if( pnet[idx].bats[j])
+ BBPunfix(pnet[idx].bats[j]->batCacheid);
}
+ GDKfree(pnet[idx].mod);
+ GDKfree(pnet[idx].fcn);
memset((void*) (pnet+idx), 0, sizeof(CQnode));
}
@@ -216,11 +221,9 @@ static int
CQlocate(str modname, str fcnname)
{
int i;
- InstrPtr sig;
for (i = 0; i < pnettop; i++){
- sig = getInstrPtr(pnet[i].mb,0);
- if (strcmp(getModuleId(sig), modname) == 0 &&
strcmp(getFunctionId(sig), fcnname) == 0)
+ if (strcmp(pnet[i].mod, modname) == 0 && strcmp(pnet[i].fcn,
fcnname) == 0)
return i;
}
return i;
@@ -363,6 +366,7 @@ CQregister(Client cntxt, MalBlkPtr mb, M
#ifdef DEBUG_CQUERY
fprintf(stderr, "#cquery register %s.%s\n",
getModuleId(sig),getFunctionId(sig));
+ fprintFunction(stderr,mb,0,LIST_MAL_ALL);
#endif
memset((void*) (pnet+pnettop), 0, sizeof(CQnode));
@@ -382,11 +386,14 @@ CQregister(Client cntxt, MalBlkPtr mb, M
msg = CQanalysis(cntxt, mb, pnettop);
if(msg == MAL_SUCCEED) {
+ pnet[pnettop].mod = GDKstrdup(modnme);
+ pnet[pnettop].fcn = GDKstrdup(fcnnme);
pnet[pnettop].mb = nmb;
pnet[pnettop].stk = prepareMALstack(nmb, nmb->vsize);
pnet[pnettop].cycles = int_nil;
pnet[pnettop].beats = lng_nil;
+ pnet[pnettop].run = lng_nil;
pnet[pnettop].seen = *timestamp_nil;
pnet[pnettop].status = CQPAUSE;
pnettop++;
@@ -421,15 +428,12 @@ CQresume(Client cntxt, MalBlkPtr mb, Mal
#endif
MT_lock_set(&ttrLock);
for( ; idx < last; idx++)
- {
pnet[idx].status = CQWAIT;
- }
MT_lock_unset(&ttrLock);
/* start the scheduler if needed */
- if(CQinit) {
+ if(CQinit == 0) {
msg = CQstartScheduler();
- CQinit =1;
}
return msg;
}
@@ -455,9 +459,7 @@ CQpause(Client cntxt, MalBlkPtr mb, MalS
#endif
MT_lock_set(&ttrLock);
for( ; idx < last; idx++)
- {
pnet[idx].status = CQPAUSE;
- }
MT_lock_unset(&ttrLock);
return MAL_SUCCEED;
}
@@ -485,9 +487,7 @@ CQcycles(Client cntxt, MalBlkPtr mb, Mal
#endif
MT_lock_set(&ttrLock);
for( ; idx < last; idx++)
- {
pnet[idx].cycles = cycles;
- }
MT_lock_unset(&ttrLock);
return MAL_SUCCEED;
}
@@ -497,6 +497,7 @@ CQheartbeat(Client cntxt, MalBlkPtr mb,
{
str sch, fcn;
int beats, idx=0, last= pnettop;
+ str msg = MAL_SUCCEED;
(void) cntxt;
(void) mb;
@@ -514,16 +515,18 @@ CQheartbeat(Client cntxt, MalBlkPtr mb,
} else{
beats = *getArgReference_int(stk,pci,1);
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#set the heartbeat %d\n",beats);
+ fprintf(stderr, "#set the heartbeat %d ms\n",beats);
#endif
}
MT_lock_set(&ttrLock);
for( ; idx < last; idx++)
- {
- pnet[idx].beats = beats;
+ pnet[idx].beats = beats * 1000; /* minimal 1 ms */
+ MT_lock_unset(&ttrLock);
+ /* start the scheduler if needed */
+ if(CQinit == 0) {
+ msg = CQstartScheduler();
}
- MT_lock_unset(&ttrLock);
- return MAL_SUCCEED;
+ return msg;
}
str
@@ -622,24 +625,27 @@ str
CQdump(void *ret)
{
int i, k;
- InstrPtr sig;
fprintf(stderr, "#scheduler status %s\n", statusname[pnstatus]);
for (i = 0; i < pnettop; i++) {
- sig = getInstrPtr(pnet[i].mb,0);
- fprintf(stderr, "#[%d]\t%s.%s %s\n",
- i, getModuleId(sig), getFunctionId(sig),
statusname[pnet[i].status]);
- if (pnet[i].error)
- fprintf(stderr, "#%s\n", pnet[i].error);
- fprintf(stderr, "#streams ");
+ fprintf(stderr, "#[%d]\t%s.%s %s ",
+ i, pnet[i].mod, pnet[i].fcn,
statusname[pnet[i].status]);
+ if ( pnet[i].beats != lng_nil)
+ fprintf(stderr, "beats="LLFMT" ", pnet[i].beats);
+
+ if( pnet[i].inout[0])
+ fprintf(stderr, " streams ");
for (k = 0; k < MAXSTREAMS && pnet[i].schema[k]; k++)
if( pnet[i].inout[k] == STREAM_IN)
fprintf(stderr, "%s.%s ", pnet[i].schema[k],
pnet[i].tables[k]);
- fprintf(stderr, " --> ");
+ if( pnet[i].inout[0])
+ fprintf(stderr, " --> ");
for (k = 0; k < MAXSTREAMS && pnet[i].schema[k]; k++)
if( pnet[i].inout[k] == STREAM_OUT)
fprintf(stderr, "%s.%s ", pnet[i].schema[k],
pnet[i].tables[k]);
- fprintf(stderr, "#\n ");
+ if (pnet[i].error)
+ fprintf(stderr, " errors:%s", pnet[i].error);
+ fprintf(stderr, "\n");
}
(void) ret;
return MAL_SUCCEED;
@@ -660,34 +666,24 @@ static void
CQexecute( Client cntxt, int idx)
{
CQnode *node= pnet+ idx;
- InstrPtr sig;
+ str msg;
if( pnstatus != CQRUNNING)
return;
- sig = getInstrPtr(node->mb,0);
-#ifdef DEBUG_CQUERY
- fprintf(stderr, "#cquery.execute %s.%s\n", getModuleId(sig),
getFunctionId(sig));
-#endif
// first grab exclusive access to all streams.
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#cquery.execute %s.%s all locked\n",getModuleId(sig),
getFunctionId(sig));
+ fprintf(stderr, "#cquery.execute %s.%s locked\n",node->mod, node->fcn);
fprintFunction(stderr, node->mb, 0, LIST_MAL_NAME | LIST_MAL_VALUE |
LIST_MAL_MAPI);
#endif
- (void)runMALsequence(cntxt, node->mb, 1, 0, node->stk, 0, 0);
-
-#ifdef DEBUG_CQUERY
- fprintf(stderr, "#cquery.execute %s.%s transition done:\n",
getModuleId(sig), getFunctionId(sig));
-#endif
+ msg = runMALsequence(cntxt, node->mb, 1, 0, node->stk, 0, 0);
+ if( msg != MAL_SUCCEED)
+ pnet[idx].error = msg;
- // remember the time last accessed
- (void) MTIMEcurrent_timestamp(&node->seen);
-
// release all locks held
-
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#cquery.execute %s.%s finished\n", getModuleId(sig),
getFunctionId(sig));
+ fprintf(stderr, "#cquery.execute %s.%s finished\n", node->mod,
node->fcn);
#endif
MT_lock_set(&ttrLock);
if( node->status != CQPAUSE)
@@ -701,19 +697,18 @@ CQscheduler(void *dummy)
int i, j;
int k = -1;
int pntasks;
+ int delay = cycleDelay;
Client cntxt = (Client) dummy;
str msg = MAL_SUCCEED;
- lng t, analysis, now;
+ lng t, now;
BAT *claimed[MAXSTREAMS];
- timestamp ts, tn;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list