Changeset: bb09d2c561ea for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/bb09d2c561ea
Modified Files:
gdk/gdk_logger.c
Branch: logger-cleanup
Log Message:
more cleanups
diffs (164 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2231,9 +2231,42 @@ log_new(int debug, const char *fn, const
return NULL;
}
+static logged_range*
+do_flush_range_cleanup(logger* lg) {
+
+ logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
+ logged_range* first = frange;
+
+ while ( frange->next) {
+ if (ATOMIC_GET(&frange->refcount) > 1)
+ break;
+ frange = frange->next;
+ }
+ if (first == frange)
+ return first;
+
+ logged_range* flast = frange;
+ if (ATOMIC_PTR_CAS(&lg->flush_ranges, &first, flast)) {
+ for (frange = first; frange && frange != flast; frange =
frange->next) {
+ (void) ATOMIC_DEC(&frange->refcount);
+ close_stream(frange->output_log);
+ frange->output_log = NULL;
+ frange = frange->next;
+ }
+ }
+ /* else some other flusher is cleaning up the flush ranges*/
+
+ return flast;
+}
+
void
log_destroy(logger *lg)
{
+ log_close_input(lg);
+ logged_range* last = do_flush_range_cleanup(lg);
+ (void) last;
+ assert(last == lg->current && last == (logged_range*)
ATOMIC_PTR_GET(&lg->flush_ranges));
+ log_close_output(lg);
for (logged_range *p = lg->pending; p; ){
logged_range *n = p->next;
GDKfree(p);
@@ -2268,7 +2301,6 @@ log_destroy(logger *lg)
logbat_destroy(lg->catalog_lid);
log_unlock(lg);
}
- ATOMIC_DESTROY(&lg->current->refcount);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
MT_lock_destroy(&lg->flush_lock);
@@ -2277,8 +2309,6 @@ log_destroy(logger *lg)
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->buf);
- log_close_input(lg);
- log_close_output(lg);
GDKfree(lg);
}
@@ -2348,9 +2378,8 @@ log_activate(logger *lg)
{
gdk_return res = GDK_SUCCEED;
rotation_lock(lg);
- if (lg->current->drops > 100000 && ((ulng)
ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 &&
lg->saved_id+1 == lg->id) {
+ if (!lg->current->next && lg->current->drops > 100000 && ((ulng)
ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 &&
lg->saved_id+1 == lg->id) {
lg->id++;
- log_close_output(lg);
/* start new file */
res = log_open_output(lg);
}
@@ -2504,7 +2533,6 @@ log_constant(logger *lg, int type, ptr v
}
ok = wt(val, lg->current->output_log, 1);
- log_unlock(lg);
if (lg->debug & 1)
fprintf(stderr, "#Logged %d " LLFMT " inserts\n", id, nr);
@@ -2938,29 +2966,9 @@ do_rotate(logger *lg) {
logged_range* pcurrent = lg->current;
assert(ATOMIC_GET(&pcurrent->refcount) > 0);
lg->current = lg->current->next;
- (void) ATOMIC_DEC(&pcurrent->refcount);
}
}
-static logged_range*
-do_flush_range_cleanup(logger* lg) {
-
- logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
- while ( frange->next) {
- if (ATOMIC_GET(&frange->refcount))
- break;
- if (ATOMIC_PTR_CAS(&lg->flush_ranges, &frange, frange->next)) {
- close_stream(frange->output_log);
- frange->output_log = NULL;
- frange = frange->next;
- }
- else /*some other flusher is cleaning up the flush ranges*/
- break;
- }
-
- return frange;
-}
-
gdk_return
log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
@@ -2977,8 +2985,8 @@ log_tflush(logger* lg, ulng log_file_id,
GDKfatal("Could not create new log file\n"); // TODO:
does not have to be fatal (yet)
do_rotate(lg);
(void) ATOMIC_DEC(&frange->refcount);
+ (void) do_flush_range_cleanup(lg);
assert(ATOMIC_GET(&frange->refcount) == 0);
- (void) do_flush_range_cleanup(lg);
return log_commit(lg);
}
@@ -3198,17 +3206,18 @@ log_tstart(logger *lg, bool flushnow, ul
/* I am waiting until all existing flushers are done */
MT_cond_wait(&lg->excl_flush_cv, &lg->rotation_lock);
}
- lg->flushnow = flushnow;
assert(ATOMIC_GET(&lg->nr_flushers) == 0);
ulng end = ATOMIC_GET(¤t->end);
assert(!ATOMIC_GET(¤t->flushed_end) ||
ATOMIC_GET(¤t->flushed_end) == end);
if (ATOMIC_GET(¤t->pend) < end) {
lg->id++;
- log_open_output(lg);
+ if (log_open_output(lg) != GDK_SUCCEED)
+ GDKfatal("Could not create new log file\n"); //
TODO: does not have to be fatal (yet)
}
+ lg->flushnow = flushnow;
}
- if (check_rotation_conditions(lg)) {
+ else if (check_rotation_conditions(lg)) {
lg->id++;
if (log_open_output(lg) != GDK_SUCCEED)
GDKfatal("Could not create new log file\n"); // TODO:
does not have to be fatal (yet)
@@ -3216,8 +3225,10 @@ log_tstart(logger *lg, bool flushnow, ul
do_rotate(lg);
rotation_unlock(lg);
- if (lg->flushnow && lg->saved_id+1 < lg->id)
+ if (lg->flushnow && lg->saved_id+1 < lg->id) {
+ (void) do_flush_range_cleanup(lg);
log_flush(lg, (1ULL<<63));
+ }
(void) ATOMIC_INC(&lg->current->refcount);
(void) ATOMIC_INC(&lg->current->end);
@@ -3236,7 +3247,5 @@ log_tstart(logger *lg, bool flushnow, ul
return GDK_FAIL;
}
- log_unlock(lg);
- rotation_unlock(lg);
return GDK_SUCCEED;
}
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]