Changeset: 0c37014e5d2c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0c37014e5d2c
Added Files:
        sql/test/wlcr/Tests/wlr40.reqtests
Modified Files:
        monetdb5/modules/mal/wlcr.c
        monetdb5/modules/mal/wlcr.h
        monetdb5/modules/mal/wlcr.mal
        monetdb5/optimizer/opt_wlcr.c
        sql/backends/monet5/sql_wlcr.c
        sql/backends/monet5/sql_wlcr.h
        sql/backends/monet5/sql_wlcr.mal
        sql/scripts/60_wlcr.sql
        sql/test/wlcr/Tests/wlc01.py
        sql/test/wlcr/Tests/wlr01.py
Branch: wlcr
Log Message:

Added query filtering during capture


diffs (truncated from 374 to 300 lines):

diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
--- a/monetdb5/modules/mal/wlcr.c
+++ b/monetdb5/modules/mal/wlcr.c
@@ -50,17 +50,17 @@
  * The end of the transaction is marked as exec()
  *
  * Logging of queries can be limited to those that satisfy an minimum 
execution threshold.
- * SET replaythreshold= <number>
+ * CALL logthreshold(duration)
  * The threshold is given in milliseconds. A negative threshold leads to 
ignoring all queries.
- * The threshold setting is not saved because it is a client session specific 
action.
- * The default for a production system version is set to -1
+ * The threshold setting is saved and affects all future master log records.
+ * The default for a production system version should be set to -1
  *
  * A transaction log is owned by the master. He decides when the log may be 
globally
  * used. There are several triggers for this. A new transaction log is created 
when
  * the system has been collecting logs for some time (drift).
  * The problem here is that we should ensure that the log file is closed even 
if there
- * are no transactions running. After closing, the replicas can see from the
- * master configuration file that a new batch is available.
+ * are no transactions running. It is solved with a separate thread.
+ * After closing, the replicas can see from the master configuration file that 
a log is available.
  * The maximum drift can be set using a SQL command. Setting it to zero leads 
to a
  * log file per transaction.
  *
@@ -68,22 +68,26 @@
  *      monetdb master <dbname> [ <optional snapshot path>]
  * which locks the database, takes a save copy, initializes the state chance. 
  *
- * A replica can be constructed as follows:
+ * A fresh replica can be constructed as follows:
  *     monetdb replica <dbname> <mastername>
  *
  * Instead of using the monetdb command line we can use the SQL calls directly
  * master and replica, provided we start with a fresh database.
  *
- * Processing the log files starts in the background using the call.
+ * A fresh database can be turned into a replica using the call
  * CALL replicate("mastername")
+ * It grabs the latest snapshot of the master and applies all
+ * known log files before returning a prompt. Progress of
+ * the replication can be monitored using the -fraw option in mclient.
+ *
  * It will iterate through the log files, applying all transactions.
  * Queries are simply ignored unless needed as replacement for catalog actions.
  *
- * The alternative is to replay only the query log
- * CALL replicate("dbname",threshold)
+ * The alternative is to replay the queries as well.
+ * CALL replaythreshold(threshold)
  * In this mode all pure queries are executed under the credentials of the 
query owner
  * for which the reported threshold exceeds the argument[TODO].
- * It excludes catalog and update queries.
+ * It excludes catalog and update queries, which are always executed.
  *
  * Any failure encountered during a log replay terminates the process,
  * leaving a message in the merovingian log.
@@ -110,9 +114,9 @@ static str wlcr_logs = 0;   // The locati
 static char wlcr_time[26];     // The timestamp of the last committed 
transaction.
 static stream *wlcr_fd = 0;
 static int wlcr_start = 0;     // time stamp of first transaction in log file
-int wlcr_threshold;
 
 // These properties are needed by the replica to direct the roll-forward.
+int wlcr_threshold = 0;                // should be set to -1 for production
 str wlcr_dbname = 0;           // The master database name
 int wlcr_firstbatch = 0;       // first log file  associated with the snapshot
 int wlcr_batches = 0;          // identifier of next batch
@@ -153,6 +157,8 @@ str WLCgetConfig(void){
                        wlcr_batches = atoi(path+ 8);
                if( strncmp("drift=", path, 6) == 0)
                        wlcr_drift = atoi(path+ 6);
+               if( strncmp("threshold=", path, 10) == 0)
+                       wlcr_threshold = atoi(path+ 10);
        }
        fclose(fd);
        return MAL_SUCCEED;
@@ -180,6 +186,7 @@ str WLCsetConfig(void){
        fprintf(fd,"firstbatch=%d\n", wlcr_firstbatch);
        fprintf(fd,"batches=%d\n", wlcr_batches );
        fprintf(fd,"drift=%d\n", wlcr_drift );
+       fprintf(fd,"threshold=%d\n", wlcr_threshold );
        fclose(fd);
        return MAL_SUCCEED;
 }
@@ -322,7 +329,7 @@ WLCinitCmd(Client cntxt, MalBlkPtr mb, M
 }
 
 str 
-WLCthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+WLClogthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        (void) mb;
        (void) cntxt;
@@ -708,21 +715,23 @@ WLCwrite(Client cntxt, str kind)
                if ( wlcr_fd == NULL)
                        throw(MAL,"wlcr.write","WLC log file not accessible");
                else {
+                       // filter out queries that run too shortly
+                       p = getInstrPtr(cntxt->wlcr,0);
+                       if( cntxt->wlcr_kind != WLCR_QUERY ||  wlcr_threshold 
== 0 || wlcr_threshold < GDKms() - p->ticks ) {
+                               newStmt(cntxt->wlcr,"wlr","exec");
+                               wlcr_tid++;
+                               MT_lock_set(&wlcr_lock);
+                               p = getInstrPtr(cntxt->wlcr,0);
+                               p = pushStr(cntxt->wlcr,p,kind);
+                               p = pushStr(cntxt->wlcr, p, 
wlcr_name[cntxt->wlcr_kind]);
+                               p = pushLng(cntxt->wlcr,p, GDKms() - p->ticks);
+                               printFunction(wlcr_fd, cntxt->wlcr, 0, 
LIST_MAL_DEBUG );
+                               (void) mnstr_flush(wlcr_fd);
+                               if( wlcr_drift == 0 || wlcr_start + wlcr_drift 
< GDKms()/1000)
+                                       WLCcloselogger();
 
-
-                       newStmt(cntxt->wlcr,"wlr","exec");
-                       wlcr_tid++;
-                       MT_lock_set(&wlcr_lock);
-                       p = getInstrPtr(cntxt->wlcr,0);
-                       p = pushStr(cntxt->wlcr,p,kind);
-                       p = pushStr(cntxt->wlcr, p, 
wlcr_name[cntxt->wlcr_kind]);
-                       p = pushLng(cntxt->wlcr,p, GDKms() - p->ticks);
-                       printFunction(wlcr_fd, cntxt->wlcr, 0, LIST_MAL_DEBUG );
-                       (void) mnstr_flush(wlcr_fd);
-                       if( wlcr_drift == 0 || wlcr_start + wlcr_drift < 
GDKms()/1000)
-                               WLCcloselogger();
-
-                       MT_lock_unset(&wlcr_lock);
+                               MT_lock_unset(&wlcr_lock);
+                       }
                        trimMalVariables(cntxt->wlcr, NULL);
                        resetMalBlk(cntxt->wlcr, 0);
                        cntxt->wlcr_kind = WLCR_QUERY;
diff --git a/monetdb5/modules/mal/wlcr.h b/monetdb5/modules/mal/wlcr.h
--- a/monetdb5/modules/mal/wlcr.h
+++ b/monetdb5/modules/mal/wlcr.h
@@ -22,7 +22,7 @@
 #define WLCR_CATALOG   3
 #define WLCR_IGNORE            4
 
-mal_export int wlcr_threshold; // threshold (seconds) for sending readonly 
queries
+mal_export int wlcr_threshold;
 mal_export int wlcr_batches;
 mal_export int wlcr_drift;
 mal_export str wlcr_dbname;
@@ -34,7 +34,7 @@ mal_export str WLCgetConfig(void);
 mal_export str WLCinitCmd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 mal_export str WLCmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 mal_export str WLCstopmaster(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
-mal_export str WLCthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+mal_export str WLClogthreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mal_export str WLCdrift(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 mal_export str WLCjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 mal_export str WLCexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
diff --git a/monetdb5/modules/mal/wlcr.mal b/monetdb5/modules/mal/wlcr.mal
--- a/monetdb5/modules/mal/wlcr.mal
+++ b/monetdb5/modules/mal/wlcr.mal
@@ -26,9 +26,9 @@ pattern drift(duration:int)
 address WLCdrift
 comment "Maximal duration of collecting a transaction log befor releasing it";
 
-pattern threshold(limit:int)
-address WLCthreshold
-comment "Activate the workload-capture-replay. Only queries surpassing the 
threshold are kept for replay.";
+pattern logthreshold(limit:int)
+address WLClogthreshold
+comment "Activate the workload-capture-replay. Only queries surpassing the 
threshold (in milliseconds) are kept.";
 
 pattern job(user:str, tid:int, started:str, action:str, kind:str, runtime:int)
 address WLCjob
diff --git a/monetdb5/optimizer/opt_wlcr.c b/monetdb5/optimizer/opt_wlcr.c
--- a/monetdb5/optimizer/opt_wlcr.c
+++ b/monetdb5/optimizer/opt_wlcr.c
@@ -49,8 +49,7 @@ OPTwlcrImplementation(Client cntxt, MalB
                        q->argc--; // no need for the userid
                        pushInstruction(mb,q);
                } else
-/* CATALOG functions need not yet be reported explicitly.
- * They are already captured in the query define.
+               /* the catalog operations are needed to determine the job kind 
later on */
                if( getModuleId(p) == sqlcatalogRef &&
                 (
                        getFunctionId(p) == create_seqRef ||
@@ -72,7 +71,6 @@ OPTwlcrImplementation(Client cntxt, MalB
                        getArg(q,0) = newTmpVariable(mb,TYPE_any);
                        pushInstruction(mb,q);
                } else
-*/
                if( getModuleId(p) == sqlRef && 
                        (getFunctionId(p) == clear_tableRef || 
                         getFunctionId(p) == sqlcatalogRef) ){
diff --git a/sql/backends/monet5/sql_wlcr.c b/sql/backends/monet5/sql_wlcr.c
--- a/sql/backends/monet5/sql_wlcr.c
+++ b/sql/backends/monet5/sql_wlcr.c
@@ -14,8 +14,9 @@
  * 
  * After restart of a mserver against the newly created image,
  * the log files from the master are processed.
- * The SQL variable replaythreshold can be set to run/ignore pure queries as 
well.
  *
+ * In replay mode all queries are also executed if they surpass
+ * the latest threshold set by the master.
  */
 #include "monetdb_config.h"
 #include "sql.h"
@@ -33,8 +34,8 @@
 static str wlr_logs;
 static str wlr_master;
 static int wlr_nextbatch;      // the next file to be processed
-static int wlr_tag;                    // the next transaction to be processed
-static int wlr_threshold = -1; // minimum time for a query to be re-executed
+static int wlr_tag;                    // the next transaction to be processed
+static int wlr_threshold;      // replay threshold set by user.
 
 #define MAXLINE 2048
 
@@ -82,6 +83,16 @@ str WLRsetConfig(void){
     return MAL_SUCCEED;
 }
 
+str
+WLRreplaythreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+    (void) mb;
+    (void) cntxt;
+    wlr_threshold = * getArgReference_int(stk,pci,1);
+       if( (wlr_threshold >=0 && wlr_threshold < wlcr_threshold) || 
wlcr_threshold < 0)
+               throw(SQL,"wlr.replaythreshold","Warning: threshold is smaller 
then those currently reported");
+    return MAL_SUCCEED;
+}
 
 /*
  * When the master database exist, we should set the replica administration.
@@ -164,24 +175,6 @@ WLRinitReplica(str dbname)
        return MAL_SUCCEED;
 }
 
-
-static str
-WLRgetThreshold( Client cntxt, MalBlkPtr mb)
-{
-       mvc *m = NULL;
-       str msg;
-       atom *a;
-
-       if ((msg = getSQLContext(cntxt, mb, &m, NULL)) != NULL)
-               return msg;
-       if ((msg = checkSQLContext(cntxt)) != NULL)
-               return msg;
-       a = stack_get_var(m, "replaythreshold");
-       if (a)
-               wlr_threshold = a->data.val.ival;
-       return MAL_SUCCEED;
-}
-
 /* 
  * Run once through the list of pending WLC logs
  * Continuing where you left off the previous time.
@@ -222,7 +215,6 @@ WLRprocess(void *arg)
     if ((msg = checkSQLContext(c)) != NULL)
                mnstr_printf(GDKerr,"#Inconsitent SQL contex : %s\n",msg);
 
-       WLRgetThreshold(cntxt, 0);
 #ifdef _WLR_DEBUG_
        mnstr_printf(c->fdout,"#Ready to start the replay against '%s' batches 
%d:%d  threshold %d\n", 
                wlcr_archive, wlr_firstbatch, wlr_nextbatch, wlr_threshold);
@@ -327,6 +319,7 @@ WLRinit(Client cntxt)
        if( wlr_logs){
                // Always try to roll forward before you continue
                cntxt->wlcr_mode = WLCR_REPLICATE;
+               // The client has to wait initially for all logs known to be 
processed.
                WLRprocess(cntxt);
                if (MT_create_thread(&wlcr_thread, WLRprocessScheduler, (void*) 
cntxt, MT_THR_JOINABLE) < 0) {
                                GDKerror("wlcr.replicate:replay scheduling 
process can not be started");
@@ -348,7 +341,7 @@ WLCRreplicate(Client cntxt, MalBlkPtr mb
                return msg;
 
        cntxt->wlcr_mode = WLCR_REPLICATE;
-       // For testing it is helpful to wait for completion of the replication 
process
+       // The client has to wait initially for all logs known to be processed.
        WLRprocess(cntxt);
        // start the process for continual integration in the background
     if (MT_create_thread(&wlcr_thread, WLRprocessScheduler, (void*) cntxt, 
MT_THR_JOINABLE) < 0) {
@@ -367,7 +360,7 @@ WLRjob(Client cntxt, MalBlkPtr mb, MalSt
        if( strcmp(kind,"update") == 0)
                cntxt->wlcr_kind = WLCR_UPDATE;
        if( strcmp(kind,"query") == 0){
-               if(wlr_threshold < 0 ||  duration < wlr_threshold)
+               if(wlcr_threshold < 0 ||  duration < wlcr_threshold)
                        cntxt->wlcr_kind = WLCR_IGNORE;
                else
                        cntxt->wlcr_kind = WLCR_QUERY;
diff --git a/sql/backends/monet5/sql_wlcr.h b/sql/backends/monet5/sql_wlcr.h
--- a/sql/backends/monet5/sql_wlcr.h
+++ b/sql/backends/monet5/sql_wlcr.h
@@ -21,6 +21,7 @@ extern void WLRinit(Client cntxt);
 extern str WLCRreplicate(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 
 extern str WLRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+extern str WLRreplaythreshold(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to