Changeset: 0010fd8975fd for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0010fd8975fd
Modified Files:
sql/storage/bat/bat_storage.c
sql/storage/sql_storage.h
sql/storage/store.c
Branch: unlock
Log Message:
Add some more locks to make sure log rotation and appending are mutually exclusive.
diffs (249 lines):
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -806,10 +806,12 @@ append_col(sql_trans *tr, sql_column *c,
bat = c->data;
/* appends only write */
bat->cs.wtime = c->base.atime = c->t->base.atime = c->t->s->base.atime
= tr->atime = tr->wstime;
+ lock_table(c->t->base.id);
if (tpe == TYPE_bat)
ok = delta_append_bat(bat, offset, i, c->t);
else
ok = delta_append_val(bat, offset, i, c->t);
+ unlock_table(c->t->base.id);
return ok;
}
@@ -829,10 +831,12 @@ append_idx(sql_trans *tr, sql_idx * i, s
bat = i->data;
/* appends only write */
bat->cs.wtime = i->base.atime = i->t->base.atime = i->t->s->base.atime
= tr->atime = tr->wstime;
+ lock_table(i->t->base.id);
if (tpe == TYPE_bat)
ok = delta_append_bat(bat, offset, ib, i->t);
else
ok = delta_append_val(bat, offset, ib, i->t);
+ unlock_table(i->t->base.id);
return ok;
}
@@ -1826,7 +1830,7 @@ minmax( sql_trans *tr )
}
static int
-tr_update_cs( sql_trans *tr, column_storage *ocs, column_storage *ccs)
+tr_update_cs( sql_trans *tr, column_storage *ocs, column_storage *ccs, BUN end)
{
int ok = LOG_OK;
BAT *cur = NULL;
@@ -1854,6 +1858,7 @@ tr_update_cs( sql_trans *tr, column_stor
cur = temp_descriptor(ocs->bid);
if(!cur)
return LOG_ERR;
+ assert(end <= BATcount(cur));
if (ccs->ucnt && ccs->uibid) {
assert(!cleared);
BAT *ui = temp_descriptor(ccs->uibid);
@@ -1902,9 +1907,9 @@ tr_update_cs( sql_trans *tr, column_stor
}
static int
-tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat)
+tr_update_delta( sql_trans *tr, sql_delta *obat, sql_delta *cbat, BUN end)
{
- int ok = tr_update_cs( tr, &obat->cs, &cbat->cs);
+ int ok = tr_update_cs( tr, &obat->cs, &cbat->cs, end);
if (ok == LOG_OK && obat->next) {
ok = destroy_bat(obat->next);
@@ -1914,7 +1919,7 @@ tr_update_delta( sql_trans *tr, sql_delt
}
static int
-tr_merge_cs( sql_trans *tr, column_storage *cs)
+tr_merge_cs( sql_trans *tr, column_storage *cs, BUN end)
{
int ok = LOG_OK;
BAT *cur = NULL;
@@ -1929,6 +1934,7 @@ tr_merge_cs( sql_trans *tr, column_stora
return LOG_ERR;
}
+ assert(end <= BATcount(cur));
if (cs->ucnt) {
BAT *ui = temp_descriptor(cs->uibid);
BAT *uv = temp_descriptor(cs->uvbid);
@@ -1966,9 +1972,9 @@ tr_merge_cs( sql_trans *tr, column_stora
}
static int
-tr_merge_delta( sql_trans *tr, sql_delta *obat)
+tr_merge_delta( sql_trans *tr, sql_delta *obat, BUN end)
{
- int ok = tr_merge_cs(tr, &obat->cs);
+ int ok = tr_merge_cs(tr, &obat->cs, end);
if (obat->next) {
ok = destroy_bat(obat->next);
obat->next = NULL;
@@ -2014,7 +2020,7 @@ tr_update_dbat( sql_trans *tr, storage *
}
ts->cnt += fs->ucnt;
ts->cnt -= fs->icnt;
- int ok = tr_update_cs( tr, &ts->cs, &fs->cs);
+ int ok = tr_update_cs( tr, &ts->cs, &fs->cs, ts->end);
if (ok == LOG_OK && ts->next) {
ok = destroy_dbat(tr, ts->next);
ts->next = NULL;
@@ -2025,7 +2031,7 @@ tr_update_dbat( sql_trans *tr, storage *
static int
tr_merge_dbat(sql_trans *tr, storage *tdb)
{
- int ok = tr_merge_cs(tr, &tdb->cs);
+ int ok = tr_merge_cs(tr, &tdb->cs, tdb->end);
if (tdb->next) {
ok = destroy_dbat(tr, tdb->next);
tdb->next = NULL;
@@ -2040,6 +2046,7 @@ update_table(sql_trans *tr, sql_table *f
sql_table *ot = NULL;
int ok = LOG_OK;
node *n, *m, *o = NULL;
+ segments *sg = NULL;
if (ATOMIC_GET(&store_nr_active) == 1 || ft->base.allocated) {
if (ATOMIC_GET(&store_nr_active) > 1 && ft->data) { /* move
delta */
@@ -2064,6 +2071,7 @@ update_table(sql_trans *tr, sql_table *f
}
}
} else if (tt->data && ft->base.allocated) {
+ sg = ((storage*)ft->data)->segs;
if (tr_update_dbat(tr, tt->data, ft->data) != LOG_OK)
ok = LOG_ERR;
} else if (ATOMIC_GET(&store_nr_active) == 1 &&
!ft->base.allocated) {
@@ -2073,6 +2081,7 @@ update_table(sql_trans *tr, sql_table *f
tt->data = timestamp_dbat(ot->data,
tt->base.stime);
}
assert(tt->data);
+ sg = ((storage*)tt->data)->segs;
if (tr_merge_dbat(tr, tt->data) != LOG_OK)
ok = LOG_ERR;
ft->data = NULL;
@@ -2115,7 +2124,7 @@ update_table(sql_trans *tr, sql_table *f
}
}
} else if (oc->data && cc->base.allocated) {
- if (tr_update_delta(tr, oc->data, cc->data) !=
LOG_OK)
+ if (tr_update_delta(tr, oc->data, cc->data,
sg->end) != LOG_OK)
ok = LOG_ERR;
} else if (ATOMIC_GET(&store_nr_active) == 1 &&
!cc->base.allocated) {
/* only deletes, merge earlier changes */
@@ -2125,7 +2134,7 @@ update_table(sql_trans *tr, sql_table *f
}
assert(oc->data);
if (cc->base.wtime) {
- if (tr_merge_delta(tr, oc->data) !=
LOG_OK)
+ if (tr_merge_delta(tr, oc->data,
sg->end) != LOG_OK)
ok = LOG_ERR;
cc->data = NULL;
}
@@ -2202,7 +2211,7 @@ update_table(sql_trans *tr, sql_table *f
}
}
} else if (oi->data && ci->base.allocated) {
- if (tr_update_delta(tr, oi->data,
ci->data) != LOG_OK)
+ if (tr_update_delta(tr, oi->data,
ci->data, sg->end) != LOG_OK)
ok = LOG_ERR;
} else if (ATOMIC_GET(&store_nr_active) == 1 &&
!ci->base.allocated) {
if (!oi->data) {
@@ -2211,7 +2220,7 @@ update_table(sql_trans *tr, sql_table *f
}
assert(oi->data);
if (ci->base.wtime) {
- if (tr_merge_delta(tr,
oi->data) != LOG_OK)
+ if (tr_merge_delta(tr,
oi->data, sg->end) != LOG_OK)
ok = LOG_ERR;
ci->data = NULL;
}
@@ -2246,6 +2255,15 @@ update_table(sql_trans *tr, sql_table *f
}
static int
+rollback_table(sql_trans *tr, sql_table *t)
+{
+ (void)tr;
+ (void)t;
+ fprintf(stderr, "rollback %s.%s\n", t->s->base.name, t->base.name);
+ return 0;
+}
+
+static int
tr_log_cs( sql_trans *tr, column_storage *cs, segment *segs, int cleared,
sqlid id)
{
int ok = GDK_SUCCEED;
@@ -2452,6 +2470,7 @@ bat_storage_init( store_functions *sf)
sf->update_table = (update_table_fptr)&update_table;
sf->log_table = (update_table_fptr)&log_table;
sf->gtrans_minmax = (gtrans_update_fptr)&minmax;
+ sf->rollback_table = (clear_table_fptr)&rollback_table;
sf->cleanup = (cleanup_fptr)&cleanup;
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -262,6 +262,7 @@ typedef struct store_functions {
update_table_fptr log_table;
update_table_fptr update_table;
gtrans_update_fptr gtrans_minmax;
+ clear_table_fptr rollback_table;
cleanup_fptr cleanup;
} store_functions;
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -7420,6 +7420,36 @@ sql_session_destroy(sql_session *s)
}
static void
+table_rollback(sql_trans *tr, sql_table *t)
+{
+ store_funcs.rollback_table(tr, t);
+}
+
+static void
+schema_rollback(sql_trans *tr, sql_schema *s)
+{
+ if (s->tables.set) {
+ for (node *n = s->tables.set->h; n; n = n->next) {
+ sql_table *t = n->data;
+ if (t->base.atime)
+ table_rollback(tr, t);
+ }
+ }
+}
+
+static void
+sql_trans_rollback(sql_trans *tr)
+{
+ sql_schema *tmp = find_sql_schema(tr, "tmp");
+
+ for (node *m = tr->schemas.set->h; m; m = m->next) {
+ sql_schema *s = m->data;
+ if (s->base.atime && s != tmp)
+ schema_rollback(tr, s);
+ }
+}
+
+static void
sql_trans_reset_tmp(sql_trans *tr, int commit)
{
sql_schema *tmp = find_sql_schema(tr, "tmp");
@@ -7520,6 +7550,8 @@ sql_trans_end(sql_session *s, int commit
TRC_DEBUG(SQL_STORE, "End of transaction: %d\n", s->tr->schema_number);
s->tr->active = 0;
s->auto_commit = s->ac_on_commit;
+ if(!commit && s->tr->atime)
+ sql_trans_rollback(s->tr);
sql_trans_reset_tmp(s->tr, commit); /* reset temp schema */
if (s->tr->parent == gtrans) {
list_move_data(active_sessions, passive_sessions, s);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list