Changeset: f097aee982a1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f097aee982a1
Added Files:
sql/backends/monet5/sql_wlcr.c
sql/backends/monet5/sql_wlcr.h
sql/backends/monet5/sql_wlcr.mal
sql/scripts/60_wlcr.sql
Modified Files:
monetdb5/modules/mal/wlcr.c
monetdb5/modules/mal/wlcr.h
monetdb5/modules/mal/wlcr.mal
sql/backends/monet5/40_sql.mal
sql/backends/monet5/Makefile.ag
sql/scripts/Makefile.ag
Branch: wlcr
Log Message:
Basic handling of WLCR logs writes
diffs (truncated from 550 to 300 lines):
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
--- a/monetdb5/modules/mal/wlcr.c
+++ b/monetdb5/modules/mal/wlcr.c
@@ -7,62 +7,129 @@
*/
/*
- * (c) Martin Kersten
+ * (c) 2017 Martin Kersten
* This module collects the workload-capture-replay statements during
transaction execution.
+ * It is used primarily for replication management and workload replay
+ *
+ * The goal is to maintain a replica of a master database. All data of the
master
+ * is basically only available for read only access. Accidental corruption of
this
+ * data is avoided by setting ownership and access properties at the SQL level
in the replica.
+ *
+ *
+ * IMPLEMENTATION
+ *
+ * The replica directory should be on a shared (global) file system.
+ * As default we use dbfarm/master.
+ *
+ * The binary dump for the database snapshot should be stored there in
master/bat.
+ * The associated log files are stored as master/wlcr<number>.
+ * Creation and restore of a snapshot should be a monetdb option. TODO
+ *
+ * Replication management starts when you run the command
+ * CALL wlcr.master("(full)path to snapshot dir")
+ * It can also be passed as a command line parameter
+ * --set wlcr_dir="(full)path to snapshot dir"
*
* Each wlcr log file contains a serial log for a transaction batch.
- * Each job is identified by the original owner of the query, the snapshot
identity (name+nr) against
- * which it was ran, an indication of the kind of transaction and
commit/rollback, its runtime (in ms) and starting time.
+ * Each job is identified by the owner of the query, the snapshot tag,
+ * commit/rollback, its starting time and runtime (in ms).
*
- * Replaying the wlcr against another server based on the same snapshot should
produce a perfect copy.
- * Each job should be executed using the credentials of the user issuing the
transaction.
- * Any failuer encountered terminates the replication process.
+ * Logging of queries can be further limited to those that satisfy a threshold.
+ * CALL wlcr.master("(full)path to snapshot dir", threshold)
+ * The threshold is given in milliseconds. A negative threshold leads to
ignoring all queries.
+ *
+ * A replica server should issue the matching call
+ * CALL wlcr.synchronize("(full)path to snapshot dir")
+ *
+ * During synchronization only updates are executed for the user responsible
for the call.
+ * Queries are simply ignored unless needed as replacement for update actions.
+ *
+ * The alternative is to replay the log
+ * CALL wlcr.replay("(full)path to snapshot dir")
+ * In this mode all queries are executed under the credentials of the query
owner, including those that lead to updates.
+ *
+ * Any failure encountered terminates the synchronization process, leaving a
message in the merovingian log.
+ *
+ * The replay progress can be inspected using the function wlcr.drift() and
wlcr.synced().
+ * The latter is true if all accessible log files have been processed.
*
- * All wlcr files should be stored on a shared file system for all replicas to
access.
- * The default is a subdirectory of the database and act as a secondary
database rebuild log.
- * The location can be overruled using a full path to a shared disk as
GDKenvironemnt variable (wlcr_dir)
+ * The wlcr files purposely have a textual format derived from the MAL
statements.
+ * It creates some overhead for copy into situations.
*
- * The wlcr files have a textual format derived from the MAL statements.
- * This can be used to ease the implementation of the wlreplay
+ * The integrity of the wlcr directories is critical. For now we assume that
all batches are available.
+ * We should detect that wlcr.master() is issued after updates have taken
place on the snapshot TODO.
*
- * The logs may only be removed after a new snapshot has been taken or wlcr is
disabled
*/
#include "monetdb_config.h"
+#include <time.h>
#include "mal_builder.h"
#include "wlcr.h"
static MT_Lock wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock");
-int wlcr_duration = INT_MAX; // how long to capture default= 0
int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly queries
-int wlcr_deltas = 1; // sent the delta values
-int wlcr_all = 1; // also ship failed transaction
-str wlcr_snapshot= "baseline"; // name assigned to the snapshot
-int wlcr_unit = 0; // last job executed
+str wlcr_snapshot = NULL; // name assigned to the snapshot; NULL until one is set
+int wlcr_batch = 0; // batch counter: numbers the wlcr_<nr> log files and tags each job
static char *wlcr_name[]= {"","query","update","catalog"};
static stream *wlcr_fd = 0;
-static str wlcr_log = "/tmp/wlcr";
+static str wlcr_dir = NULL; // directory with the WLCR state and log files; NULL until WLCRmaster sets it
+
+
+/* The database snapshots are binary copies of the dbfarm/database/bat
+ * New snapshots are created currently using the 'monetdb snapshot <db>' command
+ * or a SQL procedure.
+ * It requires a database halt.
+ *
+ * The wlcr logs are stored in the snapshot directory as a time-stamped list
+ */
+
+/*
+ * WLCRmaster -- start workload capture on this server.
+ *
+ * Optional MAL arguments (after the return slot):
+ *   str  directory under which the "master" directory is placed
+ *   int  threshold in milliseconds (stored in wlcr_threshold)
+ * Without a directory argument the "gdk_wlcrdir" setting is used, and when
+ * that is unset as well the default "master" path from GDKfilepath().
+ *
+ * The state file <wlcr_dir>/wlcr holds "<batch> <threshold>".  An existing
+ * state file is read first so the batch counter survives a restart; a
+ * threshold passed explicitly takes precedence over the stored one.  The
+ * state file is then (re)written with the values in effect.
+ *
+ * Returns MAL_SUCCEED, or a MAL exception when the directory cannot be
+ * determined, the path does not fit, or the state file cannot be written.
+ *
+ * NOTE(review): a previously assigned wlcr_dir is not released here because
+ * it may be owned by the GDK environment -- confirm ownership before freeing.
+ */
+str
+WLCRmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	char path[PATHLENGTH];
+	FILE *fd;
+	int i = 1;
+	int n;
+	int batch = 0, threshold = 0;
+	int explicit_threshold = 0;
+	int written;
+
+	wlcr_dir = GDKgetenv("gdk_wlcrdir");
+	// valid argument positions are 1 .. argc-1; never look at position argc
+	if (i < pci->argc && getArgType(mb, pci, i) == TYPE_str) {
+		wlcr_dir = *getArgReference_str(stk, pci, i);
+		wlcr_dir = GDKfilepath(0, wlcr_dir, "master", 0);
+		i++;
+	}
+	// no (usable) directory given: fall back to the default location
+	if (wlcr_dir == NULL)
+		wlcr_dir = GDKfilepath(0, 0, "master", 0);
+	if (wlcr_dir == NULL)
+		return createException(MAL, "wlcr.master", "Unable to determine the WLCR directory");
+
+	if (i < pci->argc && getArgType(mb, pci, i) == TYPE_int) {
+		wlcr_threshold = *getArgReference_int(stk, pci, i);
+		explicit_threshold = 1;
+	}
+
+	n = snprintf(path, sizeof(path), "%s%cwlcr", wlcr_dir, DIR_SEP);
+	if (n < 0 || (size_t) n >= sizeof(path))
+		return createException(MAL, "wlcr.master", "WLCR path too long for %s", wlcr_dir);
+
+	// if the master directory does not exist, create it
+	// (presumably GDKcreatedir creates the directories leading up to path -- TODO confirm)
+	if (GDKcreatedir(path) == GDK_FAIL)
+		mnstr_printf(cntxt->fdout, "#Could not create %s\n", wlcr_dir);
+	mnstr_printf(cntxt->fdout, "#Snapshot directory '%s'\n", wlcr_dir);
+
+	// pick up the state left by an earlier run, if there is one
+	fd = fopen(path, "r");
+	if (fd != NULL) {
+		if (fscanf(fd, "%d %d", &batch, &threshold) == 2) {
+			wlcr_batch = batch;
+			if (!explicit_threshold)
+				wlcr_threshold = threshold;
+		}
+		fclose(fd);
+	}
+
+	// (re)write the state file with the values now in effect
+	fd = fopen(path, "w");
+	if (fd == NULL)
+		return createException(MAL, "wlcr.master", "Unable to initialize WLCR %s", path);
+	written = fprintf(fd, "%d %d\n", wlcr_batch, wlcr_threshold) >= 0;
+	if (fclose(fd) != 0)
+		written = 0;
+	if (!written)
+		return createException(MAL, "wlcr.master", "Unable to initialize WLCR %s", path);
+
+	mnstr_printf(cntxt->fdout, "#master wlcr_batch %d wlcr_threshold %d\n", wlcr_batch, wlcr_threshold);
+	return MAL_SUCCEED;
+}
+/*
+ * Append the start time of instruction pci (pci->clock.tv_sec, rendered as
+ * local time "YYYY-MM-DDTHH:MM:SS") as a string argument to instruction p
+ * of the client's wlcr block and return the updated instruction.
+ * When the time cannot be converted an empty string is pushed instead.
+ * NOTE(review): localtime() uses shared static storage; consider
+ * localtime_r() where the build provides it.
+ */
static InstrPtr
WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p)
{
-	char *tbuf;
-	char ctm[26];
+	char tbuf[26];
	time_t clk = pci->clock.tv_sec;
+	struct tm *ctm;
-#ifdef HAVE_CTIME_R3
-	tbuf = ctime_r(&clk, ctm, sizeof(ctm));
-#else
-#ifdef HAVE_CTIME_R
-	tbuf = ctime_r(&clk, ctm);
-#else
-	tbuf = ctime(&clk);
-#endif
-#endif
-	tbuf[19]=0;
+	ctm = localtime(&clk);
+	// localtime() may return NULL and strftime() returns 0 when nothing
+	// usable was written; never hand an indeterminate buffer to pushStr
+	if (ctm == NULL || strftime(tbuf, sizeof(tbuf), "%Y-%m-%dT%H:%M:%S", ctm) == 0)
+		tbuf[0] = 0;
	return pushStr(cntxt->wlcr, p, tbuf);
}
@@ -77,36 +144,13 @@ WLCRaddtime(Client cntxt, InstrPtr pci,
if( cntxt->wlcr->stop == 0){\
p = newStmt(cntxt->wlcr,"wlreplay","job");\
p = pushStr(cntxt->wlcr,p, cntxt->username);\
- p = pushStr(cntxt->wlcr,p, wlcr_snapshot);\
- p = pushInt(cntxt->wlcr,p, wlcr_unit);\
+ p = pushStr(cntxt->wlcr,p, wlcr_snapshot?
wlcr_snapshot:"dummy");\
+ p = pushInt(cntxt->wlcr,p, wlcr_batch);\
p = WLCRaddtime(cntxt,pci, p); \
p->ticks = GDKms();\
} }
str
-WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
-{
- int i;
-
- (void) cntxt;
- (void) mb;
-
- i = *getArgReference_int(stk,pci,1);
- if ( i < 0)
- throw(MAL,"wlcr.properties","Duration must be a possitive
number");
- wlcr_duration = i;
-
- i = *getArgReference_int(stk,pci,2);
- if ( i < 0)
- throw(MAL,"wlcr.properties","Duration must be a possitive
number");
- wlcr_threshold = i;
-
- wlcr_deltas = *getArgReference_int(stk,pci,3) != 0;
- wlcr_all = *getArgReference_int(stk,pci,4) != 0;
- return MAL_SUCCEED;
-}
-
-str
WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
str snapshot = *getArgReference_str(stk,pci,1);
@@ -117,7 +161,7 @@ WLCRjob(Client cntxt, MalBlkPtr mb, MalS
if ( strcmp(snapshot, wlcr_snapshot))
throw(MAL,"wlcr.job","Incompatible snapshot identifier");
- if ( tid < wlcr_unit)
+ if ( tid < wlcr_batch)
throw(MAL,"wlcr.job","Work unit identifier is before last one
executed");
return MAL_SUCCEED;
}
@@ -366,36 +410,51 @@ WLCRclear_table(Client cntxt, MalBlkPtr
return MAL_SUCCEED;
}
+// creation of file and updating the version file should be atomic TODO!!!
+/*
+ * Record the current batch number and threshold in the state file
+ * <wlcr_dir>/wlcr and return the name of the log file for this batch,
+ * <wlcr_dir>/wlcr_<6-digit batch>.  The batch counter is advanced afterwards.
+ *
+ * The result is always a GDKstrdup'ed string (possibly NULL when that
+ * allocation fails), so the caller can release it uniformly; "unknown" is
+ * returned when the state file cannot be opened.
+ * Writing the state file is best effort: write errors are not reported.
+ */
static str
-WLCRnewlogger(Client cntxt)
+WLCRloggerfile(Client cntxt)
{
+	char path[PATHLENGTH];
+	FILE *fd;
+
-	(void) cntxt;
-	// find next available file
-	return GDKstrdup(wlcr_log);
+	snprintf(path, sizeof(path), "%s%cwlcr", wlcr_dir, DIR_SEP);
+	mnstr_printf(cntxt->fdout, "#WLCRloggerfile %s\n", wlcr_dir);
+	fd = fopen(path, "w");
+	if (fd == NULL)
+		return GDKstrdup("unknown");	// heap copy, same ownership as the success path
+	(void) fprintf(fd, "%d %d\n", wlcr_batch, wlcr_threshold);
+	(void) fclose(fd);
+	snprintf(path, sizeof(path), "%s%cwlcr_%06d", wlcr_dir, DIR_SEP, wlcr_batch);
+	wlcr_batch++;
+	return GDKstrdup(path);
}
static str
WLCRwrite(Client cntxt)
{ str fname;
// save the wlcr record on a file and ship it to registered slaves
- if ( wlcr_fd == NULL){
- fname= WLCRnewlogger(cntxt);
- wlcr_fd = open_wastream(fname);
+ if( wlcr_dir ){
+ if ( wlcr_fd == NULL){
+ fname= WLCRloggerfile(cntxt);
+ mnstr_printf(cntxt->fdout,"#WLCRloggerfile batch
%s\n",fname);
+ wlcr_fd = open_wastream(fname);
+ }
+ // Limit the size of the log files
+
+ if ( wlcr_fd == NULL)
+ throw(MAL,"wlcr.write","WLCR log file not accessible");
+
+ if(cntxt->wlcr->stop == 0)
+ return MAL_SUCCEED;
+
+ newStmt(cntxt->wlcr,"wlreplay","fin");
+ MT_lock_set(&wlcr_lock);
+ printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG );
+ (void) mnstr_flush(wlcr_fd);
+ wlcr_batch++;
+ MT_lock_unset(&wlcr_lock);
}
- // Limit the size of the log files
-
- if ( wlcr_fd == NULL)
- throw(MAL,"wlcr.write","WLCR log file not accessible");
-
- if(cntxt->wlcr->stop == 0)
- return MAL_SUCCEED;
-
- newStmt(cntxt->wlcr,"wlreplay","fin");
- MT_lock_set(&wlcr_lock);
- printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG );
- (void) mnstr_flush(wlcr_fd);
- wlcr_unit++;
- MT_lock_unset(&wlcr_lock);
#ifdef _DEBUG_WLCR_
printFunction(cntxt->fdout, cntxt->wlcr, 0, LIST_MAL_ALL );
@@ -437,7 +496,7 @@ WLCRrollback(Client cntxt)
InstrPtr p;
if( cntxt->wlcr){
- if (wlcr_all && cntxt->wlcr->stop){
+ if (cntxt->wlcr->stop){
p= getInstrPtr(cntxt->wlcr,0);
p = pushStr(cntxt->wlcr,p,"rollback");
p = pushStr(cntxt->wlcr, p,
wlcr_name[cntxt->wlcr_kind]);
diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h
--- a/monetdb5/modules/mal/wlcr.h
+++ b/monetdb5/modules/mal/wlcr.h
@@ -21,14 +21,11 @@
#define WLCR_UPDATE 2
#define WLCR_CATALOG 3
-mal_export int wlcr_duration; // how long to capture default= 0
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list