Changeset: 30d0f6af90df for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/30d0f6af90df
Modified Files:
        gdk/gdk_logger.c
        sql/storage/bat/bat_storage.c
Branch: pax-log
Log Message:

Fix performance of WAL replay:
A table append is now serialized in the WAL as a group of BATs,
preceded by a candidate list that is then applied using BATupdate.


diffs (truncated from 439 to 300 lines):

diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -383,7 +383,7 @@ struct offset {
 };
 
 static log_return
-log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, struct 
offset* offsets, lng* poffset_cnt)
+log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT** cands)
 {
        log_return res = LOG_OK;
        lng nr, pnr;
@@ -413,6 +413,53 @@ log_read_updates(logger *lg, trans *tr, 
                                return LOG_ERR;
                        }
                }
+
+               if (l->flag == LOG_UPDATE_CONST) {
+                       if (mnstr_readLng(lg->input_log, &offset) != 1)
+                               return LOG_ERR;
+                       if (cands) {
+                               // This const range actually represents a 
segment of candidates corresponding to updated bat entries
+
+                               if (BATcount(*cands) == 0 || lg->flushing) {
+                                       // when flushing, we only need the 
offset and count of the last segment of inserts.
+                                       assert((*cands)->ttype == TYPE_void);
+                                       BATtseqbase(*cands, offset);
+                                       BATsetcount(*cands, nr);
+                               }
+                               else if (!lg->flushing) {
+                                       assert(BATcount(*cands) > 0);
+                                       BAT* dense = BATdense(0,offset, nr);
+                                       BAT* newcands = NULL;
+                                       if (!dense ) {
+                                               res = LOG_ERR;
+                                       }
+                                       else if ((*cands)->ttype == TYPE_void) {
+                                               if ( (newcands = 
BATmergecand(*cands, dense)) ) {
+                                                       BBPreclaim(*cands);
+                                                       *cands = newcands;
+                                               }
+                                               else
+                                                       res = LOG_ERR;
+                                       }
+                                       else {
+                                               assert((*cands)->ttype == 
TYPE_oid);
+                                               assert(BATcount(*cands) > 0);
+                                               if (BATappend(*cands, dense, 
NULL, true) != GDK_SUCCEED)
+                                                       res = LOG_ERR;
+                                       }
+                                       BBPreclaim(dense);
+                               }
+
+                               // We have to read the value to update the read 
cursor
+                               size_t tlen = lg->bufsize;
+                               void *t = rt(lg->buf, &tlen, lg->input_log, 1);
+                               if (t == NULL) {
+                                       res = LOG_ERR;
+                               }
+                               return res;
+                       }
+               }
+
                if (!lg->flushing) {
                        r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
                        if (r == NULL) {
@@ -423,11 +470,6 @@ log_read_updates(logger *lg, trans *tr, 
                }
 
                if (l->flag == LOG_UPDATE_CONST) {
-                       if (mnstr_readLng(lg->input_log, &offset) != 1) {
-                               if (r)
-                                       BBPreclaim(r);
-                               return LOG_ERR;
-                       }
                        size_t tlen = lg->bufsize;
                        void *t = rt(lg->buf, &tlen, lg->input_log, 1);
                        if (t == NULL) {
@@ -440,20 +482,6 @@ log_read_updates(logger *lg, trans *tr, 
                                                res = LOG_ERR;
                                }
                        }
-                       if (offsets && !*(bool*) t /*not deleted*/) {
-                               // This bat actually represents a segment of 
appended rows and we want to collect this oid range
-
-                               lng offset_cnt = *poffset_cnt;
-
-                               const lng previous_os = offset_cnt ? 
offsets[offset_cnt-1].os : 0;
-                               const lng previous_nr = offset_cnt ? 
offsets[offset_cnt-1].nr : 0;
-
-                               const lng os = previous_os + previous_nr;
-                               const lng od = offset;
-                               const struct offset new_value = {os, nr, od};
-                               offsets[offset_cnt] = new_value;
-                               (*poffset_cnt)++;
-                       }
                } else if (l->flag == LOG_UPDATE_BULK) {
                        if (mnstr_readLng(lg->input_log, &offset) != 1) {
                                if (r)
@@ -584,51 +612,42 @@ log_read_updates(logger *lg, trans *tr, 
                        GDKfree(hv);
                }
 
-               if (res == LOG_OK) {
-                       if (l->flag==LOG_UPDATE_BULK && offsets && offset == 
-1) {
-                               for (lng i = 0; i < *poffset_cnt; i++) {
-                                       BAT* rs = NULL;
-                                       if (r && (rs = BATslice(r, 
offsets[i].os, offsets[i].os+offsets[i].nr)) == NULL) {
-                                               res = LOG_ERR;
-                                               break;
-                                       }
-                                       if (tr_grow(tr) == GDK_SUCCEED) {
-                                               tr->changes[tr->nr].type =
-                                                       
l->flag==LOG_UPDATE_CONST?LOG_UPDATE_BULK:l->flag;
-                                               tr->changes[tr->nr].nr = 
offsets[i].nr;
-                                               tr->changes[tr->nr].tt = tpe;
-                                               tr->changes[tr->nr].cid = id;
-                                               tr->changes[tr->nr].offset = 
offsets[i].od;
-                                               tr->changes[tr->nr].b = rs;
-                                               tr->changes[tr->nr].uid = uid;
-                                               tr->nr++;
-                                       } else {
-                                               BBPreclaim(rs);
-                                               res = LOG_ERR;
-                                       }
-                               }
-                               if (r && res == LOG_OK) {
-                                       BBPunfix(r->batCacheid);
+               if (res == LOG_OK && tr_grow(tr) == GDK_SUCCEED) {
+                       tr->changes[tr->nr].type = l->flag;
+                       if (l->flag==LOG_UPDATE_BULK && cands && offset == -1) {
+                               struct canditer ci;
+                               canditer_init(&ci, NULL, *cands);
+                               const oid first = canditer_peek(&ci);
+                               const oid last = canditer_last(&ci);
+                               offset = (lng) first;
+                               pnr = (lng) (last - first) + 1;
+                               if (!lg->flushing ) {
+                                       assert(uid == NULL);
+                                       uid = *cands;
+                                       BBPfix((*cands)->batCacheid);
+                                       tr->changes[tr->nr].type = LOG_UPDATE;
                                }
                        }
-                       else if (tr_grow(tr) == GDK_SUCCEED) {
-                               tr->changes[tr->nr].type =
-                                       
l->flag==LOG_UPDATE_CONST?LOG_UPDATE_BULK:l->flag;
-                               tr->changes[tr->nr].nr = pnr;
-                               tr->changes[tr->nr].tt = tpe;
-                               tr->changes[tr->nr].cid = id;
-                               tr->changes[tr->nr].offset = offset;
-                               tr->changes[tr->nr].b = r;
-                               tr->changes[tr->nr].uid = uid;
-                               tr->nr++;
-                       } else {
-                               res = LOG_ERR;
+                       if (l->flag==LOG_UPDATE_CONST) {
+                               assert(!cands); // TODO: This might change in 
the future.
+                               tr->changes[tr->nr].type = LOG_UPDATE_BULK;
                        }
+                       tr->changes[tr->nr].nr = pnr;
+                       tr->changes[tr->nr].tt = tpe;
+                       tr->changes[tr->nr].cid = id;
+                       tr->changes[tr->nr].offset = offset;
+                       tr->changes[tr->nr].b = r;
+                       tr->changes[tr->nr].uid = uid;
+                       tr->nr++;
+               } else {
+                       res = LOG_ERR;
                }
                if (res == LOG_ERR) {
                        if (r)
                                BBPreclaim(r);
-                       if (uid)
+                       if (cands && uid)
+                               BBPunfix((*cands)->batCacheid);
+                       else if (uid)
                                BBPreclaim(uid);
                }
        } else {
@@ -682,9 +701,8 @@ la_bat_updates(logger *lg, logaction *la
                if (b == NULL)
                        return GDK_FAIL;
        }
+       BUN cnt = 0;
        if (la->type == LOG_UPDATE_BULK) {
-               BUN cnt = 0;
-
                if (!lg->flushing) {
                        cnt = BATcount(b);
                        int is_msk = (b->ttype == TYPE_msk);
@@ -732,27 +750,16 @@ la_bat_updates(logger *lg, logaction *la
                                bat_iterator_end(&vi);
                        }
                }
-               cnt = (BUN)(la->offset + la->nr);
-               if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
-                       if (b)
-                               logbat_destroy(b);
+       } else if (la->type == LOG_UPDATE) {
+               if (!lg->flushing && BATupdate(b, la->uid, la->b, true) != 
GDK_SUCCEED) {
                        return GDK_FAIL;
                }
-       } else if (!lg->flushing && la->type == LOG_UPDATE) {
-               BATiter vi = bat_iterator(la->b);
-               BUN p, q;
-
-               BATloop(la->b, p, q) {
-                       oid h = BUNtoid(la->uid, p);
-                       const void *t = BUNtail(vi, p);
-
-                       if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
-                               logbat_destroy(b);
-                               bat_iterator_end(&vi);
-                               return GDK_FAIL;
-                       }
-               }
-               bat_iterator_end(&vi);
+       }
+       cnt = (BUN)(la->offset + la->nr);
+       if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
+               if (b)
+                       logbat_destroy(b);
+               return GDK_FAIL;
        }
        if (b)
                logbat_destroy(b);
@@ -1165,12 +1172,7 @@ log_read_transaction(logger *lg)
        if (!lg->flushing)
                GDKdebug &= ~CHECKMASK;
 
-       // START variables used in case of LOG_TABLE
-       log_id tid = 0; /*sql table id in case of LOG_TABLE*/
-       struct offset* offsets = NULL;
-       lng nr_offsets = 0;
-       lng cap_offsets = 0;
-       // END variables used in case of LOG_TABLE
+       BAT* cands = NULL; // used in case of LOG_TABLE
 
        while (err == LOG_OK && (ok=log_read_format(lg, &l))) {
                if (l.flag == 0 && l.id == 0) {
@@ -1223,11 +1225,7 @@ log_read_transaction(logger *lg)
                        if (tr == NULL)
                                err = LOG_EOF;
                        else {
-                               err = log_read_updates(lg, tr, &l, l.id, 
offsets, &nr_offsets);
-                               if (l.flag == LOG_UPDATE_CONST && offsets && 
nr_offsets == cap_offsets) {
-                                       cap_offsets <<= 1;
-                                       offsets = GDKrealloc(offsets, 
sizeof(*offsets) * cap_offsets);
-                               }
+                               err = log_read_updates(lg, tr, &l, l.id, 
cands?&cands:NULL);
                        }
                        break;
                case LOG_CREATE:
@@ -1254,21 +1252,14 @@ log_read_transaction(logger *lg)
                        else {
                                if (l.id > 0) {
                                        // START OF LOG_TABLE
-                                       assert(tid == 0 && offsets == NULL && 
nr_offsets == 0);
-                                       tid = l.id;
-                                       cap_offsets = 10;
-                                       offsets = GDKzalloc(sizeof(*offsets) * 
cap_offsets);
-                                       if (!offsets)
+                                       cands = COLnew(0, TYPE_void, 0, 
TRANSIENT);
+                                       if (!cands)
                                                err = LOG_ERR;
                                }
                                else {
                                        // END OF LOG_TABLE
-                                       assert(tid == -l.id);
-                                       tid = 0;
-                                       GDKfree(offsets);
-                                       offsets = NULL;
-                                       cap_offsets = 0;
-                                       nr_offsets = 0;
+                                       BBPunfix(cands->batCacheid);
+                                       cands = NULL;
                                }
                        }
                        break;
@@ -1286,8 +1277,8 @@ log_read_transaction(logger *lg)
        if (!lg->flushing)
                GDKdebug = dbg;
 
-       if (offsets)
-               GDKfree(offsets);
+       if (cands)
+               GDKfree(cands);
        if (!ok)
                return LOG_EOF;
        return err;
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -3418,22 +3418,15 @@ log_segment(sql_trans *tr, segment *s, s
 }
 
 static int
-log_segments(sql_trans *tr, segments *segs, sqlid id, size_t* nr_appends)
+log_segments(sql_trans *tr, segments *segs, sqlid id)
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to