Changeset: 428672c49b28 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/428672c49b28
Modified Files:
gdk/gdk_logger.c
gdk/gdk_logger.h
gdk/gdk_logger_internals.h
sql/storage/bat/bat_logger.c
sql/storage/sql_storage.h
sql/storage/store.c
Branch: pax-log
Log Message:
First working version of log_tcommit:
Use the commit_queue to minimize the required WAL commit messages
and second flushes.
diffs (236 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -1265,6 +1265,10 @@ log_read_transaction(logger *lg)
}
}
break;
+ case LOG_COMMIT:
+ assert(l.id > 0);
+ // TODO
+ break;
default:
err = LOG_ERR;
}
@@ -2198,6 +2202,7 @@ log_load(int debug, const char *fn, cons
MT_lock_destroy(&lg->rotation_lock);
MT_lock_destroy(&lg->flush_lock);
log_queue_destroy(&lg->flush_queue);
+ log_queue_destroy(&lg->commit_queue);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->local_dir);
@@ -2267,6 +2272,7 @@ log_new(int debug, const char *fn, const
MT_lock_init(&lg->flush_lock, "flush_lock");
log_queue_initialize(&lg->flush_queue, "flush_queue_semaphore",
"flush_queue_lock");
+ log_queue_initialize(&lg->commit_queue, "commit_queue_semaphore",
"commit_queue_lock");
if (log_load(debug, fn, logdir, lg, filename) == GDK_SUCCEED) {
return lg;
@@ -2316,6 +2322,7 @@ log_destroy(logger *lg)
MT_lock_destroy(&lg->rotation_lock);
MT_lock_destroy(&lg->flush_lock);
log_queue_destroy(&lg->flush_queue);
+ log_queue_destroy(&lg->commit_queue);
GDKfree(lg->fn);
GDKfree(lg->dir);
GDKfree(lg->buf);
@@ -2937,8 +2944,8 @@ log_tend(logger *lg)
l.flag = LOG_END;
l.id = lg->tid;
- if ((result = log_write_format(lg, &l)) != GDK_SUCCEED)
- (void) ATOMIC_DEC(&lg->refcount);
+ result = log_write_format(lg, &l);
+ (void) ATOMIC_DEC(&lg->refcount);
return result;
}
@@ -2956,7 +2963,7 @@ log_tdone(logger *lg, ulng commit_ts)
}
gdk_return
-log_tcommit(logger *lg, ulng commit_ts)
+log_tcommit(logger *lg, ulng commit_ts, unsigned int commit_queue_number)
{
if (lg->debug & 1)
fprintf(stderr, "#log_tcommit " LLFMT "\n", commit_ts);
@@ -2968,25 +2975,44 @@ log_tcommit(logger *lg, ulng commit_ts)
return GDK_SUCCEED;
}
- gdk_return result;
- logformat l;
- l.flag = LOG_COMMIT;
- l.id = 0; // number of transactions to be committed;
-
- if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) {
+ log_queue* commit_queue = &lg->commit_queue;
+
+ if (log_queue_has_number(commit_queue, commit_queue_number)) {
+
+ log_queue* cq = &lg->commit_queue;
+
+ const int cql = log_queue_length(cq);
+ gdk_return result;
+ logformat l;
+ l.flag = LOG_COMMIT;
+ l.id = cql; // number of transactions to be committed;
+
+ /* if the log file is being rotated at the moment,
+ * wait for it to finish */
+ MT_lock_set(&lg->rotation_lock);
+ (void) ATOMIC_INC(&lg->refcount);
+ MT_lock_unset(&lg->rotation_lock);
+
+ if ((result = log_write_format(lg, &l)) != GDK_SUCCEED) {
+ (void) ATOMIC_DEC(&lg->refcount);
+ return result;
+ }
+ else {
+ log_queue_truncate_left(cq, cql);
+ }
+
(void) ATOMIC_DEC(&lg->refcount);
- return result;
}
return GDK_SUCCEED;
}
gdk_return
-log_tflush(logger* lg, ulng log_file_id, ulng commit_ts) {
+log_tflush(logger* lg, ulng log_file_id, ulng commit_ts, unsigned int*
commit_queue_number) {
if (lg->flushnow) {
lg->flushnow = 0;
- if (!commit_ts) log_tdone(lg, commit_ts); // TODO: check if
+ if (!commit_queue_number) log_tdone(lg, commit_ts); // TODO:
check if
return log_commit(lg);
}
@@ -2994,8 +3020,6 @@ log_tflush(logger* lg, ulng log_file_id,
return GDK_SUCCEED;
}
- (void) ATOMIC_DEC(&lg->refcount);
-
MT_lock_set(&lg->rotation_lock);
if (log_file_id == lg->id) { // TODO: introduce lg->flushed and get rid
of log_file_id in signature
MT_lock_unset(&lg->rotation_lock);
@@ -3028,8 +3052,11 @@ log_tflush(logger* lg, ulng log_file_id,
* no need to do anything */
MT_lock_unset(&lg->rotation_lock);
-
- if (!commit_ts) log_tdone(lg, commit_ts);
+ if (commit_queue_number) {
+ *commit_queue_number =
log_queue_request_number(&lg->commit_queue);
+ }
+
+ if (!commit_queue_number) log_tdone(lg, commit_ts);
MT_lock_unset(&lg->flush_lock);
// TODO: request number for commit message queue.
diff --git a/gdk/gdk_logger.h b/gdk/gdk_logger.h
--- a/gdk/gdk_logger.h
+++ b/gdk/gdk_logger.h
@@ -73,8 +73,8 @@ gdk_export gdk_return log_bat_group_end(
gdk_export gdk_return log_tstart(logger *lg, bool flushnow, ulng *log_file_id);
gdk_export gdk_return log_tend(logger *lg);
-gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng
commit_ts); /* Flush the WAL to disk using group commit */
-gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts); /* Flush the
WAL to disk using group commit */
+gdk_export gdk_return log_tflush(logger *lg, ulng log_file_id, ulng commit_ts,
unsigned int* commit_queue_number); /* Flush the WAL to disk using group commit
*/
+gdk_export gdk_return log_tcommit(logger *lg, ulng commit_ts, unsigned int
commit_queue_number);
gdk_export gdk_return log_tsequence(logger *lg, int seq, lng id);
gdk_export log_bid log_find_bat(logger *lg, log_id id);
diff --git a/gdk/gdk_logger_internals.h b/gdk/gdk_logger_internals.h
--- a/gdk/gdk_logger_internals.h
+++ b/gdk/gdk_logger_internals.h
@@ -53,7 +53,7 @@ struct logger {
stream *input_log; /* current stream to flush */
lng end; /* end of pre-allocated blocks for faster
f(data)sync */ // TODO: only incremen when actual files writes occur.
- ATOMIC_TYPE refcount; /* Number of active writers and flushers in the
logger */ // TODO check refcount in c->log and c->end
+ ATOMIC_TYPE refcount; /* Number of active writers in the logger */ //
TODO check if atomicity + rotation_lock is redundant, check refcount in c->log
and c->end
MT_Lock rotation_lock;
MT_Lock lock;
MT_Lock flush_lock; /* so only one transaction can flush to disk at any
given time */
@@ -79,6 +79,7 @@ struct logger {
size_t bufsize;
log_queue flush_queue;
+ log_queue commit_queue;
};
struct old_logger {
diff --git a/sql/storage/bat/bat_logger.c b/sql/storage/bat/bat_logger.c
--- a/sql/storage/bat/bat_logger.c
+++ b/sql/storage/bat/bat_logger.c
@@ -3150,14 +3150,14 @@ bl_tend(sqlstore *store)
}
static int
-bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts)
+bl_tflush(sqlstore *store, ulng log_file_id, ulng commit_ts, unsigned int*
commit_queue_number)
{
- return log_tflush(store->logger, log_file_id, commit_ts) == GDK_SUCCEED
? LOG_OK : LOG_ERR;
+ return log_tflush(store->logger, log_file_id, commit_ts,
commit_queue_number) == GDK_SUCCEED ? LOG_OK : LOG_ERR;
}
static int
-bl_tcommit(sqlstore *store, ulng commit_ts) {
- return log_tcommit(store->logger, commit_ts) == GDK_SUCCEED ? LOG_OK :
LOG_ERR;
+bl_tcommit(sqlstore *store, ulng commit_ts, unsigned int commit_queue_number) {
+ return log_tcommit(store->logger, commit_ts, commit_queue_number) ==
GDK_SUCCEED ? LOG_OK : LOG_ERR;
}
static int
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -282,8 +282,8 @@ typedef int (*logger_get_sequence_fptr)
typedef int (*log_isnew_fptr)(struct sqlstore *store);
typedef int (*log_tstart_fptr) (struct sqlstore *store, bool flush, ulng
*log_file_id);
typedef int (*log_tend_fptr) (struct sqlstore *store);
-typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng
commit_ts);
-typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts);
+typedef int (*log_tflush_fptr) (struct sqlstore *store, ulng log_file_id, ulng
commit_ts, unsigned int* commit_queue_number);
+typedef int (*log_tcommit_fptr) (struct sqlstore *store, ulng commit_ts,
unsigned int commit_queue_number);
typedef lng (*log_save_id_fptr) (struct sqlstore *store);
typedef int (*log_tsequence_fptr) (struct sqlstore *store, int seq, lng id);
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -3986,17 +3986,23 @@ sql_trans_commit(sql_trans *tr)
store_unlock(store);
/* flush the log structure */
if (log) {
+ unsigned int commit_queue_number;
if (!flush)
MT_lock_unset(&store->commit); /* release the
commit log when flushing to disk */
if (ok == LOG_OK)
- ok = store->logger_api.log_tflush(store,
log_file_id, commit_ts); /* first flush/sync */
+ ok = store->logger_api.log_tflush(store,
log_file_id, commit_ts, &commit_queue_number); /* first flush/sync */
if (!flush)
MT_lock_set(&store->commit);
if (ok == LOG_OK)
- ok = store->logger_api.log_tcommit(store,
commit_ts); /* write final commit and */
- if (flush)
+ ok = store->logger_api.log_tcommit(store,
commit_ts, commit_queue_number); /* write final commit message */
+ if (!flush)
+ MT_lock_unset(&store->commit);
+ if (ok == LOG_OK)
+ ok = store->logger_api.log_tflush(store,
log_file_id, commit_ts, NULL); /* second flush/sync */
+ if (!flush)
+ MT_lock_set(&store->commit);
+ if (flush)
MT_lock_unset(&store->flush);
-
}
MT_lock_unset(&store->commit);
list_destroy(tr->changes);
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]