Changeset: 9cfebdb5fbe1 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/9cfebdb5fbe1
Modified Files:
gdk/gdk_logger.c
gdk/gdk_logger_internals.h
gdk/gdk_logger_old.c
Branch: logger-cleanup
Log Message:
fix flush range cleanup
diffs (206 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1064,14 +1064,14 @@ static gdk_return
log_open_output(logger *lg)
{
logged_range *new_range =
(logged_range*)GDKmalloc(sizeof(logged_range));
- ATOMIC_INIT(&new_range->refcount, 1);
if (!new_range) {
TRC_CRITICAL(GDK, "allocation failure\n");
return GDK_FAIL;
}
-
+ ATOMIC_INIT(&new_range->refcount, 1);
+ ATOMIC_INIT(&new_range->last_ts, 0);
ATOMIC_INIT(&new_range->end, 0);
ATOMIC_INIT(&new_range->pend, 0);
ATOMIC_INIT(&new_range->flushed_end, 0);
@@ -1108,7 +1108,6 @@ log_open_output(logger *lg)
GDKfree(filename);
}
new_range->id = lg->id;
- new_range->last_ts = 0;
new_range->next = NULL;
logged_range* current = lg->current;
assert(current && current->next == NULL);
@@ -2323,9 +2322,9 @@ log_next_logfile(logger *lg, ulng ts)
int m = (GDKdebug & FORCEMITOMASK)?1000:100;
if (!lg->pending || !lg->pending->next)
return 0;
- if (lg->pending != lg->current && lg->pending->last_ts <= ts) {
+ if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending !=
lg->current && (ulng) ATOMIC_GET(&lg->pending->last_ts) <= ts) {
logged_range *p = lg->pending;
- for(int i = 1; i<m && p->next && p->next != lg->current &&
p->last_ts <= ts; i++)
+ for(int i = 1; i<m && ATOMIC_GET(&p->refcount) == 0 && p->next
&& p->next != lg->current && (ulng) ATOMIC_GET(&p->last_ts) <= ts; i++)
p = p->next;
return p->id;
}
@@ -2348,8 +2347,8 @@ gdk_return
log_activate(logger *lg)
{
gdk_return res = GDK_SUCCEED;
- rotation_lock(lg); /*protects against potential concurrent write in
log_tflush*/
- if (lg->current->drops > 100000 && ATOMIC_GET(&lg->current->end) > 0 &&
lg->saved_id+1 == lg->id) {
+ rotation_lock(lg);
+ if (lg->current->drops > 100000 && ((ulng)
ATOMIC_GET(&lg->current->end) - (ulng) ATOMIC_GET(&lg->current->pend)) > 0 &&
lg->saved_id+1 == lg->id) {
lg->id++;
log_close_output(lg);
/* start new file */
@@ -2863,7 +2862,7 @@ check_rotation_conditions(logger *lg) {
const lng log_large = (GDKdebug & FORCEMITOMASK)?LOG_MINI:LOG_LARGE;
return
(lg->saved_id+1 < lg->id && lg->current->drops > 100000) ||
- (p > log_large || (((lng) ATOMIC_GET(&lg->current->end) * 1024)
> log_large));
+ (p > log_large || (((lng) ATOMIC_GET(&lg->current->end) - (lng)
ATOMIC_GET(&lg->current->pend)) * 1024) > log_large);
}
gdk_return
@@ -2924,8 +2923,42 @@ log_tdone(logger* lg, logged_range *rang
if (lg->debug & 1)
fprintf(stderr, "#log_tdone " LLFMT "\n", commit_ts);
- if (range->last_ts < commit_ts)
- range->last_ts = commit_ts;
+ if ((ulng) ATOMIC_GET(&range->last_ts) < commit_ts)
+ ATOMIC_SET(&range->last_ts, commit_ts);
+}
+
+static void
+do_rotate(logger *lg) {
+ logged_range* next = lg->current->next;
+ if (next) {
+ assert(ATOMIC_GET(&next->refcount) == 1);
+ lng end = ATOMIC_GET(&lg->current->end);
+ ATOMIC_SET(&next->pend, end);
+ ATOMIC_SET(&next->end, end);
+ logged_range* pcurrent = lg->current;
+ assert(ATOMIC_GET(&pcurrent->refcount) > 0);
+ lg->current = lg->current->next;
+ (void) ATOMIC_DEC(&pcurrent->refcount);
+ }
+}
+
+static logged_range*
+do_flush_range_cleanup(logger* lg) {
+
+ logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
+ while ( frange->next) {
+ if (ATOMIC_GET(&frange->refcount))
+ break;
+ if (ATOMIC_PTR_CAS(&lg->flush_ranges, &frange, frange->next)) {
+ close_stream(frange->output_log);
+ frange->output_log = NULL;
+ frange = frange->next;
+ }
+ else /*some other flusher is cleaning up the flush ranges*/
+ break;
+ }
+
+ return frange;
}
gdk_return
@@ -2933,32 +2966,27 @@ log_tflush(logger* lg, ulng log_file_id,
if (lg->flushnow) {
logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
+ assert(frange == lg->current);
+ ulng end = ATOMIC_GET(&lg->current->end);
+ assert(end > ATOMIC_GET(&lg->current->pend));
+ ATOMIC_SET(&lg->current->flushed_end, end);
+ log_tdone(lg, lg->current, commit_ts);
+ lg->id++;
+ lg->flushnow = 0;
+ if (log_open_output(lg) != GDK_SUCCEED)
+ GDKfatal("Could not create new log file\n"); // TODO:
does not have to be fatal (yet)
+ do_rotate(lg);
(void) ATOMIC_DEC(&frange->refcount);
assert(ATOMIC_GET(&frange->refcount) == 0);
- assert(frange == lg->current);
- assert(ATOMIC_GET(&lg->current->end)
==ATOMIC_GET(&lg->current->pend));
- lg->flushnow = 0;
- if (log_commit(lg) == GDK_SUCCEED)
- log_tdone(lg, lg->current, commit_ts);
- else
- return GDK_FAIL;
+ (void) do_flush_range_cleanup(lg);
+ return log_commit(lg);
}
if (LOG_DISABLED(lg)) {
return GDK_SUCCEED;
}
- logged_range* frange = ATOMIC_PTR_GET(&lg->flush_ranges);
- logged_range* first = frange;
- while ( frange->next) {
- if (ATOMIC_GET(&frange->refcount))
- break;
- close_stream(frange->output_log);
- frange->output_log = NULL;
- frange = frange->next;
- }
- if (frange != first)
- ATOMIC_PTR_CAS(&lg->flush_ranges, &first, frange);
+ logged_range* frange = do_flush_range_cleanup(lg);
lng end = (lng) log_file_id;
while ((lng) ATOMIC_GET(&frange->end) < end) {
@@ -3156,20 +3184,6 @@ log_find_bat(logger *lg, log_id id)
}
-static inline void
-do_rotate(logger *lg) {
- logged_range* next = lg->current->next;
- if (next) {
- assert(ATOMIC_GET(&next->refcount) == 1);
- lng end = ATOMIC_GET(&lg->current->end);
- ATOMIC_SET(&next->pend, end);
- ATOMIC_SET(&next->end, end);
- logged_range* pcurrent = lg->current;
- assert(ATOMIC_GET(&pcurrent->refcount) > 0);
- lg->current = lg->current->next;
- (void) ATOMIC_DEC(&pcurrent->refcount);
- }
-}
gdk_return
log_tstart(logger *lg, bool flushnow, ulng *log_file_id)
@@ -3188,7 +3202,7 @@ log_tstart(logger *lg, bool flushnow, ul
assert(ATOMIC_GET(&lg->nr_flushers) == 0);
ulng end = ATOMIC_GET(&current->end);
- assert(ATOMIC_GET(&current->flushed_end) == end);
+ assert(!ATOMIC_GET(&current->flushed_end) ||
ATOMIC_GET(&current->flushed_end) == end);
if (ATOMIC_GET(&current->pend) < end) {
lg->id++;
log_open_output(lg);
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -14,8 +14,8 @@
#define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum
number of transactions committing simultaneously */
typedef struct logged_range_t {
- ulng id; /* log file id */
- ulng last_ts; /* last stored timestamp */
+ ulng id; /* log file id */
+ ATOMIC_TYPE last_ts; /* last stored timestamp */
struct logged_range_t *next;
ATOMIC_TYPE refcount;
ATOMIC_TYPE end;
diff --git a/gdk/gdk_logger_old.c b/gdk/gdk_logger_old.c
--- a/gdk/gdk_logger_old.c
+++ b/gdk/gdk_logger_old.c
@@ -1658,7 +1658,6 @@ logger_load(const char *fn, char filenam
BBPrelease(bids[p]);
}
logbat_destroy(lg->del);
- GDKfree(lg->local_dir);
GDKfree(lg);
return GDK_FAIL;
}
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]