Changeset: aa88113d9231 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/aa88113d9231
Modified Files:
sql/storage/bat/bat_storage.c
Branch: Jan2022
Log Message:
Solve issue where the upgrade of the compressed bats didn't get saved into the
logs.
diffs (truncated from 402 to 300 lines):
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -1083,9 +1083,11 @@ segments_is_deleted(segment *s, sql_tran
}
static BAT *
-dict_append_bat(column_storage *cs, BAT *i)
+dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
{
BAT *newoffsets = NULL;
+ sql_delta *bat = *batp;
+ column_storage *cs = &bat->cs;
BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
if (!u)
@@ -1094,6 +1096,7 @@ dict_append_bat(column_storage *cs, BAT
if (DICTprepare4append(&newoffsets, i, u) < 0) {
assert(0);
} else {
+ int new = 0;
/* returns new offset bat (ie to be appended), possibly with
larger type ! */
if (BATcount(u) >= max_cnt) {
if (max_cnt == 64*1024) { /* decompress */
@@ -1127,17 +1130,25 @@ dict_append_bat(column_storage *cs, BAT
bat_destroy(u);
return NULL;
}
- if (cs->bid)
+ if (cs->ts != tr->tid) {
+ *batp = ZNEW(sql_delta);
+ **batp = *bat;
+ (*batp)->next = NULL;
+ cs = &(*batp)->cs;
+ cs->ts = tr->tid;
+ new = 1;
+ }
+ if (cs->bid && !new)
temp_destroy(cs->bid);
cs->bid = temp_create(n);
bat_destroy(n);
- if (cs->ebid)
+ if (cs->ebid && !new)
temp_destroy(cs->ebid);
cs->ebid = 0;
cs->ucnt = 0;
- if (cs->uibid)
+ if (cs->uibid && !new)
temp_destroy(cs->uibid);
- if (cs->uvbid)
+ if (cs->uvbid && !new)
temp_destroy(cs->uvbid);
cs->uibid = cs->uvbid = 0;
cs->st = ST_DEFAULT;
@@ -1155,7 +1166,20 @@ dict_append_bat(column_storage *cs, BAT
bat_destroy(u);
return NULL;
}
- if (cs->bid)
+ if (cs->ts != tr->tid) {
+ *batp = ZNEW(sql_delta);
+ **batp = *bat;
+ (*batp)->next = NULL;
+ cs = &(*batp)->cs;
+ cs->ts = tr->tid;
+ new = 1;
+ temp_dup(cs->ebid);
+ if (cs->uibid) {
+ temp_dup(cs->uibid);
+ temp_dup(cs->uvbid);
+ }
+ }
+ if (cs->bid && !new)
temp_destroy(cs->bid);
cs->bid = temp_create(n);
bat_destroy(n);
@@ -1230,9 +1254,11 @@ for_append_bat(column_storage *cs, BAT *
* Returns LOG_OK, LOG_ERR or LOG_CONFLICT
*/
static int
-cs_update_bat( sql_trans *tr, column_storage *cs, sql_table *t, BAT *tids, BAT
*updates, int is_new)
+cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT
*updates, int is_new)
{
int res = LOG_OK;
+ sql_delta *bat = *batp;
+ column_storage *cs = &bat->cs;
BAT *otids = tids, *oupdates = updates;
if (!BATcount(tids))
@@ -1261,7 +1287,9 @@ cs_update_bat( sql_trans *tr, column_sto
if (cs->st == ST_DICT) {
/* possibly a new array is returned */
- BAT *nupdates = dict_append_bat(cs, updates);
+ BAT *nupdates = dict_append_bat(tr, batp, updates);
+ bat = *batp;
+ cs = &bat->cs;
if (oupdates != updates)
bat_destroy(updates);
updates = nupdates;
@@ -1550,15 +1578,17 @@ cs_update_bat( sql_trans *tr, column_sto
}
static int
-delta_update_bat( sql_trans *tr, sql_delta *bat, sql_table *t, BAT *tids, BAT
*updates, int is_new)
+delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT
*updates, int is_new)
{
- return cs_update_bat(tr, &bat->cs, t, tids, updates, is_new);
+ return cs_update_bat(tr, bat, t, tids, updates, is_new);
}
static void *
-dict_append_val(column_storage *cs, void *i, BUN cnt)
+dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
{
void *newoffsets = NULL;
+ sql_delta *bat = *batp;
+ column_storage *cs = &bat->cs;
BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
if (!u)
@@ -1567,6 +1597,7 @@ dict_append_val(column_storage *cs, void
if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
assert(0);
} else {
+ int new = 0;
/* returns new offset bat (ie to be appended), possibly with
larger type ! */
if (BATcount(u) >= max_cnt) {
if (max_cnt == 64*1024) { /* decompress */
@@ -1582,11 +1613,20 @@ dict_append_val(column_storage *cs, void
bat_destroy(u);
return NULL;
}
- if (cs->bid)
+ if (cs->ts != tr->tid) {
+ *batp = ZNEW(sql_delta);
+ **batp = *bat;
+ (*batp)->next = NULL;
+ cs = &(*batp)->cs;
+ cs->ts = tr->tid;
+ new = 1;
+ cs->uibid = cs->uvbid = 0;
+ }
+ if (cs->bid && !new)
temp_destroy(cs->bid);
cs->bid = temp_create(n);
bat_destroy(n);
- if (cs->ebid)
+ if (cs->ebid && !new)
temp_destroy(cs->ebid);
cs->ebid = 0;
cs->st = ST_DEFAULT;
@@ -1605,6 +1645,19 @@ dict_append_val(column_storage *cs, void
bat_destroy(u);
return NULL;
}
+ if (cs->ts != tr->tid) {
+ *batp = ZNEW(sql_delta);
+ **batp = *bat;
+ (*batp)->next = NULL;
+ cs = &(*batp)->cs;
+ cs->ts = tr->tid;
+ new = 1;
+ temp_dup(cs->ebid);
+ if (cs->uibid) {
+ temp_dup(cs->uibid);
+ temp_dup(cs->uvbid);
+ }
+ }
if (cs->bid)
temp_destroy(cs->bid);
cs->bid = temp_create(n);
@@ -1656,16 +1709,20 @@ for_append_val(column_storage *cs, void
}
static int
-cs_update_val( sql_trans *tr, column_storage *cs, sql_table *t, oid rid, void
*upd, int is_new)
+cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void
*upd, int is_new)
{
void *oupd = upd;
+ sql_delta *bat = *batp;
+ column_storage *cs = &bat->cs;
storage *s = ATOMIC_PTR_GET(&t->data);
assert(!is_oid_nil(rid));
int inplace = is_new || cs->cleared || segments_is_append (s->segs->h,
tr, rid);
if (cs->st == ST_DICT) {
/* possibly a new array is returned */
- upd = dict_append_val(cs, upd, 1);
+ upd = dict_append_val(tr, batp, upd, 1);
+ bat = *batp;
+ cs = &bat->cs;
if (!upd)
return LOG_ERR;
}
@@ -1724,11 +1781,11 @@ cs_update_val( sql_trans *tr, column_sto
}
static int
-delta_update_val( sql_trans *tr, sql_delta *bat, sql_table *t, oid rid, void
*upd, int is_new)
+delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void
*upd, int is_new)
{
int res = LOG_OK;
lock_table(tr->store, t->base.id);
- res = cs_update_val(tr, &bat->cs, t, rid, upd, is_new);
+ res = cs_update_val(tr, bat, t, rid, upd, is_new);
unlock_table(tr->store, t->base.id);
return res;
}
@@ -1799,7 +1856,7 @@ bind_col_data(sql_trans *tr, sql_column
/* abort */
if (update_conflict)
*update_conflict = true;
- else
+ else if (!obat->cs.cleared) /* concurrent appends are only
allowed on concurrent updates */
return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
return NULL;
}
@@ -1823,7 +1880,7 @@ bind_col_data(sql_trans *tr, sql_column
}
static int
-update_col_execute(sql_trans *tr, sql_delta *delta, sql_table *table, bool
is_new, void *incoming_tids, void *incoming_values, bool is_bat)
+update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool
is_new, void *incoming_tids, void *incoming_values, bool is_bat)
{
int ok = LOG_OK;
@@ -1859,8 +1916,10 @@ update_col(sql_trans *tr, sql_column *c,
if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t))
&& isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col,
isTempTable(c->t)?NULL:&log_update_col);
- if ((res = update_col_execute(tr, delta, c->t, isNew(c), tids, upd, tpe
== TYPE_bat)) != LOG_OK)
+ odelta = delta;
+ if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd,
tpe == TYPE_bat)) != LOG_OK)
return res;
+ assert(delta == odelta);
if (delta->cs.st == ST_DEFAULT && c->storage_type)
res = sql_trans_alter_storage(tr, c, NULL);
return res;
@@ -1904,6 +1963,7 @@ bind_idx_data(sql_trans *tr, sql_idx *i,
static int
update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, int tpe)
{
+ int res = LOG_OK;
bool update_conflict = false;
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
@@ -1920,14 +1980,18 @@ update_idx(sql_trans *tr, sql_idx * i, v
if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t))
&& isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx,
isTempTable(i->t)?NULL:&log_update_idx);
- return update_col_execute(tr, delta, i->t, isNew(i), tids, upd, tpe ==
TYPE_bat);
+ odelta = delta;
+ res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, tpe ==
TYPE_bat);
+ assert(delta == odelta);
+ return res;
}
static int
-delta_append_bat(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, BAT
*offsets, BAT *i, char *storage_type)
+delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT
*offsets, BAT *i, char *storage_type)
{
BAT *b, *oi = i;
int err = 0;
+ sql_delta *bat = *batp;
assert(!offsets || BATcount(offsets) == BATcount(i));
if (!BATcount(i))
@@ -1937,7 +2001,8 @@ delta_append_bat(sql_trans *tr, sql_delt
lock_column(tr->store, id);
if (bat->cs.st == ST_DICT) {
- BAT *ni = dict_append_bat(&bat->cs, oi);
+ BAT *ni = dict_append_bat(tr, batp, oi);
+ bat = *batp;
if (oi != i) /* oi will be replaced, so destroy possible unmask
reference */
bat_destroy(oi);
oi = ni;
@@ -1948,6 +2013,7 @@ delta_append_bat(sql_trans *tr, sql_delt
}
if (bat->cs.st == ST_FOR) {
BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
+ bat = *batp;
if (oi != i) /* oi will be replaced, so destroy possible unmask
reference */
bat_destroy(oi);
oi = ni;
@@ -1985,15 +2051,17 @@ delta_append_bat(sql_trans *tr, sql_delt
}
static int
-delta_append_val(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, void *i,
BUN cnt, char *storage_type, int tt)
+delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, void
*i, BUN cnt, char *storage_type, int tt)
{
void *oi = i;
BAT *b;
lock_column(tr->store, id);
+ sql_delta *bat = *batp;
if (bat->cs.st == ST_DICT) {
/* possibly a new array is returned */
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list