Changeset: be6f3028804b for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/be6f3028804b
Modified Files:
gdk/gdk_logger.c
gdk/gdk_logger_internals.h
Branch: pax-log
Log Message:
Factor out log_queue implementation for reuse.
diffs (245 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1845,6 +1845,69 @@ log_cleanup(logger *lg, lng id)
return GDK_SUCCEED;
}
+static void
+log_queue_destroy(log_queue* q) {
+ MT_sema_destroy(&q->sema);
+ MT_lock_destroy(&q->queue_lock);
+}
+
+static void
+log_queue_initialize(log_queue* q, const char* sema_name, const char*
queue_lock_name) {
+ q->begin = 0;
+ q->length = 0;
+ MT_sema_init(&q->sema, LOG_QUEUE_SIZE, sema_name);
+ MT_lock_init(&q->queue_lock, queue_lock_name);
+}
+
+static int
+log_queue_length(log_queue* q) {
+ MT_lock_set(&q->queue_lock);
+ const int fql = q->length;
+ MT_lock_unset(&q->queue_lock);
+ return fql;
+}
+
+static unsigned int
+log_queue_request_number(log_queue* q) {
+ // Semaphore protects ring buffer structure in queue against overflowing
+ static unsigned int _number = 0;
+ unsigned int result;
+ MT_sema_down(&q->sema);
+ MT_lock_set(&q->queue_lock);
+ result = ++_number;
+ const int end = (q->begin + q->length) % LOG_QUEUE_SIZE;
+ q->queue[end] = _number;
+ q->length++;
+ MT_lock_unset(&q->queue_lock);
+
+ return result;
+}
+
+static void
+log_queue_truncate_left(log_queue* q, int limit) {
+ MT_lock_set(&q->queue_lock);
+ q->begin = (q->begin + limit) % LOG_QUEUE_SIZE;
+ q->length -= limit;
+ MT_lock_unset(&q->queue_lock);
+
+ for (int i = 0; i < limit; i++)
+ MT_sema_up(&q->sema);
+}
+
+static int
+log_queue_has_number(log_queue* q, unsigned int number) {
+ MT_lock_set(&q->queue_lock);
+ const int fql = q->length;
+ MT_lock_unset(&q->queue_lock);
+ for (int i = 0; i < fql; i++) {
+ const int idx = (q->begin + i) % LOG_QUEUE_SIZE;
+ if (q->queue[idx] == number) {
+ return 1;
+ }
+ }
+ return 0;
+}
+
/* Load data from the logger logdir
* Initialize new directories and catalog files if none are present,
* unless running in read-only mode
@@ -2133,9 +2196,8 @@ log_load(int debug, const char *fn, cons
ATOMIC_DESTROY(&lg->refcount);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
- MT_sema_destroy(&lg->flush_queue_semaphore);
MT_lock_destroy(&lg->flush_lock);
- MT_lock_destroy(&lg->flush_queue_lock);
+ log_queue_destroy(&lg->flush_queue);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->local_dir);
@@ -2202,13 +2264,9 @@ log_new(int debug, const char *fn, const
ATOMIC_INIT(&lg->refcount, 0);
MT_lock_init(&lg->lock, fn);
MT_lock_init(&lg->rotation_lock, "rotation_lock");
- MT_sema_init(&lg->flush_queue_semaphore, FLUSH_QUEUE_SIZE,
"flush_queue_semaphore");
MT_lock_init(&lg->flush_lock, "flush_lock");
- MT_lock_init(&lg->flush_queue_lock, "flush_queue_lock");
-
- // flush variables
- lg->flush_queue_begin = 0;
- lg->flush_queue_length = 0;
+
+ log_queue_initialize(&lg->flush_queue, "flush_queue_semaphore",
"flush_queue_lock");
if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
return lg;
@@ -2256,9 +2314,8 @@ log_destroy(logger *lg)
ATOMIC_DESTROY(&lg->refcount);
MT_lock_destroy(&lg->lock);
MT_lock_destroy(&lg->rotation_lock);
- MT_sema_destroy(&lg->flush_queue_semaphore);
MT_lock_destroy(&lg->flush_lock);
- MT_lock_destroy(&lg->flush_queue_lock);
+ log_queue_destroy(&lg->flush_queue);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->buf);
@@ -2924,55 +2981,6 @@ log_tcommit(logger *lg, ulng commit_ts)
return GDK_SUCCEED;
}
-static unsigned int
-request_number_flush_queue(logger *lg) {
- // Semaphore protects ring buffer structure in queue against overflowing
- static unsigned int _number = 0;
- unsigned int result;
- MT_sema_down(&lg->flush_queue_semaphore);
- MT_lock_set(&lg->flush_queue_lock);
- result = ++_number;
- const int end = (lg->flush_queue_begin + lg->flush_queue_length) %
FLUSH_QUEUE_SIZE;
- lg->flush_queue[end] = _number;
- lg->flush_queue_length++;
- MT_lock_unset(&lg->flush_queue_lock);
-
- return result;
-}
-
-static void
-left_truncate_flush_queue(logger *lg, int limit) {
- MT_lock_set(&lg->flush_queue_lock);
- lg->flush_queue_begin = (lg->flush_queue_begin + limit) %
FLUSH_QUEUE_SIZE;
- lg->flush_queue_length -= limit;
- MT_lock_unset(&lg->flush_queue_lock);
-
- for (int i = 0; i < limit; i++)
- MT_sema_up(&lg->flush_queue_semaphore);
-}
-
-static int
-number_in_flush_queue(logger *lg, unsigned int number) {
- MT_lock_set(&lg->flush_queue_lock);
- const int fql = lg->flush_queue_length;
- MT_lock_unset(&lg->flush_queue_lock);
- for (int i = 0; i < fql; i++) {
- const int idx = (lg->flush_queue_begin + i) % FLUSH_QUEUE_SIZE;
- if (lg->flush_queue[idx] == number) {
- return 1;
- }
- }
- return 0;
-}
-
-static int
-flush_queue_length(logger *lg) {
- MT_lock_set(&lg->flush_queue_lock);
- const int fql = lg->flush_queue_length;
- MT_lock_unset(&lg->flush_queue_lock);
- return fql;
-}
-
gdk_return
log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
@@ -2991,13 +2999,14 @@ log_tflush(logger* lg, ulng log_file_id,
MT_lock_set(&lg->rotation_lock);
if (log_file_id == lg->id) { // TODO: introduce lg->flushed and get rid
of log_file_id in signature
MT_lock_unset(&lg->rotation_lock);
- unsigned int number = request_number_flush_queue(lg);
+ log_queue* flush_queue = &lg->flush_queue;
+ unsigned int number = log_queue_request_number(flush_queue);
MT_lock_set(&lg->flush_lock);
/* the transaction is not yet flushed */
- if (number_in_flush_queue(lg, number)) {
+ if (log_queue_has_number(flush_queue, number)) {
/* number of transactions in the group commit */
- const int fqueue_length = flush_queue_length(lg);
+ const int fqueue_length = log_queue_length(flush_queue);
/* flush + fsync */
if (mnstr_flush(lg->output_log, MNSTR_FLUSH_DATA) ||
(!(GDKdebug & NOSYNCMASK) &&
mnstr_fsync(lg->output_log)) ||
@@ -3008,7 +3017,7 @@ log_tflush(logger* lg, ulng log_file_id,
}
else {
/* flush succeeded */
- left_truncate_flush_queue(lg, fqueue_length);
+ log_queue_truncate_left(flush_queue,
fqueue_length);
}
}
/* else the transaction was already flushed in a group commit.
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -9,7 +9,7 @@
#ifndef _LOGGER_INTERNALS_H_
#define _LOGGER_INTERNALS_H_
-#define FLUSH_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum
number of transactions committing simultaneously */
+#define LOG_QUEUE_SIZE 2048 /* maximum size of the flush queue, i.e. maximum
number of transactions committing simultaneously */
typedef struct logged_range_t {
ulng id; /* log file id */
@@ -19,6 +19,15 @@ typedef struct logged_range_t {
struct logged_range_t *next;
} logged_range;
+typedef struct log_queue {
+ /* flush variables */
+ unsigned int queue[LOG_QUEUE_SIZE]; /* circular array with the current
transactions' ids waiting to be flushed */
+ int begin; /* start index of the queue */
+ int length; /* length of the queue */
+ MT_Sema sema; /*to protect the queue against ring buffer overflows */
+ MT_Lock queue_lock; /* to protect the queue against concurrent reads
and writes */
+} log_queue;
+
struct logger {
int debug;
int version;
@@ -47,6 +56,7 @@ struct logger {
ATOMIC_TYPE refcount; /* Number of active writers and flushers in the
logger */ // TODO check refcount in c->log and c->end
MT_Lock rotation_lock;
MT_Lock lock;
+ MT_Lock flush_lock; /* so only one transaction can flush to disk at any
given time */
/* Store log_bids (int) to circumvent trouble with reference counting */
BAT *catalog_bid; /* int bid column */
BAT *catalog_id; /* object identifier is unique */
@@ -68,13 +78,7 @@ struct logger {
void *buf;
size_t bufsize;
- /* flush variables */
- unsigned int flush_queue[FLUSH_QUEUE_SIZE]; /* circular array with the
current transactions' ids waiting to be flushed */
- int flush_queue_begin; /* start index of the queue */
- int flush_queue_length; /* length of the queue */
- MT_Sema flush_queue_semaphore; /*to protect the queue against ring
buffer overflows */
- MT_Lock flush_queue_lock; /* to protect the queue against concurrent
reads and writes */
- MT_Lock flush_lock; /* so only one transaction can flush to disk at any
given time */
+ log_queue flush_queue;
};
struct old_logger {
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]