Changeset: aa88113d9231 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/aa88113d9231
Modified Files:
        sql/storage/bat/bat_storage.c
Branch: Jan2022
Log Message:

solve issue where the upgrade of the compressed bats didn't got saved into the 
logs.


diffs (truncated from 402 to 300 lines):

diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -1083,9 +1083,11 @@ segments_is_deleted(segment *s, sql_tran
 }
 
 static BAT *
-dict_append_bat(column_storage *cs, BAT *i)
+dict_append_bat(sql_trans *tr, sql_delta **batp, BAT *i)
 {
        BAT *newoffsets = NULL;
+       sql_delta *bat = *batp;
+       column_storage *cs = &bat->cs;
        BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
 
        if (!u)
@@ -1094,6 +1096,7 @@ dict_append_bat(column_storage *cs, BAT 
        if (DICTprepare4append(&newoffsets, i, u) < 0) {
                assert(0);
        } else {
+               int new = 0;
                /* returns new offset bat (ie to be appended), possibly with 
larger type ! */
                if (BATcount(u) >= max_cnt) {
                        if (max_cnt == 64*1024) { /* decompress */
@@ -1127,17 +1130,25 @@ dict_append_bat(column_storage *cs, BAT 
                                        bat_destroy(u);
                                        return NULL;
                                }
-                               if (cs->bid)
+                               if (cs->ts != tr->tid) {
+                                       *batp = ZNEW(sql_delta);
+                                       **batp = *bat;
+                                       (*batp)->next = NULL;
+                                       cs = &(*batp)->cs;
+                                       cs->ts = tr->tid;
+                                       new = 1;
+                               }
+                               if (cs->bid && !new)
                                        temp_destroy(cs->bid);
                                cs->bid = temp_create(n);
                                bat_destroy(n);
-                               if (cs->ebid)
+                               if (cs->ebid && !new)
                                        temp_destroy(cs->ebid);
                                cs->ebid = 0;
                                cs->ucnt = 0;
-                               if (cs->uibid)
+                               if (cs->uibid && !new)
                                        temp_destroy(cs->uibid);
-                               if (cs->uvbid)
+                               if (cs->uvbid && !new)
                                        temp_destroy(cs->uvbid);
                                cs->uibid = cs->uvbid = 0;
                                cs->st = ST_DEFAULT;
@@ -1155,7 +1166,20 @@ dict_append_bat(column_storage *cs, BAT 
                                        bat_destroy(u);
                                        return NULL;
                                }
-                               if (cs->bid)
+                               if (cs->ts != tr->tid) {
+                                       *batp = ZNEW(sql_delta);
+                                       **batp = *bat;
+                                       (*batp)->next = NULL;
+                                       cs = &(*batp)->cs;
+                                       cs->ts = tr->tid;
+                                       new = 1;
+                                       temp_dup(cs->ebid);
+                                       if (cs->uibid) {
+                                               temp_dup(cs->uibid);
+                                               temp_dup(cs->uvbid);
+                                       }
+                               }
+                               if (cs->bid && !new)
                                        temp_destroy(cs->bid);
                                cs->bid = temp_create(n);
                                bat_destroy(n);
@@ -1230,9 +1254,11 @@ for_append_bat(column_storage *cs, BAT *
  * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
  */
 static int
-cs_update_bat( sql_trans *tr, column_storage *cs, sql_table *t, BAT *tids, BAT 
*updates, int is_new)
+cs_update_bat( sql_trans *tr, sql_delta **batp, sql_table *t, BAT *tids, BAT 
*updates, int is_new)
 {
        int res = LOG_OK;
+       sql_delta *bat = *batp;
+       column_storage *cs = &bat->cs;
        BAT *otids = tids, *oupdates = updates;
 
        if (!BATcount(tids))
@@ -1261,7 +1287,9 @@ cs_update_bat( sql_trans *tr, column_sto
 
        if (cs->st == ST_DICT) {
                /* possibly a new array is returned */
-               BAT *nupdates = dict_append_bat(cs, updates);
+               BAT *nupdates = dict_append_bat(tr, batp, updates);
+               bat = *batp;
+               cs = &bat->cs;
                if (oupdates != updates)
                        bat_destroy(updates);
                updates = nupdates;
@@ -1550,15 +1578,17 @@ cs_update_bat( sql_trans *tr, column_sto
 }
 
 static int
-delta_update_bat( sql_trans *tr, sql_delta *bat, sql_table *t, BAT *tids, BAT 
*updates, int is_new)
+delta_update_bat( sql_trans *tr, sql_delta **bat, sql_table *t, BAT *tids, BAT 
*updates, int is_new)
 {
-       return cs_update_bat(tr, &bat->cs, t, tids, updates, is_new);
+       return cs_update_bat(tr, bat, t, tids, updates, is_new);
 }
 
 static void *
-dict_append_val(column_storage *cs, void *i, BUN cnt)
+dict_append_val(sql_trans *tr, sql_delta **batp, void *i, BUN cnt)
 {
        void *newoffsets = NULL;
+       sql_delta *bat = *batp;
+       column_storage *cs = &bat->cs;
        BAT *u = temp_descriptor(cs->ebid), *b = NULL, *n = NULL;
 
        if (!u)
@@ -1567,6 +1597,7 @@ dict_append_val(column_storage *cs, void
        if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
                assert(0);
        } else {
+               int new = 0;
                /* returns new offset bat (ie to be appended), possibly with 
larger type ! */
                if (BATcount(u) >= max_cnt) {
                        if (max_cnt == 64*1024) { /* decompress */
@@ -1582,11 +1613,20 @@ dict_append_val(column_storage *cs, void
                                        bat_destroy(u);
                                        return NULL;
                                }
-                               if (cs->bid)
+                               if (cs->ts != tr->tid) {
+                                       *batp = ZNEW(sql_delta);
+                                       **batp = *bat;
+                                       (*batp)->next = NULL;
+                                       cs = &(*batp)->cs;
+                                       cs->ts = tr->tid;
+                                       new = 1;
+                                       cs->uibid = cs->uvbid = 0;
+                               }
+                               if (cs->bid && !new)
                                        temp_destroy(cs->bid);
                                cs->bid = temp_create(n);
                                bat_destroy(n);
-                               if (cs->ebid)
+                               if (cs->ebid && !new)
                                        temp_destroy(cs->ebid);
                                cs->ebid = 0;
                                cs->st = ST_DEFAULT;
@@ -1605,6 +1645,19 @@ dict_append_val(column_storage *cs, void
                                        bat_destroy(u);
                                        return NULL;
                                }
+                               if (cs->ts != tr->tid) {
+                                       *batp = ZNEW(sql_delta);
+                                       **batp = *bat;
+                                       (*batp)->next = NULL;
+                                       cs = &(*batp)->cs;
+                                       cs->ts = tr->tid;
+                                       new = 1;
+                                       temp_dup(cs->ebid);
+                                       if (cs->uibid) {
+                                               temp_dup(cs->uibid);
+                                               temp_dup(cs->uvbid);
+                                       }
+                               }
                                if (cs->bid)
                                        temp_destroy(cs->bid);
                                cs->bid = temp_create(n);
@@ -1656,16 +1709,20 @@ for_append_val(column_storage *cs, void 
 }
 
 static int
-cs_update_val( sql_trans *tr, column_storage *cs, sql_table *t, oid rid, void 
*upd, int is_new)
+cs_update_val( sql_trans *tr, sql_delta **batp, sql_table *t, oid rid, void 
*upd, int is_new)
 {
        void *oupd = upd;
+       sql_delta *bat = *batp;
+       column_storage *cs = &bat->cs;
        storage *s = ATOMIC_PTR_GET(&t->data);
        assert(!is_oid_nil(rid));
        int inplace = is_new || cs->cleared || segments_is_append (s->segs->h, 
tr, rid);
 
        if (cs->st == ST_DICT) {
                /* possibly a new array is returned */
-               upd = dict_append_val(cs, upd, 1);
+               upd = dict_append_val(tr, batp, upd, 1);
+               bat = *batp;
+               cs = &bat->cs;
                if (!upd)
                        return LOG_ERR;
        }
@@ -1724,11 +1781,11 @@ cs_update_val( sql_trans *tr, column_sto
 }
 
 static int
-delta_update_val( sql_trans *tr, sql_delta *bat, sql_table *t, oid rid, void 
*upd, int is_new)
+delta_update_val( sql_trans *tr, sql_delta **bat, sql_table *t, oid rid, void 
*upd, int is_new)
 {
        int res = LOG_OK;
        lock_table(tr->store, t->base.id);
-       res = cs_update_val(tr, &bat->cs, t, rid, upd, is_new);
+       res = cs_update_val(tr, bat, t, rid, upd, is_new);
        unlock_table(tr->store, t->base.id);
        return res;
 }
@@ -1799,7 +1856,7 @@ bind_col_data(sql_trans *tr, sql_column 
                /* abort */
                if (update_conflict)
                        *update_conflict = true;
-               else
+               else if (!obat->cs.cleared) /* concurrent appends are only 
allowed on concurrent updates */
                        return timestamp_delta(tr, ATOMIC_PTR_GET(&c->data));
                return NULL;
        }
@@ -1823,7 +1880,7 @@ bind_col_data(sql_trans *tr, sql_column 
 }
 
 static int
-update_col_execute(sql_trans *tr, sql_delta *delta, sql_table *table, bool 
is_new, void *incoming_tids, void *incoming_values, bool is_bat)
+update_col_execute(sql_trans *tr, sql_delta **delta, sql_table *table, bool 
is_new, void *incoming_tids, void *incoming_values, bool is_bat)
 {
        int ok = LOG_OK;
 
@@ -1859,8 +1916,10 @@ update_col(sql_trans *tr, sql_column *c,
        if ((!inTransaction(tr, c->t) && (odelta != delta || isTempTable(c->t)) 
&& isGlobal(c->t)) || (!isNew(c->t) && isLocalTemp(c->t)))
                trans_add(tr, &c->base, delta, &tc_gc_col, &commit_update_col, 
isTempTable(c->t)?NULL:&log_update_col);
 
-       if ((res = update_col_execute(tr, delta, c->t, isNew(c), tids, upd, tpe 
== TYPE_bat)) != LOG_OK)
+       odelta = delta;
+       if ((res = update_col_execute(tr, &delta, c->t, isNew(c), tids, upd, 
tpe == TYPE_bat)) != LOG_OK)
                return res;
+       assert(delta == odelta);
        if (delta->cs.st == ST_DEFAULT && c->storage_type)
                res = sql_trans_alter_storage(tr, c, NULL);
        return res;
@@ -1904,6 +1963,7 @@ bind_idx_data(sql_trans *tr, sql_idx *i,
 static int
 update_idx(sql_trans *tr, sql_idx * i, void *tids, void *upd, int tpe)
 {
+       int res = LOG_OK;
        bool update_conflict = false;
        sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
 
@@ -1920,14 +1980,18 @@ update_idx(sql_trans *tr, sql_idx * i, v
        if ((!inTransaction(tr, i->t) && (odelta != delta || isTempTable(i->t)) 
&& isGlobal(i->t)) || (!isNew(i->t) && isLocalTemp(i->t)))
                trans_add(tr, &i->base, delta, &tc_gc_idx, &commit_update_idx, 
isTempTable(i->t)?NULL:&log_update_idx);
 
-       return update_col_execute(tr, delta, i->t, isNew(i), tids, upd, tpe == 
TYPE_bat);
+       odelta = delta;
+       res = update_col_execute(tr, &delta, i->t, isNew(i), tids, upd, tpe == 
TYPE_bat);
+       assert(delta == odelta);
+       return res;
 }
 
 static int
-delta_append_bat(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, BAT 
*offsets, BAT *i, char *storage_type)
+delta_append_bat(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, BAT 
*offsets, BAT *i, char *storage_type)
 {
        BAT *b, *oi = i;
        int err = 0;
+       sql_delta *bat = *batp;
 
        assert(!offsets || BATcount(offsets) == BATcount(i));
        if (!BATcount(i))
@@ -1937,7 +2001,8 @@ delta_append_bat(sql_trans *tr, sql_delt
 
        lock_column(tr->store, id);
        if (bat->cs.st == ST_DICT) {
-               BAT *ni = dict_append_bat(&bat->cs, oi);
+               BAT *ni = dict_append_bat(tr, batp, oi);
+               bat = *batp;
                if (oi != i) /* oi will be replaced, so destroy possible unmask 
reference */
                        bat_destroy(oi);
                oi = ni;
@@ -1948,6 +2013,7 @@ delta_append_bat(sql_trans *tr, sql_delt
        }
        if (bat->cs.st == ST_FOR) {
                BAT *ni = for_append_bat(&bat->cs, oi, storage_type);
+               bat = *batp;
                if (oi != i) /* oi will be replaced, so destroy possible unmask 
reference */
                        bat_destroy(oi);
                oi = ni;
@@ -1985,15 +2051,17 @@ delta_append_bat(sql_trans *tr, sql_delt
 }
 
 static int
-delta_append_val(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, void *i, 
BUN cnt, char *storage_type, int tt)
+delta_append_val(sql_trans *tr, sql_delta **batp, sqlid id, BUN offset, void 
*i, BUN cnt, char *storage_type, int tt)
 {
        void *oi = i;
        BAT *b;
        lock_column(tr->store, id);
+       sql_delta *bat = *batp;
 
        if (bat->cs.st == ST_DICT) {
                /* possibly a new array is returned */
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to