Changeset: 8fa6ad5bbe62 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/8fa6ad5bbe62
Modified Files:
sql/storage/store.c
Branch: iso
Log Message:
merged with jul2021
diffs (237 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2017,7 +2017,9 @@ logger_destroy(logger *lg)
if (LOG_DISABLED(lg)) {
lg->saved_id = lg->id;
lg->saved_tid = lg->tid;
+ logger_lock(lg);
logger_commit(lg);
+ logger_unlock(lg);
}
if (lg->catalog_bid) {
logger_lock(lg);
@@ -2120,9 +2122,10 @@ logger_flush(logger *lg, ulng ts)
lg->saved_tid = lg->tid;
if (lid)
logger_cleanup_range(lg);
- if (logger_commit(lg) != GDK_SUCCEED) {
+ logger_lock(lg);
+ if (logger_commit(lg) != GDK_SUCCEED)
TRC_ERROR(GDK, "failed to commit");
- }
+ logger_unlock(lg);
return GDK_SUCCEED;
}
if (lg->saved_id >= lid)
@@ -2547,7 +2550,10 @@ log_tend(logger *lg, ulng commit_ts)
l.id = lg->tid;
if (lg->flushnow) {
lg->flushnow = 0;
- return logger_commit(lg);
+ logger_lock(lg);
+ gdk_return res = logger_commit(lg);
+ logger_unlock(lg);
+ return res;
}
if (lg->current) {
diff --git a/sql/backends/monet5/sql.c b/sql/backends/monet5/sql.c
--- a/sql/backends/monet5/sql.c
+++ b/sql/backends/monet5/sql.c
@@ -1277,7 +1277,7 @@ mvc_bind_wrap(Client cntxt, MalBlkPtr mb
sql_table *t = mvc_bind_table(m, s, tname);
sql_column *c = mvc_bind_column(m, t, cname);
b = mvc_bind(m, sname, tname, cname, access);
- if (b && b->ttype != coltype) {
+ if (b && b->ttype && b->ttype != coltype) {
BBPunfix(b->batCacheid);
throw(SQL,"sql.bind",SQLSTATE(42000) "Column type mismatch");
}
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -446,6 +446,7 @@ typedef struct sqlstore {
ATOMIC_TYPE timestamp; /* timestamp counter */
ATOMIC_TYPE transaction;/* transaction id counter */
ulng oldest;
+ ulng oldest_pending;
int readonly; /* store is readonly */
int singleuser; /* store is for a single user only (==1
enable, ==2 single user session running) */
int first; /* just created the db */
@@ -465,6 +466,7 @@ typedef struct sqlstore {
typedef struct sql_change {
sql_base *obj;
+ ulng ts; /* commit/rollback timestamp */
void *data; /* data changes */
bool committed; /* commit or rollback */
bool handled; /* handled in commit */
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -58,6 +58,13 @@ store_oldest(sqlstore *store)
return store_oldest_given_max(store, TRANSACTION_ID_BASE);
}
+static ulng
+store_oldest_pending(sqlstore *store)
+{
+ assert(store->oldest_pending != TRANSACTION_ID_BASE);
+ return store->oldest_pending;
+}
+
static inline bool
instore(sqlid id)
{
@@ -1979,7 +1986,7 @@ store_apply_deltas(sqlstore *store)
flusher.working = true;
store_lock(store);
- ulng oldest = store_oldest(store);
+ ulng oldest = store_oldest_pending(store);
store_unlock(store);
if (oldest)
res = store->logger_api.flush(store, oldest-1);
@@ -1988,23 +1995,11 @@ store_apply_deltas(sqlstore *store)
}
-/* Call while holding store->flush */
-static void
-wait_until_flusher_idle(sqlstore *store)
-{
- while (flusher.working) {
- const int sleeptime = 100;
- MT_lock_unset(&store->lock);
- MT_sleep_ms(sleeptime);
- MT_lock_set(&store->lock);
- }
-}
void
store_suspend_log(sqlstore *store)
{
MT_lock_set(&store->lock);
flusher.enabled = false;
- wait_until_flusher_idle(store);
MT_lock_unset(&store->lock);
}
@@ -2019,6 +2014,7 @@ store_resume_log(sqlstore *store)
static void
store_pending_changes(sqlstore *store, ulng oldest)
{
+ ulng oldest_changes = TRANSACTION_ID_BASE;
if (!list_empty(store->changes)) { /* lets first cleanup old stuff */
for(node *n=store->changes->h; n; ) {
node *next = n->next;
@@ -2029,9 +2025,13 @@ store_pending_changes(sqlstore *store, u
} else if (c->cleanup && c->cleanup(store, c, oldest)) {
list_remove_node(store->changes, store, n);
_DELETE(c);
+ } else if (c->ts < oldest_changes) {
+ oldest_changes = c->ts;
}
n = next;
}
+ if (oldest_changes < TRANSACTION_ID_BASE)
+ store->oldest_pending = oldest_changes;
}
}
@@ -2046,25 +2046,24 @@ store_manager(sqlstore *store)
for (;;) {
int res;
- if (store->logger_api.changes(store) <= 0) {
+ if (ATOMIC_GET(&store->nr_active) == 0) {
+ store_lock(store);
if (ATOMIC_GET(&store->nr_active) == 0) {
- store_lock(store);
- if (ATOMIC_GET(&store->nr_active) == 0) {
- ulng oldest = store_timestamp(store)+1;
- store_pending_changes(store, oldest);
- }
- store->logger_api.activate(store); /* rotate
too new log file */
- store_unlock(store);
+ ulng oldest = store_timestamp(store)+1;
+ store_pending_changes(store, oldest);
}
- if (GDKexiting())
- break;
- const int sleeptime = 100;
- MT_lock_unset(&store->flush);
- MT_sleep_ms(sleeptime);
- flusher.countdown_ms -= sleeptime;
- MT_lock_set(&store->flush);
+ store->logger_api.activate(store); /* rotate too new
log file */
+ store_unlock(store);
+ }
+ if (GDKexiting())
+ break;
+ const int sleeptime = 100;
+ MT_lock_unset(&store->flush);
+ MT_sleep_ms(sleeptime);
+ flusher.countdown_ms -= sleeptime;
+ MT_lock_set(&store->flush);
+ if (store->logger_api.changes(store) <= 0)
continue;
- }
if (GDKexiting())
break;
@@ -2413,7 +2412,6 @@ store_hot_snapshot_to_stream(sqlstore *s
MT_lock_set(&store->flush);
MT_lock_set(&store->lock);
locked = 1;
- wait_until_flusher_idle(store);
if (GDKexiting())
goto end;
@@ -3206,7 +3204,6 @@ static void
sql_trans_rollback(sql_trans *tr)
{
sqlstore *store = tr->store;
- ulng commit_ts = 0; /* invalid ts, ie rollback */
if (tr->predicates) {
list_destroy(tr->predicates);
@@ -3232,26 +3229,15 @@ sql_trans_rollback(sql_trans *tr)
/* rollback */
ulng oldest = store_oldest(store);
+ ulng commit_ts = store_get_timestamp(store); /* use most recent
timestamp such that we can cleanup savely */
for(node *n=nl->h; n; n = n->next) {
sql_change *c = n->data;
if (c->commit)
- c->commit(tr, c, commit_ts, oldest);
+ c->commit(tr, c, 0 /* ie rollback */, oldest);
+ c->ts = commit_ts;
}
- if (!list_empty(store->changes)) { /* lets first cleanup old
stuff */
- for(node *n=store->changes->h; n; ) {
- node *next = n->next;
- sql_change *c = n->data;
-
- if (!c->cleanup) {
- _DELETE(c);
- } else if (c->cleanup && c->cleanup(store, c,
oldest)) {
- list_remove_node(store->changes, store,
n);
- _DELETE(c);
- }
- n = next;
- }
- }
+ store_pending_changes(store, oldest);
for(node *n=nl->h; n; n = n->next) {
sql_change *c = n->data;
@@ -3452,6 +3438,7 @@ sql_trans_commit(sql_trans *tr)
ok = c->commit(tr, c, commit_ts, oldest);
else
c->obj->flags = 0;
+ c->ts = commit_ts;
}
/* flush logger after changes got applied */
if (ok == LOG_OK && flush)
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list