Changeset: 954c44bd4b6b for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=954c44bd4b6b
Modified Files:
        sql/backends/monet5/50_cquery.mal
        sql/backends/monet5/Tests/cquery00.malC
        sql/backends/monet5/Tests/cquery05.sql
        sql/backends/monet5/Tests/cquery10.sql
        sql/backends/monet5/sql_cquery.c
        sql/scripts/50_cquery.sql
Branch: timetrails
Log Message:

Log record maintenance


diffs (261 lines):

diff --git a/sql/backends/monet5/50_cquery.mal b/sql/backends/monet5/50_cquery.mal
--- a/sql/backends/monet5/50_cquery.mal
+++ b/sql/backends/monet5/50_cquery.mal
@@ -73,7 +73,7 @@ address CQwindow
 comment "Set the input window size constraints";
 
 # continuous query status analysis
-pattern log() (tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str], 
status:bat[:str], time:bat[:lng],error:bat[:str])
+pattern log() (tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str], 
time:bat[:lng],error:bat[:str])
 address CQlog
 comment "The log of all CQ executions";
 
diff --git a/sql/backends/monet5/Tests/cquery00.malC b/sql/backends/monet5/Tests/cquery00.malC
--- a/sql/backends/monet5/Tests/cquery00.malC
+++ b/sql/backends/monet5/Tests/cquery00.malC
@@ -4,6 +4,6 @@ cquery.dump();
 
 cquery.show("unknown","query");
 
-(tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str], status:bat[:str], 
time:bat[:lng],error:bat[:str]) := cquery.log();
+(tick:bat[:timestamp],mod:bat[:str],fcn:bat[:str], 
time:bat[:lng],error:bat[:str]) := cquery.log();
 
-io.print(tick,mod,fcn,status,time,error);
+io.print(tick,mod,fcn,time,error);
diff --git a/sql/backends/monet5/Tests/cquery05.sql b/sql/backends/monet5/Tests/cquery05.sql
--- a/sql/backends/monet5/Tests/cquery05.sql
+++ b/sql/backends/monet5/Tests/cquery05.sql
@@ -10,12 +10,12 @@ end;
 -- register the CQ
 call cquery.register('sys','cq_basic');
 
--- The scheduler executes this CQ every 50 milliseconds
-call cquery.heartbeat('sys','cq_basic',50);
+-- The scheduler executes this CQ every 1000 milliseconds
+call cquery.heartbeat('sys','cq_basic',1000);
 
 -- reactivate this continuous query
 call cquery.resume('sys','cq_basic');
-call cquery.wait(2000);
+call cquery.wait(2100);
 call cquery.pause('sys','cq_basic');
 
 select 'RESULT';
diff --git a/sql/backends/monet5/Tests/cquery10.sql b/sql/backends/monet5/Tests/cquery10.sql
--- a/sql/backends/monet5/Tests/cquery10.sql
+++ b/sql/backends/monet5/Tests/cquery10.sql
@@ -4,14 +4,17 @@ create table result(i integer);
 
 create procedure cq_cycles()
 begin
-       insert into result values(select count(*) from result);
+       insert into sys.result (select count(*) from sys.result);
 end;
 
+-- register the CQ
+call cquery.register('sys','cq_cycles');
+
+-- The scheduler interval is 1 sec 
+call cquery.heartbeat('sys','cq_cycles',1000);
+
 -- The scheduler executes all CQ at most 5 rounds
-call cquery.cycles(5);
-
--- register the CQ
-call cquery.register('iot','cq_cycles');
+call cquery.cycles('sys','cq_cycles',5);
 
 -- reactivate all continuous queries
 call cquery.resume();
diff --git a/sql/backends/monet5/sql_cquery.c b/sql/backends/monet5/sql_cquery.c
--- a/sql/backends/monet5/sql_cquery.c
+++ b/sql/backends/monet5/sql_cquery.c
@@ -62,7 +62,6 @@ static int pnstatus;
 static BAT *CQ_id_tick = 0;
 static BAT *CQ_id_mod = 0;
 static BAT *CQ_id_fcn = 0;
-static BAT *CQ_id_state = 0;
 static BAT *CQ_id_time = 0;
 static BAT *CQ_id_error = 0;
 
@@ -155,13 +154,11 @@ CQcreatelog(void){
        CQ_id_tick = COLnew(0, TYPE_timestamp, 1<<16, TRANSIENT);
        CQ_id_mod = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
        CQ_id_fcn = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
-       CQ_id_state = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
        CQ_id_time = COLnew(0, TYPE_lng, 1<<16, TRANSIENT);
        CQ_id_error = COLnew(0, TYPE_str, 1<<16, TRANSIENT);
        if ( CQ_id_tick == 0 &&
                CQ_id_mod == 0 &&
                CQ_id_fcn == 0 &&
-               CQ_id_state == 0 &&
                CQ_id_time == 0 &&
                CQ_id_error == 0){
                        (void) CQcleanuplog();
@@ -170,10 +167,22 @@ CQcreatelog(void){
        return MAL_SUCCEED;
 }
 
+static void
+CQentry(int idx)
+{
+       CQcreatelog();
+       if( BUNappend(CQ_id_tick, &pnet[idx].seen,FALSE) != GDK_SUCCEED ||
+               BUNappend(CQ_id_mod, pnet[idx].mod,FALSE) != GDK_SUCCEED ||
+               BUNappend(CQ_id_fcn, pnet[idx].fcn,FALSE) != GDK_SUCCEED ||
+               BUNappend(CQ_id_time, &pnet[idx].time,FALSE) != GDK_SUCCEED ||
+               BUNappend(CQ_id_error, (pnet[idx].error ? 
pnet[idx].error:""),FALSE) != GDK_SUCCEED )
+               pnet[idx].error = 
createException(SQL,"cquery.logentry",MAL_MALLOC_FAIL);
+}
+
 str
 CQlog( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
-       BAT *tickbat, *modbat, *fcnbat, *statebat, *timebat, *errbat;
-       bat *tickret, *modret, *fcnret, *stateret, *timeret, *errorret;
+       BAT *tickbat, *modbat, *fcnbat, *timebat, *errbat;
+       bat *tickret, *modret, *fcnret, *timeret, *errorret;
        
        (void) cntxt;
        (void) mb;
@@ -181,9 +190,8 @@ CQlog( Client cntxt, MalBlkPtr mb, MalSt
        tickret = getArgReference_bat(stk, pci, 0);
        modret = getArgReference_bat(stk, pci, 1);
        fcnret = getArgReference_bat(stk, pci, 2);
-       stateret = getArgReference_bat(stk, pci, 3);
-       timeret = getArgReference_bat(stk, pci, 4);
-       errorret = getArgReference_bat(stk, pci, 5);
+       timeret = getArgReference_bat(stk, pci, 3);
+       errorret = getArgReference_bat(stk, pci, 4);
 #ifdef DEBUG_CQUERY
        fprintf(stderr,"#produce query.log table\n");
 #endif
@@ -197,9 +205,6 @@ CQlog( Client cntxt, MalBlkPtr mb, MalSt
        fcnbat = COLcopy(CQ_id_fcn, TYPE_str, 0, TRANSIENT);
        if(fcnbat == NULL)
                goto wrapup;
-       statebat = COLcopy(CQ_id_state, TYPE_str, 0, TRANSIENT);
-       if(statebat == NULL)
-               goto wrapup;
        timebat = COLcopy(CQ_id_time, TYPE_lng, 0, TRANSIENT);
        if(timebat == NULL)
                goto wrapup;
@@ -209,7 +214,6 @@ CQlog( Client cntxt, MalBlkPtr mb, MalSt
        BBPkeepref(*tickret = tickbat->batCacheid);
        BBPkeepref(*modret = modbat->batCacheid);
        BBPkeepref(*fcnret = fcnbat->batCacheid);
-       BBPkeepref(*stateret = statebat->batCacheid);
        BBPkeepref(*timeret = timebat->batCacheid);
        BBPkeepref(*errorret = errbat->batCacheid);
        return MAL_SUCCEED;
@@ -683,7 +687,7 @@ CQexecute( Client cntxt, int idx)
 
        // release all locks held
 #ifdef DEBUG_CQUERY
-       fprintf(stderr, "#cquery.execute %s.%s finished\n", node->mod, 
node->fcn);
+               fprintf(stderr, "#cquery.execute %s.%s finised %s\n", 
node->mod, node->fcn, (msg?msg:""));
 #endif
        MT_lock_set(&ttrLock);
        if( node->status != CQPAUSE)
@@ -725,51 +729,44 @@ CQscheduler(void *dummy)
                MT_lock_set(&ttrLock); // analysis should be done with 
exclusive access
                for (k = i = 0; i < pnettop; i++) 
                if ( pnet[i].status == CQWAIT ){
-                       pnet[i].enabled = pnet[i].error == 0;
-                       if( pnet[i].cycles == 0)
-                               pnet[i].enabled = 0;
+                       pnet[i].enabled = pnet[i].error == 0 && pnet[i].cycles 
!= 0;
 
-                       /* queries are triggered by the heartbeat or  all 
window constraints */
-                       /* a heartbeat in combination with a window constraint 
is ambiguous */
-                       /* at least constraint on either should have been set */
+                       /* Queries are triggered by the heartbeat or  all 
window constraints */
+                       /* A heartbeat in combination with a window constraint 
is ambiguous */
+                       /* At least one constraint should be set */
                        if( pnet[i].beats == lng_nil && pnet[i].bats[0] == 0)
                                pnet[i].enabled = 0;
 
                        if( pnet[i].enabled && pnet[i].beats > 0){
-                               if( pnet[i].run == lng_nil){
-                                       // execute the first round
-                               } else {
-                                       if( now > pnet[i].run + pnet[i].beats)
-                                               pnet[i].enabled = 0;
+                               if( pnet[i].run != lng_nil ) {
+                                       pnet[i].enabled = now >= pnet[i].run + 
pnet[i].beats;
 #ifdef DEBUG_CQUERY_SCHEDULER
-                                       fprintf(stderr,"#now %s.%s  
"LLFMT"%s\n", pnet[i].mod, pnet[i].fcn, now, (pnet[i].enabled? 
"enabled":"disabled"));
+                                       fprintf(stderr,"#beat %s.%s  
"LLFMT"("LLFMT") %s\n", pnet[i].mod, pnet[i].fcn, 
+                                               pnet[i].run + pnet[i].beats, 
now, (pnet[i].enabled? "enabled":"disabled"));
 #endif
+                                       }
                                }
-                       }
 
                        /* check if all input baskets are available */
                        for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]); 
j++)
                                /* consider execution only if baskets are 
properly filled */
-                               if ( pnet[i].window[j] >= 0 && (BUN) 
pnet[i].window[j] > BATcount(b)){
+                               if ( pnet[i].inout[j] == STREAM_IN && (BUN) 
pnet[i].window[j] > BATcount(b)){
                                        pnet[i].enabled = 0;
                                        break;
                                } 
 
                        /* check availability of all stream baskets */
-                       for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]); 
j++)
+                       for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]); 
j++){
                                for(k=0; claimed[k]; k++)
-                                       if(claimed[k] == b){
+                                       if(claimed[k] == b)
                                                pnet[i].enabled = 0;
 #ifdef DEBUG_CQUERY_SCHEDULER
                                                fprintf(stderr, "#cquery: 
%s.%s,disgarded \n", pnet[i].mod, pnet[i].fcn);
 #endif
+                                               break;
                                        }
-
-                       for(k=0; claimed[k]; k++) {
-                               // find end of list
-                       }
-                       for (j = 0; pnet[i].enabled && (b = pnet[i].bats[j]); 
j++) {
-                               claimed[k++] = b;
+                               if (pnet[i].enabled && claimed[k] == 0){
+                                       claimed[k] = b;
                        }
 
 #ifdef DEBUG_CQUERY_SCHEDULER
@@ -807,8 +804,10 @@ CQscheduler(void *dummy)
                        if( pnet[i].cycles != int_nil && pnet[i].cycles > 0)
                                pnet[i].cycles--;
                        pnet[i].run = now;                              /* last 
executed */
-                       pnet[i].time += GDKusec() - t;   /* keep around in 
microseconds */
+                       pnet[i].time = GDKusec() - t;   /* keep around in 
microseconds */
+                       (void) MTIMEcurrent_timestamp(&pnet[i].seen);
                        pnet[i].enabled = 0;
+                       CQentry(i);
                        if (msg != MAL_SUCCEED ){
                                char buf[BUFSIZ];
                                if (pnet[i].error == NULL) {
@@ -827,11 +826,6 @@ CQscheduler(void *dummy)
                        fprintf(stderr, "#Terminate query thread %s limit %d 
\n", pnet[enabled[m]].fcnname, pnet[enabled[m]].limit);
 #endif
                        MT_join_thread(pnet[enabled[m]].tid);
-                       if( pnet[enabled[m]].limit > 0) 
pnet[enabled[m]].limit--;
-                       if(  pnet[enabled[m]].limit  ==0)
-                                pnet[enabled[m]].status = CQPAUSE; 
-                       //if( pnet[enabled[m]].limit == 0)
-                               //CQderegisterInternal(enabled[m]);
                }
 
 #ifdef DEBUG_CQUERY
diff --git a/sql/scripts/50_cquery.sql b/sql/scripts/50_cquery.sql
--- a/sql/scripts/50_cquery.sql
+++ b/sql/scripts/50_cquery.sql
@@ -71,7 +71,7 @@ create procedure cquery.window("schema" 
 -- continuous query status analysis
 
 create function cquery.log()
- returns table(tick timestamp,  "schema" string, "function" string, "status" 
string, time bigint, errors string)
+ returns table(tick timestamp,  "schema" string, "function" string, time 
bigint, errors string)
  external name cquery.log;
 
 create function cquery.summary()
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to