Changeset: 6da178705fa7 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/6da178705fa7
Modified Files:
        gdk/gdk_logger.c
        gdk/gdk_logger_internals.h
Branch: logger-fix
Log Message:

clean up synchronization gdk_logger


diffs (truncated from 1261 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -235,8 +235,8 @@ log_write_format(logger *lg, logformat *
 {
        assert(data->id || data->flag);
        assert(!lg->inmemory);
-       if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 &&
-           mnstr_writeInt(lg->output_log, data->id))
+       if (mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
+           mnstr_writeInt(lg->current->output_log, data->id))
                return GDK_SUCCEED;
        TRC_CRITICAL(GDK, "write failed\n");
        return GDK_FAIL;
@@ -281,7 +281,7 @@ log_write_id(logger *lg, int id)
 {
        assert(!lg->inmemory);
        assert(id >= 0);
-       if (mnstr_writeInt(lg->output_log, id))
+       if (mnstr_writeInt(lg->current->output_log, id))
                return GDK_SUCCEED;
        TRC_CRITICAL(GDK, "write failed\n");
        return GDK_FAIL;
@@ -300,7 +300,7 @@ log_read_id(logger *lg, log_id *id)
 #endif
 
 static log_return
-string_reader(logger *lg, BAT *b, lng nr)
+string_reader(logger *lg, char* rbuf, size_t* rbufsize, BAT *b, lng nr)
 {
        size_t sz = 0;
        lng SZ = 0;
@@ -310,24 +310,22 @@ string_reader(logger *lg, BAT *b, lng nr
                if (mnstr_readLng(lg->input_log, &SZ) != 1)
                        return LOG_EOF;
                sz = (size_t)SZ;
-               char *buf = lg->buf;
-               if (lg->bufsize < sz) {
-                       if (!(buf = GDKrealloc(lg->buf, sz)))
+               if (*rbufsize < sz) {
+                       if (!(rbuf = GDKrealloc(rbuf, sz)))
                                return LOG_ERR;
-                       lg->buf = buf;
-                       lg->bufsize = sz;
+                       *rbufsize = sz;
                }
 
-               if (mnstr_read(lg->input_log, buf, sz, 1) != 1)
+               if (mnstr_read(lg->input_log, rbuf, sz, 1) != 1)
                        return LOG_EOF;
                /* handle strings */
-               char *t = buf;
+               char *t = rbuf;
                /* chunked */
 #define CHUNK_SIZE 1024
                char *strings[CHUNK_SIZE];
                int cur = 0;
 
-               for(; nr>0 && res == LOG_OK && t < (buf+sz); nr--) {
+               for(; nr>0 && res == LOG_OK && t < (rbuf+sz); nr--) {
                        strings[cur++] = t;
                        if (cur == CHUNK_SIZE && b && BUNappendmulti(b, 
strings, cur, true) != GDK_SUCCEED)
                                res = LOG_ERR;
@@ -370,6 +368,8 @@ log_read_updates(logger *lg, trans *tr, 
        pnr = nr;
        tpe = find_type_nr(lg, type_id);
        if (tpe >= 0) {
+               size_t rbufsize = 64*1024;
+               void * rbuf             = GDKmalloc(rbufsize);
                BAT *uid = NULL;
                BAT *r = NULL;
                void *(*rt) (ptr, size_t *, stream *, size_t) = 
BATatoms[tpe].atomRead;
@@ -379,13 +379,17 @@ log_read_updates(logger *lg, trans *tr, 
                if (!lg->flushing && l->flag == LOG_UPDATE) {
                        uid = COLnew(0, TYPE_oid, (BUN)nr, PERSISTENT);
                        if (uid == NULL) {
+                               GDKfree(rbuf);
                                return LOG_ERR;
                        }
                }
 
                if (l->flag == LOG_UPDATE_CONST) {
-                       if (mnstr_readLng(lg->input_log, &offset) != 1)
+                       if (mnstr_readLng(lg->input_log, &offset) != 1) {
+                               GDKfree(rbuf);
                                return LOG_ERR;
+
+                       }
                        if (cands) {
                                // This const range actually represents a 
segment of candidates corresponding to updated bat entries
 
@@ -420,11 +424,12 @@ log_read_updates(logger *lg, trans *tr, 
                                }
 
                                // We have to read the value to update the read 
cursor
-                               size_t tlen = lg->bufsize;
-                               void *t = rt(lg->buf, &tlen, lg->input_log, 1);
+                               size_t tlen = rbufsize;
+                               void *t = rt(rbuf, &tlen, lg->input_log, 1);
                                if (t == NULL) {
                                        res = LOG_ERR;
                                }
+                               GDKfree(rbuf);
                                return res;
                        }
                }
@@ -434,18 +439,19 @@ log_read_updates(logger *lg, trans *tr, 
                        if (r == NULL) {
                                if (uid)
                                        BBPreclaim(uid);
+                               GDKfree(rbuf);
                                return LOG_ERR;
                        }
                }
 
                if (l->flag == LOG_UPDATE_CONST) {
-                       size_t tlen = lg->bufsize;
-                       void *t = rt(lg->buf, &tlen, lg->input_log, 1);
+                       size_t tlen = rbufsize;
+                       void *t = rt(rbuf, &tlen, lg->input_log, 1);
                        if (t == NULL) {
                                res = LOG_ERR;
                        } else {
-                               lg->buf = t;
-                               lg->bufsize = tlen;
+                               rbuf = t;
+                               rbufsize = tlen;
                                for(BUN p = 0; p<(BUN) nr; p++) {
                                        if (r && BUNappend(r, t, true) != 
GDK_SUCCEED)
                                                res = LOG_ERR;
@@ -455,6 +461,7 @@ log_read_updates(logger *lg, trans *tr, 
                        if (mnstr_readLng(lg->input_log, &offset) != 1) {
                                if (r)
                                        BBPreclaim(r);
+                               GDKfree(rbuf);
                                return LOG_ERR;
                        }
                        if (tpe == TYPE_msk) {
@@ -464,42 +471,42 @@ log_read_updates(logger *lg, trans *tr, 
                                        else
                                                res = LOG_ERR;
                                } else {
-                                       size_t tlen = lg->bufsize/sizeof(int);
+                                       size_t tlen = rbufsize/sizeof(int);
                                        size_t cnt = 0, snr = (size_t)nr;
                                        snr = (snr+31)/32;
                                        assert(tlen);
                                        for (; res == LOG_OK && snr > 0; 
snr-=cnt) {
                                                cnt = snr>tlen?tlen:snr;
-                                               if 
(!mnstr_readIntArray(lg->input_log, lg->buf, cnt))
+                                               if 
(!mnstr_readIntArray(lg->input_log, rbuf, cnt))
                                                        res = LOG_ERR;
                                        }
                                }
                        } else {
                                if (!ATOMvarsized(tpe)) {
                                        size_t cnt = 0, snr = (size_t)nr;
-                                       size_t tlen = 
lg->bufsize/ATOMsize(tpe), ntlen = lg->bufsize;
+                                       size_t tlen = rbufsize/ATOMsize(tpe), 
ntlen = rbufsize;
                                        assert(tlen);
                                        /* read in chunks of max
                                         * BUFSIZE/width rows */
                                        for (; res == LOG_OK && snr > 0; 
snr-=cnt) {
                                                cnt = snr>tlen?tlen:snr;
-                                               void *t = rt(lg->buf, &ntlen, 
lg->input_log, cnt);
+                                               void *t = rt(rbuf, &ntlen, 
lg->input_log, cnt);
 
                                                if (t == NULL) {
                                                        res = LOG_EOF;
                                                        break;
                                                }
-                                               assert(t == lg->buf);
+                                               assert(t == rbuf);
                                                if (r && BUNappendmulti(r, t, 
cnt, true) != GDK_SUCCEED)
                                                        res = LOG_ERR;
                                        }
                                } else if (tpe == TYPE_str) {
                                        /* efficient string */
-                                       res = string_reader(lg, r, nr);
+                                       res = string_reader(lg, rbuf, 
&rbufsize, r, nr);
                                } else {
                                        for (; res == LOG_OK && nr > 0; nr--) {
-                                               size_t tlen = lg->bufsize;
-                                               void *t = rt(lg->buf, &tlen, 
lg->input_log, 1);
+                                               size_t tlen = rbufsize;
+                                               void *t = rt(rbuf, &tlen, 
lg->input_log, 1);
 
                                                if (t == NULL) {
                                                        /* see if failure was 
due to
@@ -511,8 +518,8 @@ log_read_updates(logger *lg, trans *tr, 
                                                        else
                                                                res = LOG_ERR;
                                                } else {
-                                                       lg->buf = t;
-                                                       lg->bufsize = tlen;
+                                                       rbuf = t;
+                                                       rbufsize = tlen;
                                                        if (r && BUNappend(r, 
t, true) != GDK_SUCCEED)
                                                                res = LOG_ERR;
                                                }
@@ -559,11 +566,11 @@ log_read_updates(logger *lg, trans *tr, 
                                }
                        } else if (tpe == TYPE_str) {
                                /* efficient string */
-                               res = string_reader(lg, r, nr);
+                               res = string_reader(lg, rbuf, &rbufsize, r, nr);
                        } else {
                                for (; res == LOG_OK && nr > 0; nr--) {
-                                       size_t tlen = lg->bufsize;
-                                       void *t = rt(lg->buf, &tlen, 
lg->input_log, 1);
+                                       size_t tlen = rbufsize;
+                                       void *t = rt(rbuf, &tlen, 
lg->input_log, 1);
 
                                        if (t == NULL) {
                                                if (strstr(GDKerrbuf, "malloc") 
== NULL)
@@ -571,8 +578,8 @@ log_read_updates(logger *lg, trans *tr, 
                                                else
                                                        res = LOG_ERR;
                                        } else {
-                                               lg->buf = t;
-                                               lg->bufsize = tlen;
+                                               rbuf = t;
+                                               rbufsize = tlen;
                                                if ((r && BUNappend(r, t, true) 
!= GDK_SUCCEED))
                                                        res = LOG_ERR;
                                        }
@@ -622,6 +629,7 @@ log_read_updates(logger *lg, trans *tr, 
                        else if (uid)
                                BBPreclaim(uid);
                }
+               GDKfree(rbuf);
        } else {
                /* bat missing ERROR or ignore ? currently error. */
                res = LOG_ERR;
@@ -1039,18 +1047,37 @@ log_create_types_file(logger *lg, const 
        return GDK_SUCCEED;
 }
 
+static inline void
+rotation_lock(logger *lg) {
+       MT_lock_set(&lg->rotation_lock);
+}
+
+static inline void
+rotation_unlock(logger *lg) {
+       MT_lock_unset(&lg->rotation_lock);
+}
+
+static inline bool
+try_rotation_lock(logger *lg) {
+       return MT_lock_try(&lg->rotation_lock);
+}
+
 static gdk_return
 log_open_output(logger *lg)
 {
        logged_range *new_range = 
(logged_range*)GDKmalloc(sizeof(logged_range));
+       ATOMIC_INIT(&new_range->refcount, 1);
+
 
        if (!new_range) {
                TRC_CRITICAL(GDK, "allocation failure\n");
                return GDK_FAIL;
        }
 
-       lg->end = 0;
-       lg->drops = 0;
+       ATOMIC_INIT(&new_range->end, 0);
+       ATOMIC_INIT(&new_range->pend, 0);
+       ATOMIC_INIT(&new_range->flushed_end, 0);
+       ATOMIC_INIT(&new_range->drops, 0);
        if (!LOG_DISABLED(lg)) {
                char id[32];
                char *filename;
@@ -1068,14 +1095,13 @@ log_open_output(logger *lg)
 
                if (lg->debug & 1)
                        fprintf(stderr, "#log_open_output: %s.%s\n", LOGFILE, 
id);
-               lg->output_log = open_wstream(filename);
-               if (lg->output_log) {
+               new_range->output_log = open_wstream(filename);
+               if (new_range->output_log) {
                        short byteorder = 1234;
-                       mnstr_write(lg->output_log, &byteorder, 
sizeof(byteorder), 1);
+                       mnstr_write(new_range->output_log, &byteorder, 
sizeof(byteorder), 1);
                }
-               lg->end = 0;
-
-               if (lg->output_log == NULL || mnstr_errnr(lg->output_log) != 
MNSTR_NO__ERROR) {
+
+               if (new_range->output_log == NULL || 
mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
                        TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, 
mnstr_peek_error(NULL));
                        GDKfree(new_range);
                        GDKfree(filename);
@@ -1084,15 +1110,11 @@ log_open_output(logger *lg)
                GDKfree(filename);
        }
        new_range->id = lg->id;
-       new_range->first_tid = lg->tid;
-       new_range->last_tid = lg->tid;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to