Changeset: 954c44bd4b6b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=954c44bd4b6b
Modified Files:
sql/backends/monet5/50_cquery.mal
sql/backends/monet5/Tests/cquery00.malC
sql/backends/monet5/Tests/cquery05.sql
sql/backends/monet5/Tests/cquery10.sql
sql/backends/monet5/sql_cquery.c
sql/scripts/50_cquery.sql
Branch: timetrails
Log Message:
Log record maintenance
diffs (261 lines):
diff --git a/sql/backends/monet5/50_cquery.mal
b/sql/backends/monet5/50_cquery.mal
--- a/sql/backends/monet5/50_cquery.mal
+++ b/sql/backends/monet5/50_cquery.mal
@@ -73,7 +73,7 @@ address CQwindow
comment "Set the input window size constraints";
# continuous query status analysis
-pattern log() (tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str],
status:bat[:str], time:bat[:lng],error:bat[:str])
+pattern log() (tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str],
time:bat[:lng],error:bat[:str])
address CQlog
comment "The log of all CQ executions";
diff --git a/sql/backends/monet5/Tests/cquery00.malC
b/sql/backends/monet5/Tests/cquery00.malC
--- a/sql/backends/monet5/Tests/cquery00.malC
+++ b/sql/backends/monet5/Tests/cquery00.malC
@@ -4,6 +4,6 @@ cquery.dump();
cquery.show("unknown","query");
-(tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str], status:bat[:str],
time:bat[:lng],error:bat[:str]) := cquery.log();
+(tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str],
time:bat[:lng],error:bat[:str]) := cquery.log();
-io.print(tick,mod,fcn,status,time,error);
+io.print(tick,mod,fcn,time,error);
diff --git a/sql/backends/monet5/Tests/cquery05.sql
b/sql/backends/monet5/Tests/cquery05.sql
--- a/sql/backends/monet5/Tests/cquery05.sql
+++ b/sql/backends/monet5/Tests/cquery05.sql
@@ -10,12 +10,12 @@ end;
-- register the CQ
call cquery.register('sys','cq_basic');
--- The scheduler executes this CQ every 50 milliseconds
-call cquery.heartbeat('sys','cq_basic',50);
+-- The scheduler executes this CQ every 1000 milliseconds
+call cquery.heartbeat('sys','cq_basic',1000);
-- reactivate this continuous query
call cquery.resume('sys','cq_basic');
-call cquery.wait(2000);
+call cquery.wait(2100);
call cquery.pause('sys','cq_basic');
select 'RESULT';
diff --git a/sql/backends/monet5/Tests/cquery10.sql
b/sql/backends/monet5/Tests/cquery10.sql
--- a/sql/backends/monet5/Tests/cquery10.sql
+++ b/sql/backends/monet5/Tests/cquery10.sql
@@ -4,14 +4,17 @@ create table result(i integer);
create procedure cq_cycles()
begin
- insert into result values(select count(*) from result);
+ insert into sys.result (select count(*) from sys.result);
end;
+-- register the CQ
+call cquery.register('sys','cq_cycles');
+
+-- The scheduler interval is 1 sec
+call cquery.heartbeat('sys','cq_cycles',1000);
+
-- The scheduler executes all CQ at most 5 rounds
-call cquery.cycles(5);
-
--- register the CQ
-call cquery.register('iot','cq_cycles');
+call cquery.cycles('sys','cq_cycles',5);
-- reactivate all continuous queries
call cquery.resume();
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -62,7 +62,6 @@ static int pnstatus;
static BAT *CQ_id_tick = 0;
static BAT *CQ_id_mod = 0;
static BAT *CQ_id_fcn = 0;
-static BAT *CQ_id_state = 0;
static BAT *CQ_id_time = 0;
static BAT *CQ_id_error = 0;
@@ -155,13 +154,11 @@ CQcreatelog(void){
CQ_id_tick = COLnew(0, TYPE_timestamp, 1<<16, TRANSIENT);
CQ_id_mod = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
CQ_id_fcn = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
- CQ_id_state = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
CQ_id_time = COLnew(0, TYPE_lng, 1<<16, TRANSIENT);
CQ_id_error = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
if ( CQ_id_tick == 0 &&
CQ_id_mod == 0 &&
CQ_id_fcn == 0 &&
- CQ_id_state == 0 &&
CQ_id_time == 0 &&
CQ_id_error == 0){
(void) CQcleanuplog();
@@ -170,10 +167,22 @@ CQcreatelog(void){
return MAL_SUCCEED;
}
+static void
+CQentry(int idx)
+{
+ CQcreatelog();
+ if( BUNappend(CQ_id_tick, &pnet[idx].seen,FALSE) != GDK_SUCCEED ||
+ BUNappend(CQ_id_mod, pnet[idx].mod,FALSE) != GDK_SUCCEED ||
+ BUNappend(CQ_id_fcn, pnet[idx].fcn,FALSE) != GDK_SUCCEED ||
+ BUNappend(CQ_id_time, &pnet[idx].time,FALSE) != GDK_SUCCEED ||
+ BUNappend(CQ_id_error, (pnet[idx].error ?
pnet[idx].error:""),FALSE) != GDK_SUCCEED )
+ pnet[idx].error =
createException(SQL,"cquery.logentry",MAL_MALLOC_FAIL);
+}
+
str
CQlog( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
- BAT *tickbat, *modbat, *fcnbat, *statebat, *timebat, *errbat;
- bat *tickret, *modret, *fcnret, *stateret, *timeret, *errorret;
+ BAT *tickbat, *modbat, *fcnbat, *timebat, *errbat;
+ bat *tickret, *modret, *fcnret, *timeret, *errorret;
(void) cntxt;
(void) mb;
@@ -181,9 +190,8 @@ CQlog( Client cntxt, MalBlkPtr mb, MalSt
tickret = getArgReference_bat(stk, pci, 0);
modret = getArgReference_bat(stk, pci, 1);
fcnret = getArgReference_bat(stk, pci, 2);
- stateret = getArgReference_bat(stk, pci, 3);
- timeret = getArgReference_bat(stk, pci, 4);
- errorret = getArgReference_bat(stk, pci, 5);
+ timeret = getArgReference_bat(stk, pci, 3);
+ errorret = getArgReference_bat(stk, pci, 4);
#ifdef DEBUG_CQUERY
fprintf(stderr,"#produce query.log table\n");
#endif
@@ -197,9 +205,6 @@ CQlog( Client cntxt, MalBlkPtr mb, MalSt
fcnbat = COLcopy(CQ_id_fcn, TYPE_str, 0, TRANSIENT);
if(fcnbat == NULL)
goto wrapup;
- statebat = COLcopy(CQ_id_state, TYPE_str, 0, TRANSIENT);
- if(statebat == NULL)
- goto wrapup;
timebat = COLcopy(CQ_id_time, TYPE_lng, 0, TRANSIENT);
if(timebat == NULL)
goto wrapup;
@@ -209,7 +214,6 @@ CQlog( Client cntxt, MalBlkPtr mb, MalSt
BBPkeepref(*tickret = tickbat->batCacheid);
BBPkeepref(*modret = modbat->batCacheid);
BBPkeepref(*fcnret = fcnbat->batCacheid);
- BBPkeepref(*stateret = statebat->batCacheid);
BBPkeepref(*timeret = timebat->batCacheid);
BBPkeepref(*errorret = errbat->batCacheid);
return MAL_SUCCEED;
@@ -683,7 +687,7 @@ CQexecute( Client cntxt, int idx)
// release all locks held
#ifdef DEBUG_CQUERY
- fprintf(stderr, "#cquery.execute %s.%s finished\n", node->mod,
node->fcn);
+ fprintf(stderr, "#cquery.execute %s.%s finised %s\n",
node->mod, node->fcn, (msg?msg:""));
#endif
MT_lock_set(&ttrLock);
if( node->status != CQPAUSE)
@@ -725,51 +729,44 @@ CQscheduler(void *dummy)
MT_lock_set(&ttrLock); // analysis should be done with
exclusive access
for (k = i = 0; i < pnettop; i++)
if ( pnet[i].status == CQWAIT ){
- pnet[i].enabled = pnet[i].error == 0;
- if( pnet[i].cycles == 0)
- pnet[i].enabled = 0;
+ pnet[i].enabled = pnet[i].error == 0 && pnet[i].cycles
!= 0;
- /* queries are triggered by the heartbeat or all
window constraints */
- /* a heartbeat in combination with a window constraint
is ambiguous */
- /* at least constraint on either should have been set */
+ /* Queries are triggered by the heartbeat or all
window constraints */
+ /* A heartbeat in combination with a window constraint
is ambiguous */
+ /* At least one constraint should be set */
if( pnet[i].beats == lng_nil && pnet[i].bats[0] == 0)
pnet[i].enabled = 0;
if( pnet[i].enabled && pnet[i].beats > 0){
- if( pnet[i].run == lng_nil){
- // execute the first round
- } else {
- if( now > pnet[i].run + pnet[i].beats)
- pnet[i].enabled = 0;
+ if( pnet[i].run != lng_nil ) {
+ pnet[i].enabled = now >= pnet[i].run +
pnet[i].beats;
#ifdef DEBUG_CQUERY_SCHEDULER
- fprintf(stderr,"#now %s.%s
"LLFMT"%s\n", pnet[i].mod, pnet[i].fcn, now, (pnet[i].enabled?
"enabled":"disabled"));
+ fprintf(stderr,"#beat %s.%s
"LLFMT"("LLFMT") %s\n", pnet[i].mod, pnet[i].fcn,
+ pnet[i].run + pnet[i].beats,
now, (pnet[i].enabled? "enabled":"disabled"));
#endif
+ }
}
- }
/* check if all input baskets are available */
for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]);
j++)
/* consider execution only if baskets are
properly filled */
- if ( pnet[i].window[j] >= 0 && (BUN)
pnet[i].window[j] > BATcount(b)){
+ if ( pnet[i].inout[j] == STREAM_IN && (BUN)
pnet[i].window[j] > BATcount(b)){
pnet[i].enabled = 0;
break;
}
/* check availability of all stream baskets */
- for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]);
j++)
+ for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]);
j++){
for(k=0; claimed[k]; k++)
- if(claimed[k] == b){
+ if(claimed[k] == b)
pnet[i].enabled = 0;
#ifdef DEBUG_CQUERY_SCHEDULER
fprintf(stderr, "#cquery:
%s.%s,disgarded \n", pnet[i].mod, pnet[i].fcn);
#endif
+ break;
}
-
- for(k=0; claimed[k]; k++) {
- // find end of list
- }
- for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]);
j++) {
- claimed[k++] = b;
+ if (pnet[i].enabled && claimed[k] == 0){
+ claimed[k] = b;
}
#ifdef DEBUG_CQUERY_SCHEDULER
@@ -807,8 +804,10 @@ CQscheduler(void *dummy)
if( pnet[i].cycles != int_nil && pnet[i].cycles > 0)
pnet[i].cycles--;
pnet[i].run = now; /* last
executed */
- pnet[i].time += GDKusec() - t; /* keep around in
microseconds */
+ pnet[i].time = GDKusec() - t; /* keep around in
microseconds */
+ (void) MTIMEcurrent_timestamp(&pnet[i].seen);
pnet[i].enabled = 0;
+ CQentry(i);
if (msg != MAL_SUCCEED ){
char buf[BUFSIZ];
if (pnet[i].error == NULL) {
@@ -827,11 +826,6 @@ CQscheduler(void *dummy)
fprintf(stderr, "#Terminate query thread %s limit %d
\n", pnet[enabled[m]].fcnname, pnet[enabled[m]].limit);
#endif
MT_join_thread(pnet[enabled[m]].tid);
- if( pnet[enabled[m]].limit > 0)
pnet[enabled[m]].limit--;
- if( pnet[enabled[m]].limit ==0)
- pnet[enabled[m]].status = CQPAUSE;
- //if( pnet[enabled[m]].limit == 0)
- //CQderegisterInternal(enabled[m]);
}
#ifdef DEBUG_CQUERY
diff --git a/sql/scripts/50_cquery.sql b/sql/scripts/50_cquery.sql
--- a/sql/scripts/50_cquery.sql
+++ b/sql/scripts/50_cquery.sql
@@ -71,7 +71,7 @@ create procedure cquery.window("schema"
-- continuous query status analysis
create function cquery.log()
- returns table(tick timestamp, "schema" string, "function" string, "status"
string, time bigint, errors string)
+ returns table(tick timestamp, "schema" string, "function" string, time
bigint, errors string)
external name cquery.log;
create function cquery.summary()
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list