Changeset: ae4fb3b23216 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ae4fb3b23216
Branch: Sep2022
Log Message:

merge with logger-fix


diffs (truncated from 1095 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -235,8 +235,8 @@ log_write_format(logger *lg, logformat *
 {
        assert(data->id || data->flag);
        assert(!lg->inmemory);
-       if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 &&
-           mnstr_writeInt(lg->output_log, data->id))
+       if (mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
+           mnstr_writeInt(lg->current->output_log, data->id))
                return GDK_SUCCEED;
        TRC_CRITICAL(GDK, "write failed\n");
        return GDK_FAIL;
@@ -281,7 +281,7 @@ log_write_id(logger *lg, int id)
 {
        assert(!lg->inmemory);
        assert(id >= 0);
-       if (mnstr_writeInt(lg->output_log, id))
+       if (mnstr_writeInt(lg->current->output_log, id))
                return GDK_SUCCEED;
        TRC_CRITICAL(GDK, "write failed\n");
        return GDK_FAIL;
@@ -1038,6 +1038,16 @@ log_create_types_file(logger *lg, const 
        return GDK_SUCCEED;
 }
 
+static inline void
+rotation_lock(logger *lg) {
+       MT_lock_set(&lg->rotation_lock);
+}
+
+static inline void
+rotation_unlock(logger *lg) {
+       MT_lock_unset(&lg->rotation_lock);
+}
+
 static gdk_return
 log_open_output(logger *lg)
 {
@@ -1047,9 +1057,12 @@ log_open_output(logger *lg)
                TRC_CRITICAL(GDK, "allocation failure\n");
                return GDK_FAIL;
        }
-
-       lg->end = 0;
-       lg->drops = 0;
+       ATOMIC_INIT(&new_range->refcount, 1);
+       ATOMIC_INIT(&new_range->last_ts, 0);
+       ATOMIC_INIT(&new_range->end, 0);
+       ATOMIC_INIT(&new_range->pend, 0);
+       ATOMIC_INIT(&new_range->flushed_end, 0);
+       ATOMIC_INIT(&new_range->drops, 0);
        if (!LOG_DISABLED(lg)) {
                char id[32];
                char *filename;
@@ -1067,14 +1080,13 @@ log_open_output(logger *lg)
 
                if (lg->debug & 1)
                        fprintf(stderr, "#log_open_output: %s.%s\n", LOGFILE, id);
-               lg->output_log = open_wstream(filename);
-               if (lg->output_log) {
+               new_range->output_log = open_wstream(filename);
+               if (new_range->output_log) {
                        short byteorder = 1234;
-                       mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1);
+                       mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
                }
-               lg->end = 0;
-
-               if (lg->output_log == NULL || mnstr_errnr(lg->output_log) != MNSTR_NO__ERROR) {
+
+               if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
                        TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename, mnstr_peek_error(NULL));
                        GDKfree(new_range);
                        GDKfree(filename);
@@ -1083,13 +1095,10 @@ log_open_output(logger *lg)
                GDKfree(filename);
        }
        new_range->id = lg->id;
-       new_range->last_ts = 0;
        new_range->next = NULL;
-       if (lg->current)
-               lg->current->next = new_range;
-       lg->current = new_range;
-       if (!lg->pending)
-               lg->pending = new_range;
+       logged_range* current = lg->current;
+       assert(current && current->next == NULL);
+       current->next = new_range;
        return GDK_SUCCEED;
 }
 
@@ -1104,12 +1113,9 @@ log_close_input(logger *lg)
 static inline void
 log_close_output(logger *lg)
 {
-       if (lg->flushing_output_log)
-               return;
-
        if (!LOG_DISABLED(lg))
-               close_stream(lg->output_log);
-       lg->output_log = NULL;
+               close_stream(lg->current->output_log);
+       lg->current->output_log = NULL;
 }
 
 static gdk_return
@@ -1184,7 +1190,7 @@ log_read_transaction(logger *lg)
                 * return GDK_FAIL */
                switch (l.flag) {
                case LOG_START:
-                       if (l.id > lg->tid)
+                       if (l.id > lg->tid) // TODO: check that this can only happen during initialisation
                                lg->tid = l.id;
                        if ((tr = tr_create(tr, l.id)) == NULL) {
                                err = LOG_ERR;
@@ -2113,12 +2119,10 @@ log_load(int debug, const char *fn, cons
        logbat_destroy(lg->seqs_id);
        logbat_destroy(lg->seqs_val);
        logbat_destroy(lg->dseqs);
-       ATOMIC_DESTROY(&lg->refcount);
+       ATOMIC_DESTROY(&lg->current->refcount);
+       ATOMIC_DESTROY(&lg->nr_flushers);
        MT_lock_destroy(&lg->lock);
        MT_lock_destroy(&lg->rotation_lock);
-       MT_sema_destroy(&lg->flush_queue_semaphore);
-       MT_lock_destroy(&lg->flush_lock);
-       MT_lock_destroy(&lg->flush_queue_lock);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->rbuf);
@@ -2156,8 +2160,6 @@ log_new(int debug, const char *fn, const
                .funcdata = funcdata,
 
                .id = 0,
-               .drops = 0,
-               .end = 0,
                .saved_id = getBBPlogno(),              /* get saved log numer from bbp */
                .saved_tid = (int)getBBPtransid(),      /* get saved transaction id from bbp */
        };
@@ -2188,16 +2190,11 @@ log_new(int debug, const char *fn, const
                fprintf(stderr, "#log_new dir set to %s\n", lg->dir);
        }
 
-       ATOMIC_INIT(&lg->refcount, 0);
        MT_lock_init(&lg->lock, fn);
        MT_lock_init(&lg->rotation_lock, "rotation_lock");
-       MT_sema_init(&lg->flush_queue_semaphore, FLUSH_QUEUE_SIZE, "flush_queue_semaphore");
        MT_lock_init(&lg->flush_lock, "flush_lock");
-       MT_lock_init(&lg->flush_queue_lock, "flush_queue_lock");
-
-       // flush variables
-       lg->flush_queue_begin = 0;
-       lg->flush_queue_length = 0;
+       MT_cond_init(&lg->excl_flush_cv);
+       ATOMIC_INIT(&lg->nr_flushers, 0);
 
        if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
                return lg;
@@ -2205,9 +2202,46 @@ log_new(int debug, const char *fn, const
        return NULL;
 }
 
+static logged_range*
+do_flush_range_cleanup(logger* lg) {
+       rotation_lock(lg);
+       logged_range* frange = lg->flush_ranges;
+       logged_range* first = frange;
+
+       while ( frange->next) {
+               if (ATOMIC_GET(&frange->refcount) > 1)
+                       break;
+               frange = frange->next;
+       }
+       if (first == frange) {
+               rotation_unlock(lg);
+               return first;
+       }
+
+       logged_range* flast = frange;
+
+       lg->flush_ranges = flast;
+       rotation_unlock(lg);
+
+       for (frange = first; frange && frange != flast; frange = frange->next) {
+               (void) ATOMIC_DEC(&frange->refcount);
+               if (!LOG_DISABLED(lg)) {
+                       close_stream(frange->output_log);
+                       frange->output_log = NULL;
+               }
+       }
+
+       return flast;
+}
+
 void
 log_destroy(logger *lg)
 {
+       log_close_input(lg);
+       logged_range* last = do_flush_range_cleanup(lg);
+       (void) last;
+       assert(last == lg->current && last == lg->flush_ranges);
+       log_close_output(lg);
        for (logged_range *p = lg->pending; p; ){
                logged_range *n = p->next;
                GDKfree(p);
@@ -2242,18 +2276,14 @@ log_destroy(logger *lg)
                logbat_destroy(lg->catalog_lid);
                log_unlock(lg);
        }
-       ATOMIC_DESTROY(&lg->refcount);
        MT_lock_destroy(&lg->lock);
        MT_lock_destroy(&lg->rotation_lock);
-       MT_sema_destroy(&lg->flush_queue_semaphore);
        MT_lock_destroy(&lg->flush_lock);
-       MT_lock_destroy(&lg->flush_queue_lock);
+       ATOMIC_DESTROY(&lg->nr_flushers);
        GDKfree(lg->fn);
        GDKfree(lg->dir);
        GDKfree(lg->rbuf);
        GDKfree(lg->wbuf);
-       log_close_input(lg);
-       log_close_output(lg);
        GDKfree(lg);
 }
 
@@ -2277,10 +2307,17 @@ log_create(int debug, const char *fn, co
                log_destroy(lg);
                return NULL;
        }
+       assert(lg->current == NULL);
+       logged_range dummy = {0};
+       lg->current = &dummy;
        if (log_open_output(lg) != GDK_SUCCEED) {
                log_destroy(lg);
                return NULL;
        }
+       lg->current = lg->current->next;
+       assert(lg->pending == NULL && lg->flush_ranges == NULL);
+       lg->pending                     = lg->current;
+       lg->flush_ranges        = lg->current;
        return lg;
 }
 
@@ -2290,9 +2327,9 @@ log_next_logfile(logger *lg, ulng ts)
        int m = (GDKdebug & FORCEMITOMASK)?1000:100;
        if (!lg->pending || !lg->pending->next)
                return 0;
-       if (lg->pending != lg->current && lg->pending->last_ts <= ts) {
+       if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) {
                logged_range *p = lg->pending;
-               for(int i = 1; i<m && p->next && p->next != lg->current && p->last_ts <= ts; i++)
+               for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++)
                        p = p->next;
                return p->id;
        }
@@ -2302,7 +2339,6 @@ log_next_logfile(logger *lg, ulng ts)
 static void
 log_cleanup_range(logger *lg, ulng id)
 {
-       log_lock(lg);
        while (lg->pending && lg->pending->id <= id) {
                logged_range *p;
                p = lg->pending;
@@ -2310,23 +2346,19 @@ log_cleanup_range(logger *lg, ulng id)
                        lg->pending = p->next;
                GDKfree(p);
        }
-       log_unlock(lg);
 }
 
 gdk_return
 log_activate(logger *lg)
 {
        gdk_return res = GDK_SUCCEED;
-       MT_lock_set(&lg->rotation_lock);
-       log_lock(lg);
-       if (lg->drops > 100000 && lg->end > 0 && lg->saved_id+1 == lg->id) {
+       rotation_lock(lg);
+       if (!lg->current->next && lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) {
                lg->id++;
-               log_close_output(lg);
                /* start new file */
                res = log_open_output(lg);
        }
-       log_unlock(lg);
-       MT_lock_unset(&lg->rotation_lock);
+       rotation_unlock(lg);
        return res;
 }
 
@@ -2345,9 +2377,9 @@ log_flush(logger *lg, ulng ts)
        }
        if (lg->saved_id >= lid)
                return GDK_SUCCEED;
-       MT_lock_set(&lg->rotation_lock);
+       rotation_lock(lg);
        ulng lgid = lg->id;
-       MT_lock_unset(&lg->rotation_lock);
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to