Changeset: 0b2dd96d5c4a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0b2dd96d5c4a
Modified Files:
        clients/Tests/exports.stable.out
        monetdb5/modules/mal/wlcr.c
        monetdb5/modules/mal/wlcr.h
        monetdb5/optimizer/opt_wlcr.c
        sql/backends/monet5/sql_scenario.c
        sql/backends/monet5/sql_wlcr.c
        sql/backends/monet5/sql_wlcr.h
        sql/backends/monet5/sql_wlcr.mal
        sql/scripts/60_wlcr.sql
        sql/test/wlcr/Tests/wlr20.py
        sql/test/wlcr/Tests/wlr30.py
        sql/test/wlcr/Tests/wlr40.py
        sql/test/wlcr/Tests/wlr50.py
Branch: wlcr
Log Message:

Various fixes
- register threads at GDK and watch out for exiting
- update of drift forces log file
- timing issue during system startup, added waitformaster()
- propoer call to execute catalog queries, ignoring change()


diffs (truncated from 418 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1892,7 +1892,7 @@ str WLCexit(void);
 str WLCfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str WLCgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str WLCgetConfig(void);
-str WLCinit(Client cntxt);
+str WLCinit(void);
 str WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str WLClogrollback(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 str WLClogthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
--- a/monetdb5/modules/mal/wlcr.c
+++ b/monetdb5/modules/mal/wlcr.c
@@ -245,7 +245,7 @@ static void
 WLCRlogger(void *arg)
 {
        (void) arg;
-       while(1){
+       while(!GDKexiting()){
                if( wlcr_logs && wlcr_fd ){
                        if (wlcr_start + wlcr_drift < GDKms() / 1000){
                                MT_lock_set(&wlcr_lock);
@@ -257,7 +257,7 @@ WLCRlogger(void *arg)
                if( wlcr_drift)
                                MT_sleep_ms( wlcr_drift * 1000);
                else
-                               MT_sleep_ms(  10  * 1000);
+                               MT_sleep_ms(  1  * 1000);
        }
 }
 /*
@@ -265,19 +265,13 @@ WLCRlogger(void *arg)
  * Then the master record information should be set and the WLClogger started.
  */
 str 
-WLCinit(Client cntxt)
+WLCinit(void)
 {
        char path[PATHLENGTH];
        str pathname, msg= MAL_SUCCEED;
        FILE *fd;
 
-       if( wlcr_logs){
-#ifdef _WLC_DEBUG_
-               mnstr_printf(cntxt->fdout,"#WLC already running\n");
-#else
-       (void) cntxt;
-#endif
-       } else{
+       if( wlcr_logs == NULL){
                // use default location for archive
                pathname = GDKfilepath(0,0,"master",0);
                snprintf(path, PATHLENGTH,"%s%cwlc.config", pathname, DIR_SEP);
@@ -297,6 +291,7 @@ WLCinit(Client cntxt)
                if (MT_create_thread(&wlcr_logger, WLCRlogger , (void*) 0, 
MT_THR_JOINABLE) < 0) {
                 GDKerror("wlcr.logger thread could not be spawned");
         }
+               GDKregister(wlcr_logger);
        }
        return MAL_SUCCEED;
 }
@@ -327,10 +322,11 @@ WLCstopmaster(Client cntxt, MalBlkPtr mb
 str 
 WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
+       (void) cntxt;
        (void) mb;
        (void) stk;
        (void) pci;
-       return WLCinit(cntxt);
+       return WLCinit();
 }
 
 str 
@@ -351,12 +347,16 @@ WLClogrollback(Client cntxt, MalBlkPtr m
        return MAL_SUCCEED;
 }
 
+/* Changing the drift should have immediate effect
+ * It forces a new log file
+ */
 str 
 WLCdrift(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        (void) mb;
        (void) cntxt;
        wlcr_drift = * getArgReference_int(stk,pci,1);
+       WLCcloselogger();
        return MAL_SUCCEED;
 }
 
@@ -729,7 +729,7 @@ WLCwrite(Client cntxt)
 {      str msg = MAL_SUCCEED;
        InstrPtr p;
        // save the wlcr record on a file 
-       if( cntxt->wlcr == 0 || cntxt->wlcr->stop == 0)
+       if( cntxt->wlcr == 0 || cntxt->wlcr->stop <= 1)
                return MAL_SUCCEED;
 
        if( wlcr_logs ){        
@@ -774,7 +774,7 @@ WLCwrite(Client cntxt)
 str
 WLCcommit(int clientid)
 {
-       if( mal_clients[clientid].wlcr){
+       if( mal_clients[clientid].wlcr && mal_clients[clientid].wlcr->stop > 1){
                newStmt(mal_clients[clientid].wlcr,"wlr","commit");
                return WLCwrite( &mal_clients[clientid]);
        }
diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h
--- a/monetdb5/modules/mal/wlcr.h
+++ b/monetdb5/modules/mal/wlcr.h
@@ -28,7 +28,7 @@ mal_export int wlcr_drift;
 mal_export str wlcr_dbname;
 mal_export int wlcr_rollback;
 
-mal_export str WLCinit(Client cntxt);
+mal_export str WLCinit(void);
 mal_export str WLCexit(void);
 mal_export int WLCused(void);
 mal_export str WLCgetConfig(void);
diff --git a/monetdb5/optimizer/opt_wlcr.c b/monetdb5/optimizer/opt_wlcr.c
--- a/monetdb5/optimizer/opt_wlcr.c
+++ b/monetdb5/optimizer/opt_wlcr.c
@@ -18,7 +18,7 @@
 int
 OPTwlcrImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      int i, j, limit, slimit, updates=0;
-       InstrPtr p, q, def;
+       InstrPtr p, q, def = 0;
        InstrPtr *old;
        lng usec = GDKusec();
        char buf[256];
@@ -59,6 +59,7 @@ OPTwlcrImplementation(Client cntxt, MalB
                } else
                if( getModuleId(p) == sqlRef && getFunctionId(p) == 
clear_tableRef ){
                        setFunctionId(def,changeRef);
+                               assert(def);
                                q= copyInstruction(p);
                                setModuleId(q, wlcrRef);
                                for( j=0; j< p->retc; j++)
diff --git a/sql/backends/monet5/sql_scenario.c 
b/sql/backends/monet5/sql_scenario.c
--- a/sql/backends/monet5/sql_scenario.c
+++ b/sql/backends/monet5/sql_scenario.c
@@ -260,10 +260,8 @@ SQLinit(void)
                throw(SQL, "SQLinit", "Starting idle manager failed");
        }
        GDKregister(idlethread);
-       // check WLCR status
-       WLCinit(&mal_clients[0]);
-       WLRinit(&mal_clients[0]);
-       return MAL_SUCCEED;
+       WLCinit();
+       return WLRinit();
 }
 
 str
@@ -639,14 +637,13 @@ SQLexitClient(Client c)
  */
 str
 SQLinitEnvironment(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
+{ 
        (void) mb;
        (void) stk;
        (void) pci;
        return SQLinitClient(cntxt);
 }
 
-
 str
 SQLstatement(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c
--- a/sql/backends/monet5/sql_wlcr.c
+++ b/sql/backends/monet5/sql_wlcr.c
@@ -164,7 +164,7 @@ WLRinitReplica(str dbname)
        GDKfree(dir);
        if( fd ){
                (void) fclose(fd);
-               throw(SQL,"setreplica","Already in replica mode for 
'%s'",dbname);
+               return MAL_SUCCEED;
        }
 
        msg = WLRgetMaster(dbname);
@@ -253,7 +253,7 @@ WLRprocess(void *arg)
                do{
                        pc = mb->stop;
                        if( parseMAL(c, c->curprg, 1, 1)  || mb->errors){
-                               mnstr_printf(GDKerr,"#wlcr.process:parsing 
failed '%s'\n",path);
+                               mnstr_printf(GDKerr,"#wlcr.process:parsing 
failed '%s':\n",path);
                        }
                        mb = c->curprg->def; // needed
                        q= getInstrPtr(mb, mb->stop-1);
@@ -298,24 +298,31 @@ WLRprocess(void *arg)
        cntxt->wlcr_mode = 0;
 }
 
+/*
+ * A timing issue. The WLRprocess can only start after the
+ * SQL environment has been initialized.
+ * It is now activated as part of the startup, but before
+ * a SQL client is known.
+ */
 static void
 WLRprocessScheduler(void *arg)
 {
        Client cntxt = (Client) arg;
 
-       while(1){
+       while(!GDKexiting()){
+               // wait at most for the drift period, also at start
+               MT_sleep_ms( (wlcr_drift? wlcr_drift:1) * 1000 );
                if( wlr_master)
                        WLRgetMaster(wlr_master);
-               if( wlr_nextbatch < wlcr_batches)
+               if( wlr_nextbatch < wlcr_batches )
                        WLRprocess(cntxt);
-               // wait at most for the drift period
-               MT_sleep_ms( (wlcr_drift? wlcr_drift:1) * 1000 );
        }
 }
 
-void
-WLRinit(Client cntxt)
+str
+WLRinit(void)
 {
+       Client cntxt = &mal_clients[0];
        MT_Id wlcr_thread;
        
        WLRgetConfig();
@@ -324,16 +331,36 @@ WLRinit(Client cntxt)
        if( wlr_logs){
                // Always try to roll forward before you continue
                cntxt->wlcr_mode = WLCR_REPLICATE;
-               // The client has to wait initially for all logs known to be 
processed.
-               WLRprocess(cntxt);
                if (MT_create_thread(&wlcr_thread, WLRprocessScheduler, (void*) 
cntxt, MT_THR_JOINABLE) < 0) {
-                               GDKerror("wlcr.replicate:replay scheduling 
process can not be started");
+                               throw(SQL,"wlcr.init","Starting wlr manager 
failed");
                }
+               GDKregister(wlcr_thread);
        }
+       return MAL_SUCCEED;
+}
+
+/* for testing it is helpful to be able to wait until all log files have been 
processed */
+str
+WLRwaitformaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{      int i = 0;
+       (void) cntxt;
+       (void) mb;
+       (void) stk;
+       (void) pci;
+
+       WLRgetMaster(wlr_master);
+       while( wlr_nextbatch < wlcr_batches && ! GDKexiting() && i++  < 60){
+               mnstr_printf(cntxt->fdout,"#waiting for master %d < 
%d\n",wlr_nextbatch, wlcr_batches);
+               MT_sleep_ms(1000);
+       }
+       if( i >= 60)
+               throw(SQL,"waitformaster","Time out (> 60 seconds)");
+       mnstr_printf(cntxt->fdout,"#in sync with master %d == 
%d\n",wlr_nextbatch, wlcr_batches);
+       return MAL_SUCCEED;
 }
 
 str
-WLCRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+WLRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      str msg;
        MT_Id wlcr_thread;
        (void) mb;
@@ -414,34 +441,23 @@ WLRquery(Client cntxt, MalBlkPtr mb, Mal
        return msg;
 }
 
+/* A change event need not be executed, because it is already captured
+ * in the update/append/delete
+ */
 str
 WLRchange(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{      str qry =  *getArgReference_str(stk,pci,1);
-       str msg = MAL_SUCCEED;
-       char *x, *y, *qtxt;
-
+{      
+       (void) cntxt;
+       (void) pci;
+       (void) stk;
        (void) mb;
-       if( cntxt->wlcr_kind == WLCR_ROLLBACK)
-               return msg;
-       // we need to get rid of the escaped quote.
-       x = qtxt= (char*) GDKmalloc(strlen(qry) +1);
-       for(y = qry; *y; y++){
-               if( *y == '\\' ){
-                       if( *(y+1) ==  '\'')
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to