Changeset: 0c37014e5d2c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0c37014e5d2c
Added Files:
sql/test/wlcr/Tests/wlr40.reqtests
Modified Files:
monetdb5/modules/mal/wlcr.c
monetdb5/modules/mal/wlcr.h
monetdb5/modules/mal/wlcr.mal
monetdb5/optimizer/opt_wlcr.c
sql/backends/monet5/sql_wlcr.c
sql/backends/monet5/sql_wlcr.h
sql/backends/monet5/sql_wlcr.mal
sql/scripts/60_wlcr.sql
sql/test/wlcr/Tests/wlc01.py
sql/test/wlcr/Tests/wlr01.py
Branch: wlcr
Log Message:
Added query filtering during capture
diffs (truncated from 374 to 300 lines):
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
--- a/monetdb5/modules/mal/wlcr.c
+++ b/monetdb5/modules/mal/wlcr.c
@@ -50,17 +50,17 @@
* The end of the transaction is marked as exec()
*
* Logging of queries can be limited to those that satisfy an minimum
execution threshold.
- * SET replaythreshold= <number>
+ * CALL logthreshold(duration)
* The threshold is given in milliseconds. A negative threshold leads to
ignoring all queries.
- * The threshold setting is not saved because it is a client session specific
action.
- * The default for a production system version is set to -1
+ * The threshold setting is saved and affects all future master log records.
+ * The default for a production system version should be set to -1
*
* A transaction log is owned by the master. He decides when the log may be
globally
* used. There are several triggers for this. A new transaction log is created
when
* the system has been collecting logs for some time (drift).
* The problem here is that we should ensure that the log file is closed even
if there
- * are no transactions running. After closing, the replicas can see from the
- * master configuration file that a new batch is available.
+ * are no transactions running. It is solved with a separate thread.
+ * After closing, the replicas can see from the master configuration file that
a log is available.
* The maximum drift can be set using a SQL command. Setting it to zero leads
to a
* log file per transaction.
*
@@ -68,22 +68,26 @@
* monetdb master <dbname> [ <optional snapshot path>]
* which locks the database, takes a save copy, initializes the state chance.
*
- * A replica can be constructed as follows:
+ * A fresh replica can be constructed as follows:
* monetdb replica <dbname> <mastername>
*
* Instead of using the monetdb command line we can use the SQL calls directly
* master and replica, provided we start with a fresh database.
*
- * Processing the log files starts in the background using the call.
+ * A fresh database can be turned into a replica using the call
* CALL replicate("mastername")
+ * It will grab the latest snapshot of the master and applies all
+ * known log files before releasing returning a prompt. Progress of
+ * the replication can be monitored using the -fraw option in mclient.
+ *
* It will iterate through the log files, applying all transactions.
* Queries are simply ignored unless needed as replacement for catalog actions.
*
- * The alternative is to replay only the query log
- * CALL replicate("dbname",threshold)
+ * The alternative is to also replay the queries as well.
+ * CALL replaythreshold(threshold)
* In this mode all pure queries are executed under the credentials of the
query owner
* for which the reported threshold exceeds the argument[TODO].
- * It excludes catalog and update queries.
+ * It excludes catalog and update queries, which are always executed.
*
* Any failure encountered during a log replay terminates the process,
* leaving a message in the merovingian log.
@@ -110,9 +114,9 @@ static str wlcr_logs = 0; // The locati
static char wlcr_time[26]; // The timestamp of the last committed
transaction.
static stream *wlcr_fd = 0;
static int wlcr_start = 0; // time stamp of first transaction in log file
-int wlcr_threshold;
// These properties are needed by the replica to direct the roll-forward.
+int wlcr_threshold = 0; // should be set to -1 for production
str wlcr_dbname = 0; // The master database name
int wlcr_firstbatch = 0; // first log file associated with the snapshot
int wlcr_batches = 0; // identifier of next batch
@@ -153,6 +157,8 @@ str WLCgetConfig(void){
wlcr_batches = atoi(path+ 8);
if( strncmp("drift=", path, 6) == 0)
wlcr_drift = atoi(path+ 6);
+ if( strncmp("threshold=", path, 10) == 0)
+ wlcr_threshold = atoi(path+ 10);
}
fclose(fd);
return MAL_SUCCEED;
@@ -180,6 +186,7 @@ str WLCsetConfig(void){
fprintf(fd,"firstbatch=%d\n", wlcr_firstbatch);
fprintf(fd,"batches=%d\n", wlcr_batches );
fprintf(fd,"drift=%d\n", wlcr_drift );
+ fprintf(fd,"threshold=%d\n", wlcr_threshold );
fclose(fd);
return MAL_SUCCEED;
}
@@ -322,7 +329,7 @@ WLCinitCmd(Client cntxt, MalBlkPtr mb, M
}
str
-WLCthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+WLClogthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
(void) mb;
(void) cntxt;
@@ -708,21 +715,23 @@ WLCwrite(Client cntxt, str kind)
if ( wlcr_fd == NULL)
throw(MAL,"wlcr.write","WLC log file not accessible");
else {
+ // filter out queries that run too shortly
+ p = getInstrPtr(cntxt->wlcr,0);
+ if( cntxt->wlcr_kind != WLCR_QUERY || wlcr_threshold
== 0 || wlcr_threshold < GDKms() - p->ticks ) {
+ newStmt(cntxt->wlcr,"wlr","exec");
+ wlcr_tid++;
+ MT_lock_set(&wlcr_lock);
+ p = getInstrPtr(cntxt->wlcr,0);
+ p = pushStr(cntxt->wlcr,p,kind);
+ p = pushStr(cntxt->wlcr, p,
wlcr_name[cntxt->wlcr_kind]);
+ p = pushLng(cntxt->wlcr,p, GDKms() - p->ticks);
+ printFunction(wlcr_fd, cntxt->wlcr, 0,
LIST_MAL_DEBUG );
+ (void) mnstr_flush(wlcr_fd);
+ if( wlcr_drift == 0 || wlcr_start + wlcr_drift
< GDKms()/1000)
+ WLCcloselogger();
-
- newStmt(cntxt->wlcr,"wlr","exec");
- wlcr_tid++;
- MT_lock_set(&wlcr_lock);
- p = getInstrPtr(cntxt->wlcr,0);
- p = pushStr(cntxt->wlcr,p,kind);
- p = pushStr(cntxt->wlcr, p,
wlcr_name[cntxt->wlcr_kind]);
- p = pushLng(cntxt->wlcr,p, GDKms() - p->ticks);
- printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG );
- (void) mnstr_flush(wlcr_fd);
- if( wlcr_drift == 0 || wlcr_start + wlcr_drift <
GDKms()/1000)
- WLCcloselogger();
-
- MT_lock_unset(&wlcr_lock);
+ MT_lock_unset(&wlcr_lock);
+ }
trimMalVariables(cntxt->wlcr, NULL);
resetMalBlk(cntxt->wlcr, 0);
cntxt->wlcr_kind = WLCR_QUERY;
diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h
--- a/monetdb5/modules/mal/wlcr.h
+++ b/monetdb5/modules/mal/wlcr.h
@@ -22,7 +22,7 @@
#define WLCR_CATALOG 3
#define WLCR_IGNORE 4
-mal_export int wlcr_threshold; // threshold (seconds) for sending readonly
queries
+mal_export int wlcr_threshold;
mal_export int wlcr_batches;
mal_export int wlcr_drift;
mal_export str wlcr_dbname;
@@ -34,7 +34,7 @@ mal_export str WLCgetConfig(void);
mal_export str WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
mal_export str WLCmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
mal_export str WLCstopmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
-mal_export str WLCthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+mal_export str WLClogthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mal_export str WLCdrift(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
mal_export str WLCjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
mal_export str WLCexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
diff --git a/monetdb5/modules/mal/wlcr.mal b/monetdb5/modules/mal/wlcr.mal
--- a/monetdb5/modules/mal/wlcr.mal
+++ b/monetdb5/modules/mal/wlcr.mal
@@ -26,9 +26,9 @@ pattern drift(duration:int)
address WLCdrift
comment "Maximal duration of collecting a transaction log befor releasing it";
-pattern threshold(limit:int)
-address WLCthreshold
-comment "Activate the workload-capture-replay. Only queries surpassing the
threshold are kept for replay.";
+pattern logthreshold(limit:int)
+address WLClogthreshold
+comment "Activate the workload-capture-replay. Only queries surpassing the
threshold (in milliseconds) are kept.";
pattern job(user:str, tid:int, started:str, action:str, kind:str, runtime:int)
address WLCjob
diff --git a/monetdb5/optimizer/opt_wlcr.c b/monetdb5/optimizer/opt_wlcr.c
--- a/monetdb5/optimizer/opt_wlcr.c
+++ b/monetdb5/optimizer/opt_wlcr.c
@@ -49,8 +49,7 @@ OPTwlcrImplementation(Client cntxt, MalB
q->argc--; // no need for the userid
pushInstruction(mb,q);
} else
-/* CATALOG functions need not yet be reported explicitly.
- * They are already captured in the query define.
+ /* the catalog operations are needed to determine the job kind
later on */
if( getModuleId(p) == sqlcatalogRef &&
(
getFunctionId(p) == create_seqRef ||
@@ -72,7 +71,6 @@ OPTwlcrImplementation(Client cntxt, MalB
getArg(q,0) = newTmpVariable(mb,TYPE_any);
pushInstruction(mb,q);
} else
-*/
if( getModuleId(p) == sqlRef &&
(getFunctionId(p) == clear_tableRef ||
getFunctionId(p) == sqlcatalogRef) ){
diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c
--- a/sql/backends/monet5/sql_wlcr.c
+++ b/sql/backends/monet5/sql_wlcr.c
@@ -14,8 +14,9 @@
*
* After restart of a mserver against the newly created image,
* the log files from the master are processed.
- * The SQL variable replaythreshold can be set to run/ignore pure queries as
well.
*
+ * In replay mode also all queries are executed if they surpass
+ * the latest threshold set for by the master.
*/
#include "monetdb_config.h"
#include "sql.h"
@@ -33,8 +34,8 @@
static str wlr_logs;
static str wlr_master;
static int wlr_nextbatch; // the next file to be processed
-static int wlr_tag; // the next transaction to be processed
-static int wlr_threshold = -1; // minimum time for a query to be re-executed
+static int wlr_tag; // the next transaction to be processeds
+static int wlr_threshold; // replay threshold set by user.
#define MAXLINE 2048
@@ -82,6 +83,16 @@ str WLRsetConfig(void){
return MAL_SUCCEED;
}
+str
+WLRreplaythreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ (void) mb;
+ (void) cntxt;
+ wlr_threshold = * getArgReference_int(stk,pci,1);
+ if( (wlr_threshold >=0 && wlr_threshold < wlcr_threshold) ||
wlcr_threshold < 0)
+ throw(SQL,"wlr.replaythreshold","Warning: threshold is smaller
then those currently reported");
+ return MAL_SUCCEED;
+}
/*
* When the master database exist, we should set the replica administration.
@@ -164,24 +175,6 @@ WLRinitReplica(str dbname)
return MAL_SUCCEED;
}
-
-static str
-WLRgetThreshold( Client cntxt, MalBlkPtr mb)
-{
- mvc *m = NULL;
- str msg;
- atom *a;
-
- if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
- return msg;
- if ((msg = checkSQLContext(cntxt)) != NULL)
- return msg;
- a = stack_get_var(m, "replaythreshold");
- if (a)
- wlr_threshold = a->data.val.ival;
- return MAL_SUCCEED;
-}
-
/*
* Run once through the list of pending WLC logs
* Continuing where you left off the previous time.
@@ -222,7 +215,6 @@ WLRprocess(void *arg)
if ((msg = checkSQLContext(c)) != NULL)
mnstr_printf(GDKerr,"#Inconsitent SQL contex : %s\n",msg);
- WLRgetThreshold(cntxt, 0);
#ifdef _WLR_DEBUG_
mnstr_printf(c->fdout,"#Ready to start the replay against '%s' batches
%d:%d threshold %d\n",
wlcr_archive, wlr_firstbatch, wlr_nextbatch, wlr_threshold);
@@ -327,6 +319,7 @@ WLRinit(Client cntxt)
if( wlr_logs){
// Always try to roll forward before you continue
cntxt->wlcr_mode = WLCR_REPLICATE;
+ // The client has to wait initially for all logs known to be
processed.
WLRprocess(cntxt);
if (MT_create_thread(&wlcr_thread, WLRprocessScheduler, (void*)
cntxt, MT_THR_JOINABLE) < 0) {
GDKerror("wlcr.replicate:replay scheduling
process can not be started");
@@ -348,7 +341,7 @@ WLCRreplicate(Client cntxt, MalBlkPtr mb
return msg;
cntxt->wlcr_mode = WLCR_REPLICATE;
- // For testing it is helpful to wait for completion of the replication
process
+ // The client has to wait initially for all logs known to be processed.
WLRprocess(cntxt);
// start the process for continual integration in the background
if (MT_create_thread(&wlcr_thread, WLRprocessScheduler, (void*) cntxt,
MT_THR_JOINABLE) < 0) {
@@ -367,7 +360,7 @@ WLRjob(Client cntxt, MalBlkPtr mb, MalSt
if( strcmp(kind,"update") == 0)
cntxt->wlcr_kind = WLCR_UPDATE;
if( strcmp(kind,"query") == 0){
- if(wlr_threshold < 0 || duration < wlr_threshold)
+ if(wlcr_threshold < 0 || duration < wlcr_threshold)
cntxt->wlcr_kind = WLCR_IGNORE;
else
cntxt->wlcr_kind = WLCR_QUERY;
diff --git a/sql/backends/monet5/sql_wlcr.h b/sql/backends/monet5/sql_wlcr.h
--- a/sql/backends/monet5/sql_wlcr.h
+++ b/sql/backends/monet5/sql_wlcr.h
@@ -21,6 +21,7 @@ extern void WLRinit(Client cntxt);
extern str WLCRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
extern str WLRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+extern str WLRreplaythreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list