Changeset: e745cb70f584 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e745cb70f584
Modified Files:
gdk/gdk_logger.c
gdk/gdk_logger.h
sql/storage/bat/bat_logger.c
sql/storage/sql_storage.h
sql/storage/store.c
Branch: transaction-replication
Log Message:
Store the last shared WAL transaction id in a new file
We'll use that later to NOT replay transactions from the slave log and to
pick up where we left off for the shared one.
diffs (205 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -112,6 +112,7 @@ typedef struct logformat_t {
} logformat;
#define LOGFILE "log"
+#define LOGFILE_SHARED "log_shared"
static int bm_commit(logger *lg);
static int tr_grow(trans *tr);
@@ -741,6 +742,47 @@ tr_abort(logger *lg, trans *tr)
}
+/* Write the logger's catalog version and its current id to the file
+ * <dir><log_filename>.
+ *
+ * If a file with that name already exists it is first renamed to
+ * <name>.bak; the backup is removed again once the new file has been
+ * written and closed successfully.  On any failure the backup (if one
+ * was made) is left in place and nothing further is cleaned up.
+ *
+ * lg            logger whose version and id fields are written
+ * dir           directory prefix; it is concatenated with log_filename
+ *               as-is, so it must already end in a directory separator
+ * log_filename  name of the file inside dir
+ *
+ * Returns LOG_OK on success, LOG_ERR on any failure. */
static int
+logger_update_catalog_file(logger *lg, char *dir, char *log_filename)
+{
+	FILE *fp;
+	char filename[BUFSIZ];
+	char bak_filename[BUFSIZ];
+	int len;
+
+	/* refuse to work with a silently truncated path */
+	len = snprintf(filename, sizeof(filename), "%s%s", dir, log_filename);
+	if (len < 0 || (size_t) len >= sizeof(filename)) {
+		fprintf(stderr, "!ERROR: logger_update_catalog_file: file name too long\n");
+		return LOG_ERR;
+	}
+	len = snprintf(bak_filename, sizeof(bak_filename), "%s.%s", filename, "bak");
+	if (len < 0 || (size_t) len >= sizeof(bak_filename)) {
+		fprintf(stderr, "!ERROR: logger_update_catalog_file: file name too long\n");
+		return LOG_ERR;
+	}
+
+	/* check if an older file exists and back it up */
+	if (access(filename, F_OK) != -1) {
+		/* NOTE(review): filename already contains the dir prefix and dir
+		 * is passed as well -- confirm how GDKmove combines the two */
+		if (GDKmove(dir, filename, NULL, dir, filename, "bak") < 0) {
+			fprintf(stderr, "!ERROR: logger_update_catalog_file: rename %s to %s.bak in %s failed\n", filename, filename, dir);
+			return LOG_ERR;
+		}
+	}
+
+	if ((fp = fopen(filename, "w")) == NULL) {
+		fprintf(stderr, "!ERROR: logger_update_catalog_file: could not create %s\n", filename);
+		GDKerror("logger_update_catalog_file: could not open %s\n", filename);
+		return LOG_ERR;
+	}
+
+	/* first line: the catalog version, followed by an empty line */
+	if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
+		fprintf(stderr, "!ERROR: logger_update_catalog_file: write to %s failed\n", filename);
+		/* do not leak the stream on the error path */
+		(void) fclose(fp);
+		return LOG_ERR;
+	}
+
+	/* then the id currently stored in the logger */
+	if (fprintf(fp, LLFMT "\n", lg->id) < 0) {
+		fprintf(stderr, "!ERROR: logger_update_catalog_file: write/flush to %s failed\n", filename);
+		(void) fclose(fp);
+		return LOG_ERR;
+	}
+
+	/* fclose flushes; a failure here means the data may not be on disk.
+	 * The stream is gone either way, so it must not be closed again. */
+	if (fclose(fp) != 0) {
+		fprintf(stderr, "!ERROR: logger_update_catalog_file: write/flush to %s failed\n", filename);
+		return LOG_ERR;
+	}
+
+	/* cleanup the bak file, if it exists */
+	if (access(bak_filename, F_OK) != -1) {
+		GDKunlink(dir, filename, "bak");
+	}
+	return LOG_OK;
+}
+
+static int
logger_open(logger *lg)
{
char filename[BUFSIZ];
@@ -936,6 +978,10 @@ logger_readlogs(logger *lg, FILE *fp, ch
(void) res;
}
}
+ /* if this is a shared logger, write the id in the shared file
*/
+ if (lg->shared) {
+ logger_update_catalog_file(lg, lg->local_dir,
LOGFILE_SHARED);
+ }
}
return res;
}
@@ -1492,7 +1538,7 @@ logger_load(int debug, char* fn, char fi
/* Initialize a new logger
* It will load any data in the logdir and persist it in the BATs*/
static logger *
-logger_new(int debug, char *fn, char *logdir, int version, preversionfix_fptr
prefuncp, postversionfix_fptr postfuncp, int shared)
+logger_new(int debug, char *fn, char *logdir, int version, preversionfix_fptr
prefuncp, postversionfix_fptr postfuncp, int shared, char *local_logdir)
{
logger *lg = (struct logger *) GDKmalloc(sizeof(struct logger));
char filename[BUFSIZ];
@@ -1527,6 +1573,24 @@ logger_new(int debug, char *fn, char *lo
if (lg->debug & 1) {
fprintf(stderr, "#logger_new dir=%s\n", lg->dir);
}
+
+	/* local_dir is only filled in for shared loggers.  Give it a defined
+	 * value first, so that the error path below never hands an
+	 * indeterminate pointer to GDKfree and non-shared loggers end up
+	 * with NULL. */
+	lg->local_dir = NULL;
+	if (shared) {
+		/* build the path of the local (non-shared) log directory */
+		logger_set_logdir_path(filename, fn, local_logdir);
+		/* set the slave logdir as well */
+		/* NOTE(review): lg->fn is assigned here; if it was already set
+		 * earlier in this function the previous copy is lost -- confirm */
+		if ((lg->fn = GDKstrdup(fn)) == NULL ||
+		    (lg->local_dir = GDKstrdup(filename)) == NULL) {
+			fprintf(stderr, "!ERROR: logger_new: strdup failed\n");
+			GDKfree(lg->fn);
+			GDKfree(lg->dir);
+			GDKfree(lg->local_dir);
+			GDKfree(lg);
+			return NULL;
+		}
+		if (lg->debug & 1) {
+			/* report the directory that was just set, not lg->dir */
+			fprintf(stderr, "#logger_new slave_dir=%s\n", lg->local_dir);
+		}
+	}
+
lg->prefuncp = prefuncp;
lg->postfuncp = postfuncp;
lg->log = NULL;
@@ -1563,7 +1627,7 @@ logger_reload(logger *lg)
logger *
logger_create(int debug, char *fn, char *logdir, int version,
preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
{
- logger *lg = logger_new(debug, fn, logdir, version, prefuncp,
postfuncp, 0);
+ logger *lg = logger_new(debug, fn, logdir, version, prefuncp,
postfuncp, 0, NULL);
if (!lg)
return NULL;
@@ -1585,11 +1649,11 @@ logger_create(int debug, char *fn, char
/* Create a new shared logger, that is for slaves reading the master log
directory.
* Assumed to be read-only */
logger *
-logger_create_shared(int debug, char *fn, char *logdir, int version,
preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
+logger_create_shared(int debug, char *fn, char *logdir, char *local_logdir,int
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
{
logger *lg = NULL;
- lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp, 1);
+ lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp, 1,
local_logdir);
return lg;
}
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -60,6 +60,7 @@ typedef struct logger {
#endif
char *fn;
char *dir;
+ char *local_dir; /* the directory in which the non-shared log is
written */
int shared; /* a flag to indicate if the logger is a shared on (usually
read-only) */
preversionfix_fptr prefuncp;
postversionfix_fptr postfuncp;
@@ -111,7 +112,7 @@ typedef int log_bid;
#define OBJ_SID 1
gdk_export logger *logger_create(int debug, char *fn, char *logdir, int
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp);
-gdk_export logger *logger_create_shared(int debug, char *fn, char *logdir, int
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp);
+gdk_export logger *logger_create_shared(int debug, char *fn, char *logdir,
char *slave_logdir, int version, preversionfix_fptr prefuncp,
postversionfix_fptr postfuncp);
gdk_export void logger_destroy(logger *lg);
gdk_export int logger_exit(logger *lg);
gdk_export int logger_restart(logger *lg);
diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -338,11 +338,11 @@ bl_create(int debug, char *logdir, int c
}
static int
-bl_create_shared(int debug, char *logdir, int cat_version)
+bl_create_shared(int debug, char *logdir, int cat_version, char *local_logdir)
{
if (bat_logger_shared)
return LOG_ERR;
- bat_logger_shared = logger_create_shared(debug, "sql", logdir,
cat_version, bl_preversion, bl_postversion);
+ bat_logger_shared = logger_create_shared(debug, "sql", logdir,
local_logdir, cat_version, bl_preversion, bl_postversion);
if (bat_logger_shared)
return LOG_OK;
return LOG_ERR;
@@ -485,7 +485,7 @@ bat_logger_init( logger_functions *lf )
int
bat_logger_init_shared( logger_functions *lf )
{
- lf->create = bl_create_shared;
+ lf->create_shared = bl_create_shared;
lf->destroy = bl_destroy_shared;
lf->cleanup = bl_cleanup_shared;
lf->read_last_transaction_id = bl_read_last_transaction_id_shared;
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -248,7 +248,7 @@ typedef struct store_functions {
extern store_functions store_funcs;
typedef int (*logger_create_fptr) (int debug, char *logdir, int
catalog_version);
-typedef int (*logger_create_shared_fptr) (int debug, char *logdir, int
catalog_version);
+typedef int (*logger_create_shared_fptr) (int debug, char *logdir, int
catalog_version, char *slave_logdir);
typedef void (*logger_destroy_fptr) (void);
typedef int (*logger_restart_fptr) (void);
@@ -268,6 +268,7 @@ typedef int (*log_sequence_fptr) (int se
typedef struct logger_functions {
logger_create_fptr create;
+ logger_create_shared_fptr create_shared;
logger_destroy_fptr destroy;
logger_restart_fptr restart;
logger_cleanup_fptr cleanup;
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -1345,7 +1345,7 @@ store_init(int debug, store_type store,
#ifdef STORE_DEBUG
fprintf(stderr, "#store_init creating read-only logger\n");
#endif
- if (!shared_logger_funcs.create ||
shared_logger_funcs.create(debug, log_settings->shared_logdir,
CATALOG_VERSION*v) == LOG_ERR) {
+ if (!shared_logger_funcs.create_shared ||
shared_logger_funcs.create_shared(debug, log_settings->shared_logdir,
CATALOG_VERSION*v, log_settings->logdir) == LOG_ERR) {
MT_lock_unset(&bs_lock, "store_init");
return -1;
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list