Changeset: 2e0ad40d1449 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2e0ad40d1449
Modified Files:
	sql/backends/monet5/Makefile.ag
	sql/backends/monet5/sql_wlcr.c
	sql/backends/monet5/sql_wlcr.h
	sql/backends/monet5/sql_wlcr.mal
	sql/scripts/60_wlcr.sql
Branch: wlcr
Log Message:
Create the replica server process Prepare for processing the wlcr log files at the replica. diffs (284 lines): diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag --- a/sql/backends/monet5/Makefile.ag +++ b/sql/backends/monet5/Makefile.ag @@ -64,7 +64,7 @@ headers_mal = { HEADERS = mal DIR = libdir/monetdb5 SOURCES = sql_aggr_bte.mal sql_aggr_flt.mal sql_aggr_dbl.mal sql_aggr_int.mal sql_aggr_lng.mal \ - sql_aggr_sht.mal sql_decimal.mal sql_inspect.mal sql_wlcr.mal\ + sql_aggr_sht.mal sql_decimal.mal sql_inspect.mal sql_wlcr.mal \ sql_rank.mal sqlcatalog.mal sql_transaction.mal sql.mal } diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c --- a/sql/backends/monet5/sql_wlcr.c +++ b/sql/backends/monet5/sql_wlcr.c @@ -16,91 +16,153 @@ * Alternatively you start with an empty database. * * Since the wlcr files can be stored anywhere, the full path should be given. + * + * At any time there can only be on choice to interpret the files. */ #include "monetdb_config.h" #include "sql.h" #include "wlcr.h" #include "sql_wlcr.h" +#include "mal_client.h" + +#define WLCR_REPLAY 1 +#define WLCR_SYNC 2 +static int wlcr_mode; static str wlcr_master; static int wlcr_replaythreshold; -static int wlcr_replaybatch; +static int wlcr_replaybatches; + +static MT_Id wlcr_thread; static str -WLCRinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) +WLCRreplayinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { - int i = 1, j,k,l; + int i = 1, j,k; char path[PATHLENGTH]; + str dbname,dir; FILE *fd; (void) cntxt; (void) k; - (void) l; - wlcr_master = GDKgetenv("gdk_master"); - if (i< pci->argc+1 && getArgType(mb, pci, i) == TYPE_str){ - wlcr_master = *getArgReference_str(stk,pci,i); + if (getArgType(mb, pci, i) == TYPE_str){ + dbname = *getArgReference_str(stk,pci,i); i++; } - if( wlcr_master == NULL){ - throw(SQL,"wlcr.init","Can not access the wlcr directory"); + if( dbname == NULL){ + throw(SQL,"wlcr.init","Master 
database name missing."); } - snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP); - mnstr_printf(cntxt->fdout,"#Testing '%s'\n", path); + snprintf(path,PATHLENGTH,"..%c%s",DIR_SEP,dbname); + dir = GDKfilepath(0,path,"master",0); + wlcr_master = GDKstrdup(dir); + mnstr_printf(cntxt->fdout,"#WLCR master '%s'\n", wlcr_master); + snprintf(path,PATHLENGTH,"%s%cwlcr", dir, DIR_SEP); + mnstr_printf(cntxt->fdout,"#Testing access to master '%s'\n", path); fd = fopen(path,"r"); if( fd == NULL){ - throw(SQL,"wlcr.init","Can not access '%s'\n",path); + throw(SQL,"wlcr.init","Can not access master control file '%s'\n",path); } - if( fscanf(fd,"%d %d %d", &j,&k,&l) != 3){ + if( fscanf(fd,"%d %d", &j,&k) != 2){ throw(SQL,"wlcr.init","'%s' does not have proper number of arguments\n",path); } - wlcr_replaybatch = j; + wlcr_replaybatches = j; - if ( i < pci->argc+1 && getArgType(mb, pci, i) == TYPE_int){ + if ( i < pci->argc && getArgType(mb, pci, i) == TYPE_int){ wlcr_replaythreshold = *getArgReference_int(stk,pci,i); } return MAL_SUCCEED; } +void +WLCRprocess(void *arg) +{ + Client cntxt = (Client) arg; + int i; + char path[PATHLENGTH]; + stream *fd; + Client c; + + c =MCforkClient(cntxt); + if( c == 0){ + GDKerror("Could not create user for WLCR process\n"); + return; + } + c->prompt = GDKstrdup(""); /* do not produce visible prompts */ + c->promptlength = 0; + c->listing = 0; + + mnstr_printf(cntxt->fdout,"#Ready to start the replayagainst '%s' batches %d threshold %d", wlcr_master, wlcr_replaybatches, wlcr_replaythreshold); + for( i= 0; i < wlcr_replaybatches; i++){ + snprintf(path,PATHLENGTH,"%s%cwlcr_%06d", wlcr_master, DIR_SEP,i); + mnstr_printf(cntxt->fdout,"#WLCR processing %s\n",path); + fd= open_rstream(path); + if( c->fdin == NULL || MCpushClientInput(c, bstream_create(fd, 128 * BLOCK), 0, "") < 0){ + mnstr_printf(cntxt->fdout,"#wlcr.replay:'%s' can not be accessed \n",path); + } + c->yycur = 0; + // preload the complete file + // now parse the file line by line 
+ close_stream(fd); + } +} + str WLCRreplay(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str msg; - int i; char path[PATHLENGTH]; stream *fd; - msg = WLCRinit(cntxt, mb, stk, pci); - mnstr_printf(cntxt->fdout,"#Ready to start the replay against '%s' threshold %d", wlcr_master, wlcr_replaythreshold); + if( wlcr_mode == WLCR_SYNC){ + throw(SQL,"wlcr.replay","System already in synchronization mode"); + } + if( wlcr_mode == WLCR_REPLAY){ + throw(SQL,"wlcr.replay","System already in replay mode"); + } + wlcr_mode = WLCR_REPLAY; + msg = WLCRreplayinit(cntxt, mb, stk, pci); + if( msg) + return msg; - for( i= 0; i < wlcr_replaybatch; i++){ - snprintf(path,PATHLENGTH,"%s%cwlcr_%06d", wlcr_master, DIR_SEP,i); - fd= open_rstream(path); - if( fd == NULL){ - throw(SQL,"wlcr.replay","'%s' can not be accessed \n",path); - } - close_stream(fd); + snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP); + fd= open_rstream(path); + if( fd == NULL){ + throw(SQL,"wlcr.replay","'%s' can not be accessed \n",path); } - return msg; + close_stream(fd); + + if (MT_create_thread(&wlcr_thread, WLCRprocess, (void*) cntxt, MT_THR_JOINABLE) < 0) { + throw(SQL,"wlcr.replay","can not be accessed \n"); + } + return MAL_SUCCEED; } str WLCRsynchronize(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) { str msg; - int i; char path[PATHLENGTH]; stream *fd; - msg = WLCRinit(cntxt, mb, stk, pci); - mnstr_printf(cntxt->fdout,"#Ready to start the synchronization against '%s' threshold %d", wlcr_master, wlcr_replaythreshold); + if( wlcr_mode == WLCR_SYNC){ + throw(SQL,"wlcr.replay","System already in synchronization mode"); + } + if( wlcr_mode == WLCR_REPLAY){ + throw(SQL,"wlcr.replay","System already in replay mode"); + } + snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP); + fd= open_rstream(path); + if( fd == NULL){ + throw(SQL,"wlcr.replay","'%s' can not be accessed \n",path); + } + close_stream(fd); - for( i= 0; i < wlcr_replaybatch; i++){ - 
snprintf(path,PATHLENGTH,"%s%cwlcr_%06d", wlcr_master, DIR_SEP,i); - fd= open_rstream(path); - if( fd == NULL){ - throw(SQL,"wlcr.synchronize","'%s' can not be accessed \n",path); - } - close_stream(fd); + wlcr_mode = WLCR_SYNC; + msg = WLCRreplayinit(cntxt, mb, stk, pci); + if( msg) + return msg; + if (MT_create_thread(&wlcr_thread, WLCRprocess, (void*) cntxt, MT_THR_JOINABLE) < 0) { + throw(SQL,"wlcr.synchronize","can not be started \n"); } - return msg; + return MAL_SUCCEED; } diff --git a/sql/backends/monet5/sql_wlcr.h b/sql/backends/monet5/sql_wlcr.h --- a/sql/backends/monet5/sql_wlcr.h +++ b/sql/backends/monet5/sql_wlcr.h @@ -17,6 +17,7 @@ /* */ +extern void WLCRprocess(void *arg); extern str WLCRreplay(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); extern str WLCRsynchronize(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); diff --git a/sql/backends/monet5/sql_wlcr.mal b/sql/backends/monet5/sql_wlcr.mal --- a/sql/backends/monet5/sql_wlcr.mal +++ b/sql/backends/monet5/sql_wlcr.mal @@ -4,19 +4,17 @@ # # Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V. 
-pattern wlcr.replay() +module wlcr; + +pattern wlcr.replay(dbname:str) address WLCRreplay -comment "Replay all against a freshly reloaded snapshot"; +comment "Re-run all operations from another database in the same dbfarm"; -pattern wlcr.replay(path:str) +pattern wlcr.replay(dbname:str, threshold:int) address WLCRreplay -comment "Replay all against a freshly reloaded snapshot"; +comment "Re-run all operations from another database in the dbfarm using a query execution threshold"; -pattern wlcr.synchronize() +pattern wlcr.synchronize(dbname:str) address WLCRsynchronize comment "Roll the snapshot forward"; -pattern wlcr.synchronize(path:str) -address WLCRsynchronize -comment "Roll the snapshot forward"; - diff --git a/sql/scripts/60_wlcr.sql b/sql/scripts/60_wlcr.sql --- a/sql/scripts/60_wlcr.sql +++ b/sql/scripts/60_wlcr.sql @@ -11,30 +11,21 @@ create schema wlcr; create procedure wlcr.master() external name wlcr.master; -create procedure wlcr.master(path string) -external name wlcr.master; - create procedure wlcr.master(threshold integer) external name wlcr.master; -create procedure wlcr.master(path string, threshold integer) -external name wlcr.master; - create procedure wlcr.replay() external name wlcr.replay; create procedure wlcr.replay(threshold int) external name wlcr.replay; -create procedure wlcr.replay(path string) +create procedure wlcr.replay(dbname string) external name wlcr.replay; -create procedure wlcr.replay(path string, threshold int) +create procedure wlcr.replay(dbname string, threshold int) external name wlcr.replay; -create procedure wlcr.synchronize(path string) +create procedure wlcr.synchronize(dbname string) external name wlcr.synchronize; -create procedure wlcr.synchronize() -external name wlcr.synchronize; - _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list