Changeset: e745cb70f584 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e745cb70f584
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger.h
        sql/storage/bat/bat_logger.c
        sql/storage/sql_storage.h
        sql/storage/store.c
Branch: transaction-replication
Log Message:

Store the last shared WAL transaction id in a new file
We'll use that later to NOT replay transactions from the slave log and to 
pickup where we left of for the shared one.


diffs (205 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -112,6 +112,7 @@ typedef struct logformat_t {
 } logformat;
 
 #define LOGFILE "log"
+#define LOGFILE_SHARED "log_shared"
 
 static int bm_commit(logger *lg);
 static int tr_grow(trans *tr);
@@ -741,6 +742,47 @@ tr_abort(logger *lg, trans *tr)
 }
 
 static int
+logger_update_catalog_file(logger *lg, char *dir, char *log_filename)
+{
+       FILE *fp;
+       char filename[BUFSIZ];
+       char bak_filename[BUFSIZ];
+
+       snprintf(filename, BUFSIZ, "%s%s", dir, log_filename);
+       snprintf(bak_filename, BUFSIZ, "%s.%s", filename, "bak");
+
+       /* check if an older file exists and move bak it up */
+       if (access(filename, F_OK) != -1) {
+               if (GDKmove(dir, filename, NULL, dir, filename, "bak") < 0) {
+                       fprintf(stderr, "!ERROR: logger_update_catalog_file: 
rename %s to %s.bak in %s failed\n", filename, filename, dir);
+                       return LOG_ERR;
+               }
+       }
+
+       if ((fp = fopen(filename, "w")) != NULL) {
+               if (fprintf(fp, "%06d\n\n", lg->version) < 0) {
+                       fprintf(stderr, "!ERROR: logger_update_catalog_file: 
write to %s failed\n", filename);
+                       return LOG_ERR;
+               }
+
+               if (fprintf(fp, LLFMT "\n", lg->id) < 0 || fclose(fp) < 0) {
+                       fprintf(stderr, "!ERROR: logger_update_catalog_file: 
write/flush to %s failed\n", filename);
+                       return LOG_ERR;
+               }
+
+               /* cleanup the bak file, if it exists*/
+               if (access(bak_filename, F_OK) != -1) {
+                       GDKunlink(dir, filename, "bak");
+               }
+       } else {
+               fprintf(stderr, "!ERROR: logger_update_catalog_file: could not 
create %s\n", filename);
+               GDKerror("logger_update_catalog_file: could not open %s\n", 
filename);
+               return LOG_ERR;
+       }
+       return LOG_OK;
+}
+
+static int
 logger_open(logger *lg)
 {
        char filename[BUFSIZ];
@@ -936,6 +978,10 @@ logger_readlogs(logger *lg, FILE *fp, ch
                                (void) res;
                        }
                }
+               /* if this is a shared logger, write the id in the shared file 
*/
+               if (lg->shared) {
+                       logger_update_catalog_file(lg, lg->local_dir, 
LOGFILE_SHARED);
+               }
        }
        return res;
 }
@@ -1492,7 +1538,7 @@ logger_load(int debug, char* fn, char fi
 /* Initialize a new logger
  * It will load any data in the logdir and persist it in the BATs*/
 static logger *
-logger_new(int debug, char *fn, char *logdir, int version, preversionfix_fptr 
prefuncp, postversionfix_fptr postfuncp, int shared)
+logger_new(int debug, char *fn, char *logdir, int version, preversionfix_fptr 
prefuncp, postversionfix_fptr postfuncp, int shared, char *local_logdir)
 {
        logger *lg = (struct logger *) GDKmalloc(sizeof(struct logger));
        char filename[BUFSIZ];
@@ -1527,6 +1573,24 @@ logger_new(int debug, char *fn, char *lo
        if (lg->debug & 1) {
                fprintf(stderr, "#logger_new dir=%s\n", lg->dir);
        }
+
+       if (shared) {
+               logger_set_logdir_path(filename, fn, local_logdir);
+               /* set the slave logdir as well */
+               if ((lg->fn = GDKstrdup(fn)) == NULL ||
+                               (lg->local_dir = GDKstrdup(filename)) == NULL) {
+                       fprintf(stderr, "!ERROR: logger_new: strdup failed\n");
+                       GDKfree(lg->fn);
+                       GDKfree(lg->dir);
+                       GDKfree(lg->local_dir);
+                       GDKfree(lg);
+                       return NULL;
+               }
+               if (lg->debug & 1) {
+                       fprintf(stderr, "#logger_new slave_dir=%s\n", lg->dir);
+               }
+       }
+
        lg->prefuncp = prefuncp;
        lg->postfuncp = postfuncp;
        lg->log = NULL;
@@ -1563,7 +1627,7 @@ logger_reload(logger *lg)
 logger *
 logger_create(int debug, char *fn, char *logdir, int version, 
preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
 {
-       logger *lg = logger_new(debug, fn, logdir, version, prefuncp, 
postfuncp, 0);
+       logger *lg = logger_new(debug, fn, logdir, version, prefuncp, 
postfuncp, 0, NULL);
 
        if (!lg)
                return NULL;
@@ -1585,11 +1649,11 @@ logger_create(int debug, char *fn, char 
 /* Create a new shared logger, that is for slaves reading the master log 
directory.
  * Assumed to be read-only */
 logger *
-logger_create_shared(int debug, char *fn, char *logdir, int version, 
preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
+logger_create_shared(int debug, char *fn, char *logdir, char *local_logdir,int 
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp)
 {
        logger *lg = NULL;
 
-       lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp, 1);
+       lg = logger_new(debug, fn, logdir, version, prefuncp, postfuncp, 1, 
local_logdir);
 
        return lg;
 }
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -60,6 +60,7 @@ typedef struct logger {
 #endif
        char *fn;
        char *dir;
+       char *local_dir; /* the directory in which the non-shared log is 
written */
        int shared; /* a flag to indicate if the logger is a shared on (usually 
read-only) */
        preversionfix_fptr prefuncp;
        postversionfix_fptr postfuncp;
@@ -111,7 +112,7 @@ typedef int log_bid;
 #define OBJ_SID        1
 
 gdk_export logger *logger_create(int debug, char *fn, char *logdir, int 
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp);
-gdk_export logger *logger_create_shared(int debug, char *fn, char *logdir, int 
version, preversionfix_fptr prefuncp, postversionfix_fptr postfuncp);
+gdk_export logger *logger_create_shared(int debug, char *fn, char *logdir, 
char *slave_logdir, int version, preversionfix_fptr prefuncp, 
postversionfix_fptr postfuncp);
 gdk_export void logger_destroy(logger *lg);
 gdk_export int logger_exit(logger *lg);
 gdk_export int logger_restart(logger *lg);
diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -338,11 +338,11 @@ bl_create(int debug, char *logdir, int c
 }
 
 static int
-bl_create_shared(int debug, char *logdir, int cat_version)
+bl_create_shared(int debug, char *logdir, int cat_version, char *local_logdir)
 {
        if (bat_logger_shared)
                return LOG_ERR;
-       bat_logger_shared = logger_create_shared(debug, "sql", logdir, 
cat_version, bl_preversion, bl_postversion);
+       bat_logger_shared = logger_create_shared(debug, "sql", logdir, 
local_logdir, cat_version, bl_preversion, bl_postversion);
        if (bat_logger_shared)
                return LOG_OK;
        return LOG_ERR;
@@ -485,7 +485,7 @@ bat_logger_init( logger_functions *lf )
 int
 bat_logger_init_shared( logger_functions *lf )
 {
-       lf->create = bl_create_shared;
+       lf->create_shared = bl_create_shared;
        lf->destroy = bl_destroy_shared;
        lf->cleanup = bl_cleanup_shared;
        lf->read_last_transaction_id = bl_read_last_transaction_id_shared;
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -248,7 +248,7 @@ typedef struct store_functions {
 extern store_functions store_funcs;
 
 typedef int (*logger_create_fptr) (int debug, char *logdir, int 
catalog_version);
-typedef int (*logger_create_shared_fptr) (int debug, char *logdir, int 
catalog_version);
+typedef int (*logger_create_shared_fptr) (int debug, char *logdir, int 
catalog_version, char *slave_logdir);
 
 typedef void (*logger_destroy_fptr) (void);
 typedef int (*logger_restart_fptr) (void);
@@ -268,6 +268,7 @@ typedef int (*log_sequence_fptr) (int se
 
 typedef struct logger_functions {
        logger_create_fptr create;
+       logger_create_shared_fptr create_shared;
        logger_destroy_fptr destroy;
        logger_restart_fptr restart;
        logger_cleanup_fptr cleanup;
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -1345,7 +1345,7 @@ store_init(int debug, store_type store, 
 #ifdef STORE_DEBUG
        fprintf(stderr, "#store_init creating read-only logger\n");
 #endif
-               if (!shared_logger_funcs.create || 
shared_logger_funcs.create(debug, log_settings->shared_logdir, 
CATALOG_VERSION*v) == LOG_ERR) {
+               if (!shared_logger_funcs.create_shared || 
shared_logger_funcs.create_shared(debug, log_settings->shared_logdir, 
CATALOG_VERSION*v, log_settings->logdir) == LOG_ERR) {
                        MT_lock_unset(&bs_lock, "store_init");
                        return -1;
                }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to