Changeset: ae4fb3b23216 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/ae4fb3b23216
Branch: Sep2022
Log Message:
merge with logger-fix
diffs (truncated from 1095 to 300 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -235,8 +235,8 @@ log_write_format(logger *lg, logformat *
{
assert(data->id || data->flag);
assert(!lg->inmemory);
- if (mnstr_write(lg->output_log, &data->flag, 1, 1) == 1 &&
- mnstr_writeInt(lg->output_log, data->id))
+ if (mnstr_write(lg->current->output_log, &data->flag, 1, 1) == 1 &&
+ mnstr_writeInt(lg->current->output_log, data->id))
return GDK_SUCCEED;
TRC_CRITICAL(GDK, "write failed\n");
return GDK_FAIL;
@@ -281,7 +281,7 @@ log_write_id(logger *lg, int id)
{
assert(!lg->inmemory);
assert(id >= 0);
- if (mnstr_writeInt(lg->output_log, id))
+ if (mnstr_writeInt(lg->current->output_log, id))
return GDK_SUCCEED;
TRC_CRITICAL(GDK, "write failed\n");
return GDK_FAIL;
@@ -1038,6 +1038,16 @@ log_create_types_file(logger *lg, const
return GDK_SUCCEED;
}
+static inline void
+rotation_lock(logger *lg) {
+ MT_lock_set(&lg->rotation_lock);
+}
+
+static inline void
+rotation_unlock(logger *lg) {
+ MT_lock_unset(&lg->rotation_lock);
+}
+
static gdk_return
log_open_output(logger *lg)
{
@@ -1047,9 +1057,12 @@ log_open_output(logger *lg)
TRC_CRITICAL(GDK, "allocation failure\n");
return GDK_FAIL;
}
-
- lg->end = 0;
- lg->drops = 0;
+ ATOMIC_INIT(&new_range->refcount, 1);
+ ATOMIC_INIT(&new_range->last_ts, 0);
+ ATOMIC_INIT(&new_range->end, 0);
+ ATOMIC_INIT(&new_range->pend, 0);
+ ATOMIC_INIT(&new_range->flushed_end, 0);
+ ATOMIC_INIT(&new_range->drops, 0);
if (!LOG_DISABLED(lg)) {
char id[32];
char *filename;
@@ -1067,14 +1080,13 @@ log_open_output(logger *lg)
if (lg->debug & 1)
fprintf(stderr, "#log_open_output: %s.%s\n", LOGFILE,
id);
- lg->output_log = open_wstream(filename);
- if (lg->output_log) {
+ new_range->output_log = open_wstream(filename);
+ if (new_range->output_log) {
short byteorder = 1234;
- mnstr_write(lg->output_log, &byteorder, sizeof(byteorder), 1);
+ mnstr_write(new_range->output_log, &byteorder, sizeof(byteorder), 1);
}
- lg->end = 0;
-
- if (lg->output_log == NULL || mnstr_errnr(lg->output_log) != MNSTR_NO__ERROR) {
+
+ if (new_range->output_log == NULL || mnstr_errnr(new_range->output_log) != MNSTR_NO__ERROR) {
TRC_CRITICAL(GDK, "creating %s failed: %s\n", filename,
mnstr_peek_error(NULL));
GDKfree(new_range);
GDKfree(filename);
@@ -1083,13 +1095,10 @@ log_open_output(logger *lg)
GDKfree(filename);
}
new_range->id = lg->id;
- new_range->last_ts = 0;
new_range->next = NULL;
- if (lg->current)
- lg->current->next = new_range;
- lg->current = new_range;
- if (!lg->pending)
- lg->pending = new_range;
+ logged_range* current = lg->current;
+ assert(current && current->next == NULL);
+ current->next = new_range;
return GDK_SUCCEED;
}
@@ -1104,12 +1113,9 @@ log_close_input(logger *lg)
static inline void
log_close_output(logger *lg)
{
- if (lg->flushing_output_log)
- return;
-
if (!LOG_DISABLED(lg))
- close_stream(lg->output_log);
- lg->output_log = NULL;
+ close_stream(lg->current->output_log);
+ lg->current->output_log = NULL;
}
static gdk_return
@@ -1184,7 +1190,7 @@ log_read_transaction(logger *lg)
* return GDK_FAIL */
switch (l.flag) {
case LOG_START:
- if (l.id > lg->tid)
+ if (l.id > lg->tid) // TODO: check that this can only happen during initialisation
lg->tid = l.id;
if ((tr = tr_create(tr, l.id)) == NULL) {
err = LOG_ERR;
@@ -2113,12 +2119,10 @@ log_load(int debug, const char *fn, cons
logbat_destroy(lg->seqs_id);
logbat_destroy(lg->seqs_val);
logbat_destroy(lg->dseqs);
- ATOMIC_DESTROY(&lg->refcount);
+ ATOMIC_DESTROY(&lg->current->refcount);
+ ATOMIC_DESTROY(&lg->nr_flushers);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
- MT_sema_destroy(&lg->flush_queue_semaphore);
- MT_lock_destroy(&lg->flush_lock);
- MT_lock_destroy(&lg->flush_queue_lock);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->rbuf);
@@ -2156,8 +2160,6 @@ log_new(int debug, const char *fn, const
.funcdata = funcdata,
.id = 0,
- .drops = 0,
- .end = 0,
.saved_id = getBBPlogno(), /* get saved log numer from bbp */
.saved_tid = (int)getBBPtransid(), /* get saved transaction id from bbp */
};
@@ -2188,16 +2190,11 @@ log_new(int debug, const char *fn, const
fprintf(stderr, "#log_new dir set to %s\n", lg->dir);
}
- ATOMIC_INIT(&lg->refcount, 0);
MT_lock_init(&lg->lock, fn);
MT_lock_init(&lg->rotation_lock, "rotation_lock");
- MT_sema_init(&lg->flush_queue_semaphore, FLUSH_QUEUE_SIZE, "flush_queue_semaphore");
MT_lock_init(&lg->flush_lock, "flush_lock");
- MT_lock_init(&lg->flush_queue_lock, "flush_queue_lock");
-
- // flush variables
- lg->flush_queue_begin = 0;
- lg->flush_queue_length = 0;
+ MT_cond_init(&lg->excl_flush_cv);
+ ATOMIC_INIT(&lg->nr_flushers, 0);
if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
return lg;
@@ -2205,9 +2202,46 @@ log_new(int debug, const char *fn, const
return NULL;
}
+static logged_range*
+do_flush_range_cleanup(logger* lg) {
+ rotation_lock(lg);
+ logged_range* frange = lg->flush_ranges;
+ logged_range* first = frange;
+
+ while ( frange->next) {
+ if (ATOMIC_GET(&frange->refcount) > 1)
+ break;
+ frange = frange->next;
+ }
+ if (first == frange) {
+ rotation_unlock(lg);
+ return first;
+ }
+
+ logged_range* flast = frange;
+
+ lg->flush_ranges = flast;
+ rotation_unlock(lg);
+
+ for (frange = first; frange && frange != flast; frange = frange->next) {
+ (void) ATOMIC_DEC(&frange->refcount);
+ if (!LOG_DISABLED(lg)) {
+ close_stream(frange->output_log);
+ frange->output_log = NULL;
+ }
+ }
+
+ return flast;
+}
+
void
log_destroy(logger *lg)
{
+ log_close_input(lg);
+ logged_range* last = do_flush_range_cleanup(lg);
+ (void) last;
+ assert(last == lg->current && last == lg->flush_ranges);
+ log_close_output(lg);
for (logged_range *p = lg->pending; p; ){
logged_range *n = p->next;
GDKfree(p);
@@ -2242,18 +2276,14 @@ log_destroy(logger *lg)
logbat_destroy(lg->catalog_lid);
log_unlock(lg);
}
- ATOMIC_DESTROY(&lg->refcount);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
- MT_sema_destroy(&lg->flush_queue_semaphore);
MT_lock_destroy(&lg->flush_lock);
- MT_lock_destroy(&lg->flush_queue_lock);
+ ATOMIC_DESTROY(&lg->nr_flushers);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->rbuf);
GDKfree(lg->wbuf);
- log_close_input(lg);
- log_close_output(lg);
GDKfree(lg);
}
@@ -2277,10 +2307,17 @@ log_create(int debug, const char *fn, co
log_destroy(lg);
return NULL;
}
+ assert(lg->current == NULL);
+ logged_range dummy = {0};
+ lg->current = &dummy;
if (log_open_output(lg) != GDK_SUCCEED) {
log_destroy(lg);
return NULL;
}
+ lg->current = lg->current->next;
+ assert(lg->pending == NULL && lg->flush_ranges == NULL);
+ lg->pending = lg->current;
+ lg->flush_ranges = lg->current;
return lg;
}
@@ -2290,9 +2327,9 @@ log_next_logfile(logger *lg, ulng ts)
int m = (GDKdebug & FORCEMITOMASK)?1000:100;
if (!lg->pending || !lg->pending->next)
return 0;
- if (lg->pending != lg->current && lg->pending->last_ts <= ts) {
+ if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending != lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) {
logged_range *p = lg->pending;
- for(int i = 1; i<m && p->next && p->next != lg->current && p->last_ts <= ts; i++)
+ for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next && p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++)
p = p->next;
return p->id;
}
@@ -2302,7 +2339,6 @@ log_next_logfile(logger *lg, ulng ts)
static void
log_cleanup_range(logger *lg, ulng id)
{
- log_lock(lg);
while (lg->pending && lg->pending->id <= id) {
logged_range *p;
p = lg->pending;
@@ -2310,23 +2346,19 @@ log_cleanup_range(logger *lg, ulng id)
lg->pending = p->next;
GDKfree(p);
}
- log_unlock(lg);
}
gdk_return
log_activate(logger *lg)
{
gdk_return res = GDK_SUCCEED;
- MT_lock_set(&lg->rotation_lock);
- log_lock(lg);
- if (lg->drops > 100000 && lg->end > 0 && lg->saved_id+1 == lg->id) {
+ rotation_lock(lg);
+ if (!lg->current->next && lg->current->drops > 100000 && ((ulng) ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 && lg->saved_id+1 == lg->id) {
lg->id++;
- log_close_output(lg);
/* start new file */
res = log_open_output(lg);
}
- log_unlock(lg);
- MT_lock_unset(&lg->rotation_lock);
+ rotation_unlock(lg);
return res;
}
@@ -2345,9 +2377,9 @@ log_flush(logger *lg, ulng ts)
}
if (lg->saved_id >= lid)
return GDK_SUCCEED;
- MT_lock_set(&lg->rotation_lock);
+ rotation_lock(lg);
ulng lgid = lg->id;
- MT_lock_unset(&lg->rotation_lock);
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]