Changeset: 0b2dd96d5c4a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0b2dd96d5c4a
Modified Files:
clients/Tests/exports.stable.out
monetdb5/modules/mal/wlcr.c
monetdb5/modules/mal/wlcr.h
monetdb5/optimizer/opt_wlcr.c
sql/backends/monet5/sql_scenario.c
sql/backends/monet5/sql_wlcr.c
sql/backends/monet5/sql_wlcr.h
sql/backends/monet5/sql_wlcr.mal
sql/scripts/60_wlcr.sql
sql/test/wlcr/Tests/wlr20.py
sql/test/wlcr/Tests/wlr30.py
sql/test/wlcr/Tests/wlr40.py
sql/test/wlcr/Tests/wlr50.py
Branch: wlcr
Log Message:
Various fixes
- register threads at GDK and watch out for exiting
- update of drift forces a new log file
- timing issue during system startup, added waitformaster()
- proper call to execute catalog queries, ignoring change()
diffs (truncated from 418 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -1892,7 +1892,7 @@ str WLCexit(void);
str WLCfinish(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str WLCgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str WLCgetConfig(void);
-str WLCinit(Client cntxt);
+str WLCinit(void);
str WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str WLClogrollback(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
str WLClogthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
--- a/monetdb5/modules/mal/wlcr.c
+++ b/monetdb5/modules/mal/wlcr.c
@@ -245,7 +245,7 @@ static void
WLCRlogger(void *arg)
{
(void) arg;
- while(1){
+ while(!GDKexiting()){
if( wlcr_logs && wlcr_fd ){
if (wlcr_start + wlcr_drift < GDKms() / 1000){
MT_lock_set(&wlcr_lock);
@@ -257,7 +257,7 @@ WLCRlogger(void *arg)
if( wlcr_drift)
MT_sleep_ms( wlcr_drift * 1000);
else
- MT_sleep_ms( 10 * 1000);
+ MT_sleep_ms( 1 * 1000);
}
}
/*
@@ -265,19 +265,13 @@ WLCRlogger(void *arg)
* Then the master record information should be set and the WLClogger started.
*/
str
-WLCinit(Client cntxt)
+WLCinit(void)
{
char path[PATHLENGTH];
str pathname, msg= MAL_SUCCEED;
FILE *fd;
- if( wlcr_logs){
-#ifdef _WLC_DEBUG_
- mnstr_printf(cntxt->fdout,"#WLC already running\n");
-#else
- (void) cntxt;
-#endif
- } else{
+ if( wlcr_logs == NULL){
// use default location for archive
pathname = GDKfilepath(0,0,"master",0);
snprintf(path, PATHLENGTH,"%s%cwlc.config", pathname, DIR_SEP);
@@ -297,6 +291,7 @@ WLCinit(Client cntxt)
if (MT_create_thread(&wlcr_logger, WLCRlogger , (void*) 0,
MT_THR_JOINABLE) < 0) {
GDKerror("wlcr.logger thread could not be spawned");
}
+ GDKregister(wlcr_logger);
}
return MAL_SUCCEED;
}
@@ -327,10 +322,11 @@ WLCstopmaster(Client cntxt, MalBlkPtr mb
str
WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
+ (void) cntxt;
(void) mb;
(void) stk;
(void) pci;
- return WLCinit(cntxt);
+ return WLCinit();
}
str
@@ -351,12 +347,16 @@ WLClogrollback(Client cntxt, MalBlkPtr m
return MAL_SUCCEED;
}
+/* Changing the drift should have immediate effect
+ * It forces a new log file
+ */
str
WLCdrift(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
(void) mb;
(void) cntxt;
wlcr_drift = * getArgReference_int(stk,pci,1);
+ WLCcloselogger();
return MAL_SUCCEED;
}
@@ -729,7 +729,7 @@ WLCwrite(Client cntxt)
{ str msg = MAL_SUCCEED;
InstrPtr p;
// save the wlcr record on a file
- if( cntxt->wlcr == 0 || cntxt->wlcr->stop == 0)
+ if( cntxt->wlcr == 0 || cntxt->wlcr->stop <= 1)
return MAL_SUCCEED;
if( wlcr_logs ){
@@ -774,7 +774,7 @@ WLCwrite(Client cntxt)
str
WLCcommit(int clientid)
{
- if( mal_clients[clientid].wlcr){
+ if( mal_clients[clientid].wlcr && mal_clients[clientid].wlcr->stop > 1){
newStmt(mal_clients[clientid].wlcr,"wlr","commit");
return WLCwrite( &mal_clients[clientid]);
}
diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h
--- a/monetdb5/modules/mal/wlcr.h
+++ b/monetdb5/modules/mal/wlcr.h
@@ -28,7 +28,7 @@ mal_export int wlcr_drift;
mal_export str wlcr_dbname;
mal_export int wlcr_rollback;
-mal_export str WLCinit(Client cntxt);
+mal_export str WLCinit(void);
mal_export str WLCexit(void);
mal_export int WLCused(void);
mal_export str WLCgetConfig(void);
diff --git a/monetdb5/optimizer/opt_wlcr.c b/monetdb5/optimizer/opt_wlcr.c
--- a/monetdb5/optimizer/opt_wlcr.c
+++ b/monetdb5/optimizer/opt_wlcr.c
@@ -18,7 +18,7 @@
int
OPTwlcrImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{ int i, j, limit, slimit, updates=0;
- InstrPtr p, q, def;
+ InstrPtr p, q, def = 0;
InstrPtr *old;
lng usec = GDKusec();
char buf[256];
@@ -59,6 +59,7 @@ OPTwlcrImplementation(Client cntxt, MalB
} else
if( getModuleId(p) == sqlRef && getFunctionId(p) ==
clear_tableRef ){
setFunctionId(def,changeRef);
+ assert(def);
q= copyInstruction(p);
setModuleId(q, wlcrRef);
for( j=0; j< p->retc; j++)
diff --git a/sql/backends/monet5/sql_scenario.c b/sql/backends/monet5/sql_scenario.c
--- a/sql/backends/monet5/sql_scenario.c
+++ b/sql/backends/monet5/sql_scenario.c
@@ -260,10 +260,8 @@ SQLinit(void)
throw(SQL, "SQLinit", "Starting idle manager failed");
}
GDKregister(idlethread);
- // check WLCR status
- WLCinit(&mal_clients[0]);
- WLRinit(&mal_clients[0]);
- return MAL_SUCCEED;
+ WLCinit();
+ return WLRinit();
}
str
@@ -639,14 +637,13 @@ SQLexitClient(Client c)
*/
str
SQLinitEnvironment(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
+{
(void) mb;
(void) stk;
(void) pci;
return SQLinitClient(cntxt);
}
-
str
SQLstatement(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c
--- a/sql/backends/monet5/sql_wlcr.c
+++ b/sql/backends/monet5/sql_wlcr.c
@@ -164,7 +164,7 @@ WLRinitReplica(str dbname)
GDKfree(dir);
if( fd ){
(void) fclose(fd);
- throw(SQL,"setreplica","Already in replica mode for
'%s'",dbname);
+ return MAL_SUCCEED;
}
msg = WLRgetMaster(dbname);
@@ -253,7 +253,7 @@ WLRprocess(void *arg)
do{
pc = mb->stop;
if( parseMAL(c, c->curprg, 1, 1) || mb->errors){
- mnstr_printf(GDKerr,"#wlcr.process:parsing
failed '%s'\n",path);
+ mnstr_printf(GDKerr,"#wlcr.process:parsing
failed '%s':\n",path);
}
mb = c->curprg->def; // needed
q= getInstrPtr(mb, mb->stop-1);
@@ -298,24 +298,31 @@ WLRprocess(void *arg)
cntxt->wlcr_mode = 0;
}
+/*
+ * A timing issue. The WLRprocess can only start after the
+ * SQL environment has been initialized.
+ * It is now activated as part of the startup, but before
+ * a SQL client is known.
+ */
static void
WLRprocessScheduler(void *arg)
{
Client cntxt = (Client) arg;
- while(1){
+ while(!GDKexiting()){
+ // wait at most for the drift period, also at start
+ MT_sleep_ms( (wlcr_drift? wlcr_drift:1) * 1000 );
if( wlr_master)
WLRgetMaster(wlr_master);
- if( wlr_nextbatch < wlcr_batches)
+ if( wlr_nextbatch < wlcr_batches )
WLRprocess(cntxt);
- // wait at most for the drift period
- MT_sleep_ms( (wlcr_drift? wlcr_drift:1) * 1000 );
}
}
-void
-WLRinit(Client cntxt)
+str
+WLRinit(void)
{
+ Client cntxt = &mal_clients[0];
MT_Id wlcr_thread;
WLRgetConfig();
@@ -324,16 +331,36 @@ WLRinit(Client cntxt)
if( wlr_logs){
// Always try to roll forward before you continue
cntxt->wlcr_mode = WLCR_REPLICATE;
- // The client has to wait initially for all logs known to be
processed.
- WLRprocess(cntxt);
if (MT_create_thread(&wlcr_thread, WLRprocessScheduler, (void*)
cntxt, MT_THR_JOINABLE) < 0) {
- GDKerror("wlcr.replicate:replay scheduling
process can not be started");
+ throw(SQL,"wlcr.init","Starting wlr manager
failed");
}
+ GDKregister(wlcr_thread);
}
+ return MAL_SUCCEED;
+}
+
+/* for testing it is helpful to be able to wait until all log files have been processed */
+str
+WLRwaitformaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{ int i = 0;
+ (void) cntxt;
+ (void) mb;
+ (void) stk;
+ (void) pci;
+
+ WLRgetMaster(wlr_master);
+ while( wlr_nextbatch < wlcr_batches && ! GDKexiting() && i++ < 60){
+ mnstr_printf(cntxt->fdout,"#waiting for master %d <
%d\n",wlr_nextbatch, wlcr_batches);
+ MT_sleep_ms(1000);
+ }
+ if( i >= 60)
+ throw(SQL,"waitformaster","Time out (> 60 seconds)");
+ mnstr_printf(cntxt->fdout,"#in sync with master %d ==
%d\n",wlr_nextbatch, wlcr_batches);
+ return MAL_SUCCEED;
}
str
-WLCRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+WLRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{ str msg;
MT_Id wlcr_thread;
(void) mb;
@@ -414,34 +441,23 @@ WLRquery(Client cntxt, MalBlkPtr mb, Mal
return msg;
}
+/* A change event need not be executed, because it is already captured
+ * in the update/append/delete
+ */
str
WLRchange(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{ str qry = *getArgReference_str(stk,pci,1);
- str msg = MAL_SUCCEED;
- char *x, *y, *qtxt;
-
+{
+ (void) cntxt;
+ (void) pci;
+ (void) stk;
(void) mb;
- if( cntxt->wlcr_kind == WLCR_ROLLBACK)
- return msg;
- // we need to get rid of the escaped quote.
- x = qtxt= (char*) GDKmalloc(strlen(qry) +1);
- for(y = qry; *y; y++){
- if( *y == '\\' ){
- if( *(y+1) == '\'')
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list