Changeset: 3bf5307afae1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=3bf5307afae1
Added Files:
monetdb5/modules/mal/wlcr.c
monetdb5/modules/mal/wlcr.h
monetdb5/modules/mal/wlcr.mal
monetdb5/optimizer/opt_wlcr.c
monetdb5/optimizer/opt_wlcr.h
Modified Files:
monetdb5/mal/mal_builder.c
monetdb5/mal/mal_client.c
monetdb5/mal/mal_client.h
monetdb5/mal/mal_interpreter.c
monetdb5/modules/mal/Makefile.ag
monetdb5/modules/mal/mal_init.mal
monetdb5/optimizer/Makefile.ag
monetdb5/optimizer/opt_pipes.c
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
monetdb5/optimizer/opt_support.c
monetdb5/optimizer/opt_wrapper.c
monetdb5/optimizer/optimizer.mal
sql/backends/monet5/sql.c
sql/backends/monet5/sql_scenario.c
sql/backends/monet5/sql_transaction.c
Branch: wlcr
Log Message:
WorkLoad Capture and Replay
First batch of extensions. See wlcr.c for more info
diffs (truncated from 1215 to 300 lines):
diff --git a/monetdb5/mal/mal_builder.c b/monetdb5/mal/mal_builder.c
--- a/monetdb5/mal/mal_builder.c
+++ b/monetdb5/mal/mal_builder.c
@@ -454,11 +454,16 @@ pushStr(MalBlkPtr mb, InstrPtr q, const
if (q == NULL)
return NULL;
cst.vtype= TYPE_str;
- if ((cst.val.sval= GDKstrdup(Val)) == NULL) {
- freeInstruction(q);
- return NULL;
+ if( Val == 0){
+ cst.val.sval = 0;
+ cst.len= 0;
+ } else{
+ if ( Val != NULL && (cst.val.sval= GDKstrdup(Val)) == NULL) {
+ freeInstruction(q);
+ return NULL;
+ }
+ cst.len= (int) strlen(cst.val.sval);
}
- cst.len= (int) strlen(cst.val.sval);
_t = defConstant(mb,TYPE_str,&cst);
return pushArgument(mb, q, _t);
}
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -251,6 +251,8 @@ MCinitClientRecord(Client c, oid user, b
/* create a recycler cache */
c->exception_buf_initialized = 0;
c->error_row = c->error_fld = c->error_msg = c->error_input = NULL;
+ c->wlcr_kind = 0;
+ c->wlcr = NULL;
#ifndef HAVE_EMBEDDED /* no authentication in embedded mode */
{
str msg = AUTHgetUsername(&c->username, c);
@@ -403,6 +405,10 @@ freeClient(Client c)
BBPdecref(c->error_msg->batCacheid,TRUE);
BBPdecref(c->error_input->batCacheid,TRUE);
c->error_row = c->error_fld = c->error_msg = c->error_input =
NULL;
+ if( c->wlcr)
+ freeMalBlk(c->wlcr);
+ c->wlcr_kind = 0;
+ c->wlcr = NULL;
}
if (t)
THRdel(t); /* you may perform suicide */
diff --git a/monetdb5/mal/mal_client.h b/monetdb5/mal/mal_client.h
--- a/monetdb5/mal/mal_client.h
+++ b/monetdb5/mal/mal_client.h
@@ -171,6 +171,11 @@ typedef struct CLIENT {
*/
bit active; /* processing a query or not */
Workset inprogress[THREADS];
+ /*
+ * The workload for replication/replay is saved initially as a MAL
block.
+ */
+ int wlcr_kind;
+ MalBlkPtr wlcr;
/*
* Errors during copy into are collected in a user specific column
set
*/
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -1434,6 +1434,7 @@ void garbageCollector(Client cntxt, MalB
printStack(cntxt->fdout, mb, stk, 0);
}
#endif
+ assert(mb->vtop < mb->vsize);
(void) flag;
for (k = 0; k < mb->vtop; k++) {
// if (isVarCleanup(mb, k) ){
diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag
--- a/monetdb5/modules/mal/Makefile.ag
+++ b/monetdb5/modules/mal/Makefile.ag
@@ -36,6 +36,7 @@ lib_mal = {
mkey.c mkey.h \
manifold.c manifold.h \
oltp.c oltp.h \
+ wlcr.c wlcr.h \
pcre.c \
profiler.c profiler.h \
querylog.c querylog.h \
@@ -59,7 +60,7 @@ headers_mal = {
inspect.mal manual.mal mal_io.mal mkey.mal manifold.mal \
iterator.mal clients.mal \
factories.mal groupby.mal mdb.mal pcre.mal mat.mal \
- transaction.mal oltp.mal \
+ transaction.mal oltp.mal wlcr.mal \
mal_mapi.mal sabaoth.mal remote.mal \
txtsim.mal \
tokenizer.mal sample.mal json_util.mal \
diff --git a/monetdb5/modules/mal/mal_init.mal
b/monetdb5/modules/mal/mal_init.mal
--- a/monetdb5/modules/mal/mal_init.mal
+++ b/monetdb5/modules/mal/mal_init.mal
@@ -94,6 +94,7 @@ include srvpool;
include mal_mapi;
include oltp;
+include wlcr;
# Any extensions (MAL scripts) that should be automatically loaded upon
# startup can be placed in the autoload directory. One typically finds
diff --git a/monetdb5/modules/mal/wlcr.c b/monetdb5/modules/mal/wlcr.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/mal/wlcr.c
@@ -0,0 +1,461 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2017 MonetDB B.V.
+ */
+
+/*
+ * (c) Martin Kersten
+ * This module collects the workload-capture-replay statements during
+ * transaction execution.
+ *
+ * Each wlcr log file contains a serial log for a transaction batch.
+ * Each job is identified by the original owner of the query, the snapshot
+ * identity (name+nr) against which it was run, an indication of the kind of
+ * transaction and commit/rollback, its runtime (in ms) and starting time.
+ *
+ * Replaying the wlcr against another server based on the same snapshot should
+ * produce a perfect copy.
+ * Each job should be executed using the credentials of the user issuing the
+ * transaction.
+ * Any failure encountered terminates the replication process.
+ *
+ * All wlcr files should be stored on a shared file system for all replicas to
+ * access.
+ * The default is a subdirectory of the database, which acts as a secondary
+ * database rebuild log.
+ * The location can be overruled using a full path to a shared disk as the
+ * GDK environment variable (wlcr_dir).
+ *
+ * The wlcr files have a textual format derived from the MAL statements.
+ * This can be used to ease the implementation of the wlreplay
+ *
+ * The logs may only be removed after a new snapshot has been taken or wlcr is
+ * disabled.
+ */
+#include "monetdb_config.h"
+#include "mal_builder.h"
+#include "wlcr.h"
+
+/* Lock named "wlcr_lock"; presumably serialises access to the wlcr state
+ * declared below -- TODO confirm, no user of the lock is visible here. */
+static MT_Lock wlcr_lock MT_LOCK_INITIALIZER("wlcr_lock");
+
+
+int wlcr_duration = INT_MAX; // how long to capture; initialised to INT_MAX (unit not visible here -- TODO confirm)
+int wlcr_threshold = 0; // threshold (milliseconds) for keeping readonly queries
+int wlcr_deltas = 1; // send the delta values
+int wlcr_all = 1; // also ship failed transactions
+str wlcr_snapshot= "baseline"; // name assigned to the snapshot
+int wlcr_unit = 0; // last job executed
+
+/* Textual labels; presumably indexed by the wlcr_kind codes (WLCR_QUERY,
+ * WLCR_CATALOG, ...) -- TODO confirm against wlcr.h. */
+static char *wlcr_name[]= {"","query","update","catalog"};
+
+/* Stream handle, initially null; presumably the currently open log file --
+ * TODO confirm. */
+static stream *wlcr_fd = 0;
+/* NOTE(review): hard-coded path, while the file header talks about a
+ * database subdirectory / wlcr_dir setting -- confirm which one applies. */
+static str wlcr_log = "/tmp/wlcr";
+
+/*
+ * Append the start time of instruction `pci` as a string argument to `p`.
+ *
+ * The wall-clock seconds in pci->clock.tv_sec are formatted with
+ * ctime()/ctime_r() ("Www Mmm dd hh:mm:ss yyyy\n") and cut off at index 19,
+ * which keeps everything up to the seconds and drops the year and newline.
+ * The string is pushed onto `p` inside the client's wlcr block.
+ *
+ * ctime()/ctime_r() may return NULL when the time cannot be represented;
+ * in that case an empty string is pushed instead of dereferencing NULL.
+ *
+ * Returns whatever pushStr() returns (the extended instruction, or NULL).
+ */
+static InstrPtr
+WLCRaddtime(Client cntxt, InstrPtr pci, InstrPtr p)
+{
+	char *tbuf;
+	char ctm[26];
+	time_t clk = pci->clock.tv_sec;
+
+#ifdef HAVE_CTIME_R3
+	tbuf = ctime_r(&clk, ctm, sizeof(ctm));
+#else
+#ifdef HAVE_CTIME_R
+	tbuf = ctime_r(&clk, ctm);
+#else
+	(void) ctm;	/* only needed by the reentrant variants */
+	tbuf = ctime(&clk);
+#endif
+#endif
+	if (tbuf == NULL)
+		return pushStr(cntxt->wlcr, p, "");
+	tbuf[19]=0;	/* strip " yyyy\n" */
+	return pushStr(cntxt->wlcr, p, tbuf);
+}
+
+/*
+ * WLCR_start() -- make sure the client owns a workload block and that the
+ * block starts with a job header.
+ *
+ * The macro expands inside a function that has `cntxt`, `pci` and `p` in
+ * scope; it assigns to `p`.
+ *  - If cntxt->wlcr is NULL, a new function symbol is created, its MAL block
+ *    is moved into cntxt->wlcr and cntxt->wlcr_kind is set to WLCR_QUERY.
+ *  - If cntxt->wlcr->stop is 0 (presumably: no instructions yet -- confirm),
+ *    a "wlreplay.job" statement is appended with the client's user name,
+ *    wlcr_snapshot, wlcr_unit and the time derived from `pci`; its ticks
+ *    field is set from GDKms().
+ *
+ * NOTE(review): the result of newSymbol() is dereferenced without a check,
+ * and the symbol `s` is not released after its def has been detached --
+ * confirm whether that is intended.
+ * NOTE(review): the symbol is named "wlrc" while the module is spelled
+ * "wlcr" everywhere else -- confirm.
+ */
+#define WLCR_start()\
+{ Symbol s; \
+ if( cntxt->wlcr == NULL){\
+ s = newSymbol("wlrc", FUNCTIONsymbol);\
+ cntxt->wlcr_kind = WLCR_QUERY;\
+ cntxt->wlcr = s->def;\
+ s->def = NULL;\
+ } \
+ if( cntxt->wlcr->stop == 0){\
+ p = newStmt(cntxt->wlcr,"wlreplay","job");\
+ p = pushStr(cntxt->wlcr,p, cntxt->username);\
+ p = pushStr(cntxt->wlcr,p, wlcr_snapshot);\
+ p = pushInt(cntxt->wlcr,p, wlcr_unit);\
+ p = WLCRaddtime(cntxt,pci, p); \
+ p->ticks = GDKms();\
+} }
+
+/*
+ * wlcr.properties -- set the capture parameters from the MAL call.
+ *
+ * Arguments (read from the stack via pci):
+ *   1: duration   (int, must not be negative) -> wlcr_duration
+ *   2: threshold  (int, must not be negative) -> wlcr_threshold
+ *   3: deltas     (int, treated as boolean)   -> wlcr_deltas
+ *   4: all        (int, treated as boolean)   -> wlcr_all
+ *
+ * Both range checks are done before any global is modified, so a rejected
+ * call leaves the settings untouched. Each check reports the name of the
+ * parameter that was actually wrong.
+ *
+ * Returns MAL_SUCCEED, or a MAL exception for a negative duration/threshold.
+ */
+str
+WLCRproperties (Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	int duration = *getArgReference_int(stk,pci,1);
+	int threshold = *getArgReference_int(stk,pci,2);
+
+	(void) cntxt;
+	(void) mb;
+
+	if ( duration < 0)
+		throw(MAL,"wlcr.properties","Duration must be a positive number");
+	if ( threshold < 0)
+		throw(MAL,"wlcr.properties","Threshold must be a positive number");
+
+	wlcr_duration = duration;
+	wlcr_threshold = threshold;
+	wlcr_deltas = *getArgReference_int(stk,pci,3) != 0;
+	wlcr_all = *getArgReference_int(stk,pci,4) != 0;
+	return MAL_SUCCEED;
+}
+
+/*
+ * wlcr.job -- sanity check of a job header.
+ *
+ * Argument 1 is a snapshot name and must equal wlcr_snapshot; argument 2 is
+ * a work unit number and must not be smaller than wlcr_unit.
+ * Returns MAL_SUCCEED or a MAL exception describing the mismatch.
+ */
+str
+WLCRjob(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	str name;
+	int unit;
+
+	(void) mb;
+	(void) cntxt;
+
+	name = *getArgReference_str(stk,pci,1);
+	unit = *getArgReference_int(stk,pci,2);
+
+	if ( strcmp(name, wlcr_snapshot) != 0)
+		throw(MAL,"wlcr.job","Incompatible snapshot identifier");
+	if ( wlcr_unit > unit)
+		throw(MAL,"wlcr.job","Work unit identifier is before last one executed");
+	return MAL_SUCCEED;
+}
+
+/*
+ * wlcr.fin -- currently a no-op: every argument is ignored and the call
+ * always succeeds.
+ */
+str
+WLCRfin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	(void) pci;
+	(void) stk;
+	(void) mb;
+	(void) cntxt;
+
+	return MAL_SUCCEED;
+}
+
+/*
+ * wlcr.query -- record a query in the client's workload block.
+ *
+ * Arguments 1 and 2 are read as string constants of `mb`. When argument 1
+ * is exactly "-- no query" nothing is recorded. Otherwise WLCR_start() is
+ * expanded and a "wlreplay.query" statement carrying both strings is
+ * appended, stamped with GDKms().
+ * Always returns MAL_SUCCEED.
+ */
+str
+WLCRquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+	InstrPtr p;	/* name required by WLCR_start() */
+	str text = getVarConstant(mb, getArg(pci,1)).val.sval;
+
+	(void) stk;
+	if ( strcmp("-- no query", text) != 0) {
+		WLCR_start();
+		p = newStmt(cntxt->wlcr, "wlreplay","query");
+		p = pushStr(cntxt->wlcr, p, text);
+		p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci,2)).val.sval);
+		p->ticks = GDKms();
+	}
+	return MAL_SUCCEED;
+}
+
+/*
+ * Generic recorder: copies the calling instruction into the client's
+ * workload block as "wlreplay.<function name of pci>".
+ *
+ * After WLCR_start(), every input argument of `pci` (index retc..argc-1) is
+ * appended to the new statement: strings are taken from the constant table
+ * of `mb`, all other types go through defConstant()/pushArgument().
+ * The statement is stamped with GDKms() and the client's wlcr_kind is set
+ * to WLCR_CATALOG. Always returns MAL_SUCCEED.
+ */
+str
+WLCRgeneric(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{ InstrPtr p;
+ int i, tpe, varid;
+ (void) stk;
+
+ WLCR_start();
+ p = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci));
+ for( i = pci->retc; i< pci->argc; i++){
+ tpe =getArgType(mb, pci, i);
+ switch(tpe){
+ case TYPE_str:
+ /* NOTE(review): reads the value from mb's constants, so this
+ * presumably assumes the argument is a constant -- confirm */
+ p = pushStr(cntxt->wlcr, p, getVarConstant(mb, getArg(pci, i)).val.sval);
+ break;
+ default:
+ /* NOTE(review): elsewhere defConstant() is handed a record with
+ * vtype/val/len fields; verify that getArgReference() yields a
+ * compatible pointer here */
+ varid = defConstant(cntxt->wlcr, tpe, getArgReference(stk, pci, i));
+ p = pushArgument(cntxt->wlcr, p, varid);
+ }
+ }
+ p->ticks = GDKms();
+ cntxt->wlcr_kind = WLCR_CATALOG;
+ return MAL_SUCCEED;
+}
+
+/*
+ * bulk(TPE1, TPE2) -- append the values of BAT `b` to workload statements.
+ *
+ * Expands inside a scope that provides `b`, `cntxt`, `pci`, `sch`, `tbl`
+ * and `col`. The values between Tloc(b,0) and Tloc(b,BUNlast(b)) are read
+ * as TPE1 and appended one by one with push<TPE2>() to the statement that
+ * `pci` points to.
+ * Whenever the running counter k satisfies k % 32 == 31, a new
+ * "wlreplay.<function name of pci>" statement is started first, seeded with
+ * sch/tbl/col and stamped with GDKms(); `pci` is reassigned to it, so later
+ * values (and the next function-name lookup) use the new statement.
+ *
+ * The macro declares its own local `p` and `q`, shadowing any outer ones.
+ */
+#define bulk(TPE1, TPE2)\
+{ TPE1 *p = (TPE1 *) Tloc(b,0);\
+ TPE1 *q = (TPE1 *) Tloc(b, BUNlast(b));\
+ int k=0; \
+ for( ; p < q; p++, k++){\
+ if( k % 32 == 31){\
+ pci = newStmt(cntxt->wlcr, "wlreplay",getFunctionId(pci));\
+ pci = pushStr(cntxt->wlcr, pci, sch);\
+ pci = pushStr(cntxt->wlcr, pci, tbl);\
+ pci = pushStr(cntxt->wlcr, pci, col);\
+ pci->ticks = GDKms();\
+ }\
+ pci = push##TPE2(cntxt->wlcr, pci ,*p);\
+} }
+
+static void
+WLCRdatashipping(Client cntxt, MalBlkPtr mb, InstrPtr pci, int bid)
+{ BAT *b;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list