Changeset: 2e0ad40d1449 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=2e0ad40d1449
Modified Files:
        sql/backends/monet5/Makefile.ag
        sql/backends/monet5/sql_wlcr.c
        sql/backends/monet5/sql_wlcr.h
        sql/backends/monet5/sql_wlcr.mal
        sql/scripts/60_wlcr.sql
Branch: wlcr
Log Message:

Create the replica server process
Prepare for processing the wlcr log files at the replica.


diffs (284 lines):

diff --git a/sql/backends/monet5/Makefile.ag b/sql/backends/monet5/Makefile.ag
--- a/sql/backends/monet5/Makefile.ag
+++ b/sql/backends/monet5/Makefile.ag
@@ -64,7 +64,7 @@ headers_mal = {
        HEADERS = mal
        DIR = libdir/monetdb5
        SOURCES = sql_aggr_bte.mal  sql_aggr_flt.mal sql_aggr_dbl.mal  
sql_aggr_int.mal  sql_aggr_lng.mal \
-               sql_aggr_sht.mal  sql_decimal.mal  sql_inspect.mal sql_wlcr.mal\
+               sql_aggr_sht.mal  sql_decimal.mal  sql_inspect.mal sql_wlcr.mal 
\
                sql_rank.mal sqlcatalog.mal sql_transaction.mal  sql.mal
 }
 
diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c
--- a/sql/backends/monet5/sql_wlcr.c
+++ b/sql/backends/monet5/sql_wlcr.c
@@ -16,91 +16,153 @@
  * Alternatively you start with an empty database.
  *
  * Since the wlcr files can be stored anywhere, the full path should be given.
+ *
+ * At any time there can only be on choice to interpret the files.
  */
 #include "monetdb_config.h"
 #include "sql.h"
 #include "wlcr.h"
 #include "sql_wlcr.h"
+#include "mal_client.h"
+
+#define WLCR_REPLAY 1
+#define WLCR_SYNC 2
+static int wlcr_mode;
 
 static str wlcr_master;
 static int wlcr_replaythreshold;
-static int wlcr_replaybatch;
+static int wlcr_replaybatches;
+
+static MT_Id wlcr_thread;
 
 static str
-WLCRinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+WLCRreplayinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
-    int i = 1, j,k,l;
+    int i = 1, j,k;
        char path[PATHLENGTH];
+       str dbname,dir;
        FILE *fd;
 
        (void) cntxt;
        (void) k;
-       (void) l;
 
-    wlcr_master = GDKgetenv("gdk_master");
-    if (i< pci->argc+1 && getArgType(mb, pci, i) == TYPE_str){
-        wlcr_master =  *getArgReference_str(stk,pci,i);
+    if (getArgType(mb, pci, i) == TYPE_str){
+        dbname =  *getArgReference_str(stk,pci,i);
         i++;
     }
-       if( wlcr_master == NULL){
-               throw(SQL,"wlcr.init","Can not access the wlcr directory");
+       if( dbname == NULL){
+               throw(SQL,"wlcr.init","Master database name missing.");
        }
-       snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP);
-       mnstr_printf(cntxt->fdout,"#Testing '%s'\n", path);
+       snprintf(path,PATHLENGTH,"..%c%s",DIR_SEP,dbname);
+       dir = GDKfilepath(0,path,"master",0);
+       wlcr_master = GDKstrdup(dir);
+       mnstr_printf(cntxt->fdout,"#WLCR master '%s'\n", wlcr_master);
+       snprintf(path,PATHLENGTH,"%s%cwlcr", dir, DIR_SEP);
+       mnstr_printf(cntxt->fdout,"#Testing access to master '%s'\n", path);
        fd = fopen(path,"r");
        if( fd == NULL){
-               throw(SQL,"wlcr.init","Can not access '%s'\n",path);
+               throw(SQL,"wlcr.init","Can not access master control file 
'%s'\n",path);
        }
-       if( fscanf(fd,"%d %d %d", &j,&k,&l) != 3){
+       if( fscanf(fd,"%d %d", &j,&k) != 2){
                throw(SQL,"wlcr.init","'%s' does not have proper number of 
arguments\n",path);
        }
-       wlcr_replaybatch = j;
+       wlcr_replaybatches = j;
 
-    if ( i < pci->argc+1 && getArgType(mb, pci, i) == TYPE_int){
+    if ( i < pci->argc && getArgType(mb, pci, i) == TYPE_int){
         wlcr_replaythreshold = *getArgReference_int(stk,pci,i);
        }
        return MAL_SUCCEED;
 }
 
+void
+WLCRprocess(void *arg)
+{      
+       Client cntxt = (Client) arg;
+       int i;
+       char path[PATHLENGTH];
+       stream *fd;
+       Client c;
+
+       c =MCforkClient(cntxt);
+       if( c == 0){
+               GDKerror("Could not create user for WLCR process\n");
+               return;
+       }
+    c->prompt = GDKstrdup("");  /* do not produce visible prompts */
+    c->promptlength = 0;
+    c->listing = 0;
+
+       mnstr_printf(cntxt->fdout,"#Ready to start the replayagainst '%s' 
batches %d threshold %d", wlcr_master, wlcr_replaybatches, 
wlcr_replaythreshold);
+       for( i= 0; i < wlcr_replaybatches; i++){
+               snprintf(path,PATHLENGTH,"%s%cwlcr_%06d", wlcr_master, 
DIR_SEP,i);
+               mnstr_printf(cntxt->fdout,"#WLCR processing %s\n",path);
+               fd= open_rstream(path);
+               if( c->fdin == NULL || MCpushClientInput(c, bstream_create(fd, 
128 * BLOCK), 0, "") < 0){
+                       mnstr_printf(cntxt->fdout,"#wlcr.replay:'%s' can not be 
accessed \n",path);
+               }
+               c->yycur = 0;
+               // preload the complete file
+               // now parse the file line by line
+               close_stream(fd);
+       }
+}
+
 str
 WLCRreplay(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      str msg;
-       int i;
        char path[PATHLENGTH];
        stream *fd;
 
-       msg = WLCRinit(cntxt, mb, stk, pci);
-       mnstr_printf(cntxt->fdout,"#Ready to start the replay against '%s' 
threshold %d", wlcr_master, wlcr_replaythreshold);
+       if( wlcr_mode == WLCR_SYNC){
+               throw(SQL,"wlcr.replay","System already in synchronization 
mode");
+       }
+       if( wlcr_mode == WLCR_REPLAY){
+               throw(SQL,"wlcr.replay","System already in replay mode");
+       }
+       wlcr_mode = WLCR_REPLAY;
+       msg = WLCRreplayinit(cntxt, mb, stk, pci);
+       if( msg)
+               return msg;
 
-       for( i= 0; i < wlcr_replaybatch; i++){
-               snprintf(path,PATHLENGTH,"%s%cwlcr_%06d", wlcr_master, 
DIR_SEP,i);
-               fd= open_rstream(path);
-               if( fd == NULL){
-                       throw(SQL,"wlcr.replay","'%s' can not be accessed 
\n",path);
-               }
-               close_stream(fd);
+       snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP);
+       fd= open_rstream(path);
+       if( fd == NULL){
+               throw(SQL,"wlcr.replay","'%s' can not be accessed \n",path);
        }
-       return msg;
+       close_stream(fd);
+
+    if (MT_create_thread(&wlcr_thread, WLCRprocess, (void*) cntxt, 
MT_THR_JOINABLE) < 0) {
+                       throw(SQL,"wlcr.replay","can not be accessed \n");
+       }
+       return MAL_SUCCEED;
 }
 
 str
 WLCRsynchronize(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {      str msg;
-       int i;
        char path[PATHLENGTH];
        stream *fd;
 
-       msg = WLCRinit(cntxt, mb, stk, pci);
-       mnstr_printf(cntxt->fdout,"#Ready to start the synchronization against 
'%s' threshold %d", wlcr_master, wlcr_replaythreshold);
+       if( wlcr_mode == WLCR_SYNC){
+               throw(SQL,"wlcr.replay","System already in synchronization 
mode");
+       }
+       if( wlcr_mode == WLCR_REPLAY){
+               throw(SQL,"wlcr.replay","System already in replay mode");
+       }
+       snprintf(path,PATHLENGTH,"%s%cwlcr", wlcr_master, DIR_SEP);
+       fd= open_rstream(path);
+       if( fd == NULL){
+               throw(SQL,"wlcr.replay","'%s' can not be accessed \n",path);
+       }
+       close_stream(fd);
 
-       for( i= 0; i < wlcr_replaybatch; i++){
-               snprintf(path,PATHLENGTH,"%s%cwlcr_%06d", wlcr_master, 
DIR_SEP,i);
-               fd= open_rstream(path);
-               if( fd == NULL){
-                       throw(SQL,"wlcr.synchronize","'%s' can not be accessed 
\n",path);
-               }
-               close_stream(fd);
+       wlcr_mode = WLCR_SYNC;
+       msg = WLCRreplayinit(cntxt, mb, stk, pci);
+       if( msg)
+               return msg;
+    if (MT_create_thread(&wlcr_thread, WLCRprocess, (void*) cntxt, 
MT_THR_JOINABLE) < 0) {
+                       throw(SQL,"wlcr.synchronize","can not be started \n");
        }
-       return msg;
+       return MAL_SUCCEED;
 }
 
diff --git a/sql/backends/monet5/sql_wlcr.h b/sql/backends/monet5/sql_wlcr.h
--- a/sql/backends/monet5/sql_wlcr.h
+++ b/sql/backends/monet5/sql_wlcr.h
@@ -17,6 +17,7 @@
 
 /*
  */
+extern void WLCRprocess(void *arg);
 extern str WLCRreplay(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 extern str WLCRsynchronize(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 
diff --git a/sql/backends/monet5/sql_wlcr.mal b/sql/backends/monet5/sql_wlcr.mal
--- a/sql/backends/monet5/sql_wlcr.mal
+++ b/sql/backends/monet5/sql_wlcr.mal
@@ -4,19 +4,17 @@
 #
 # Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
 
-pattern wlcr.replay()
+module wlcr;
+
+pattern wlcr.replay(dbname:str)
 address WLCRreplay
-comment "Replay all against a freshly reloaded snapshot";
+comment "Re-run all operations from another database in the same dbfarm";
 
-pattern wlcr.replay(path:str)
+pattern wlcr.replay(dbname:str, threshold:int)
 address WLCRreplay
-comment "Replay all against a freshly reloaded snapshot";
+comment "Re-run all operations from another database in the dbfarm using a 
query execution threshold";
 
-pattern wlcr.synchronize()
+pattern wlcr.synchronize(dbname:str)
 address WLCRsynchronize
 comment "Roll the snapshot forward";
 
-pattern wlcr.synchronize(path:str)
-address WLCRsynchronize
-comment "Roll the snapshot forward";
-
diff --git a/sql/scripts/60_wlcr.sql b/sql/scripts/60_wlcr.sql
--- a/sql/scripts/60_wlcr.sql
+++ b/sql/scripts/60_wlcr.sql
@@ -11,30 +11,21 @@ create schema wlcr;
 create procedure wlcr.master()
 external name wlcr.master;
 
-create procedure wlcr.master(path string)
-external name wlcr.master;
-
 create procedure wlcr.master(threshold integer)
 external name wlcr.master;
 
-create procedure wlcr.master(path string, threshold integer)
-external name wlcr.master;
-
 create procedure wlcr.replay()
 external name wlcr.replay;
 
 create procedure wlcr.replay(threshold int)
 external name wlcr.replay;
 
-create procedure wlcr.replay(path string)
+create procedure wlcr.replay(dbname string)
 external name wlcr.replay;
 
-create procedure wlcr.replay(path string, threshold int)
+create procedure wlcr.replay(dbname string, threshold int)
 external name wlcr.replay;
 
-create procedure wlcr.synchronize(path string)
+create procedure wlcr.synchronize(dbname string)
 external name wlcr.synchronize;
 
-create procedure wlcr.synchronize()
-external name wlcr.synchronize;
-
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to