Changeset: 30d0f6af90df for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/30d0f6af90df
Modified Files:
gdk/gdk_logger.c
sql/storage/bat/bat_storage.c
Branch: pax-log
Log Message:
Fix performance of WAL replay:
A table append is now serialized in the WAL as a group of BATs
preceded by a candidate list, which is then applied using BATupdate.
diffs (truncated from 439 to 300 lines):
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -383,7 +383,7 @@ struct offset {
};
static log_return
-log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, struct
offset* offsets, lng* poffset_cnt)
+log_read_updates(logger *lg, trans *tr, logformat *l, log_id id, BAT** cands)
{
log_return res = LOG_OK;
lng nr, pnr;
@@ -413,6 +413,53 @@ log_read_updates(logger *lg, trans *tr,
return LOG_ERR;
}
}
+
+ if (l->flag == LOG_UPDATE_CONST) {
+ if (mnstr_readLng(lg->input_log, &offset) != 1)
+ return LOG_ERR;
+ if (cands) {
+ // This const range actually represents a
segment of candidates corresponding to updated bat entries
+
+ if (BATcount(*cands) == 0 || lg->flushing) {
+ // when flushing, we only need the
offset and count of the last segment of inserts.
+ assert((*cands)->ttype == TYPE_void);
+ BATtseqbase(*cands, offset);
+ BATsetcount(*cands, nr);
+ }
+ else if (!lg->flushing) {
+ assert(BATcount(*cands) > 0);
+ BAT* dense = BATdense(0,offset, nr);
+ BAT* newcands = NULL;
+ if (!dense ) {
+ res = LOG_ERR;
+ }
+ else if ((*cands)->ttype == TYPE_void) {
+ if ( (newcands =
BATmergecand(*cands, dense)) ) {
+ BBPreclaim(*cands);
+ *cands = newcands;
+ }
+ else
+ res = LOG_ERR;
+ }
+ else {
+ assert((*cands)->ttype ==
TYPE_oid);
+ assert(BATcount(*cands) > 0);
+ if (BATappend(*cands, dense,
NULL, true) != GDK_SUCCEED)
+ res = LOG_ERR;
+ }
+ BBPreclaim(dense);
+ }
+
+ // We have to read the value to update the read
cursor
+ size_t tlen = lg->bufsize;
+ void *t = rt(lg->buf, &tlen, lg->input_log, 1);
+ if (t == NULL) {
+ res = LOG_ERR;
+ }
+ return res;
+ }
+ }
+
if (!lg->flushing) {
r = COLnew(0, tpe, (BUN) nr, PERSISTENT);
if (r == NULL) {
@@ -423,11 +470,6 @@ log_read_updates(logger *lg, trans *tr,
}
if (l->flag == LOG_UPDATE_CONST) {
- if (mnstr_readLng(lg->input_log, &offset) != 1) {
- if (r)
- BBPreclaim(r);
- return LOG_ERR;
- }
size_t tlen = lg->bufsize;
void *t = rt(lg->buf, &tlen, lg->input_log, 1);
if (t == NULL) {
@@ -440,20 +482,6 @@ log_read_updates(logger *lg, trans *tr,
res = LOG_ERR;
}
}
- if (offsets && !*(bool*) t /*not deleted*/) {
- // This bat actually represents a segment of
appended rows and we want to collect this oid range
-
- lng offset_cnt = *poffset_cnt;
-
- const lng previous_os = offset_cnt ?
offsets[offset_cnt-1].os : 0;
- const lng previous_nr = offset_cnt ?
offsets[offset_cnt-1].nr : 0;
-
- const lng os = previous_os + previous_nr;
- const lng od = offset;
- const struct offset new_value = {os, nr, od};
- offsets[offset_cnt] = new_value;
- (*poffset_cnt)++;
- }
} else if (l->flag == LOG_UPDATE_BULK) {
if (mnstr_readLng(lg->input_log, &offset) != 1) {
if (r)
@@ -584,51 +612,42 @@ log_read_updates(logger *lg, trans *tr,
GDKfree(hv);
}
- if (res == LOG_OK) {
- if (l->flag==LOG_UPDATE_BULK && offsets && offset ==
-1) {
- for (lng i = 0; i < *poffset_cnt; i++) {
- BAT* rs = NULL;
- if (r && (rs = BATslice(r,
offsets[i].os, offsets[i].os+offsets[i].nr)) == NULL) {
- res = LOG_ERR;
- break;
- }
- if (tr_grow(tr) == GDK_SUCCEED) {
- tr->changes[tr->nr].type =
-
l->flag==LOG_UPDATE_CONST?LOG_UPDATE_BULK:l->flag;
- tr->changes[tr->nr].nr =
offsets[i].nr;
- tr->changes[tr->nr].tt = tpe;
- tr->changes[tr->nr].cid = id;
- tr->changes[tr->nr].offset =
offsets[i].od;
- tr->changes[tr->nr].b = rs;
- tr->changes[tr->nr].uid = uid;
- tr->nr++;
- } else {
- BBPreclaim(rs);
- res = LOG_ERR;
- }
- }
- if (r && res == LOG_OK) {
- BBPunfix(r->batCacheid);
+ if (res == LOG_OK && tr_grow(tr) == GDK_SUCCEED) {
+ tr->changes[tr->nr].type = l->flag;
+ if (l->flag==LOG_UPDATE_BULK && cands && offset == -1) {
+ struct canditer ci;
+ canditer_init(&ci, NULL, *cands);
+ const oid first = canditer_peek(&ci);
+ const oid last = canditer_last(&ci);
+ offset = (lng) first;
+ pnr = (lng) (last - first) + 1;
+ if (!lg->flushing ) {
+ assert(uid == NULL);
+ uid = *cands;
+ BBPfix((*cands)->batCacheid);
+ tr->changes[tr->nr].type = LOG_UPDATE;
}
}
- else if (tr_grow(tr) == GDK_SUCCEED) {
- tr->changes[tr->nr].type =
-
l->flag==LOG_UPDATE_CONST?LOG_UPDATE_BULK:l->flag;
- tr->changes[tr->nr].nr = pnr;
- tr->changes[tr->nr].tt = tpe;
- tr->changes[tr->nr].cid = id;
- tr->changes[tr->nr].offset = offset;
- tr->changes[tr->nr].b = r;
- tr->changes[tr->nr].uid = uid;
- tr->nr++;
- } else {
- res = LOG_ERR;
+ if (l->flag==LOG_UPDATE_CONST) {
+ assert(!cands); // TODO: This might change in
the future.
+ tr->changes[tr->nr].type = LOG_UPDATE_BULK;
}
+ tr->changes[tr->nr].nr = pnr;
+ tr->changes[tr->nr].tt = tpe;
+ tr->changes[tr->nr].cid = id;
+ tr->changes[tr->nr].offset = offset;
+ tr->changes[tr->nr].b = r;
+ tr->changes[tr->nr].uid = uid;
+ tr->nr++;
+ } else {
+ res = LOG_ERR;
}
if (res == LOG_ERR) {
if (r)
BBPreclaim(r);
- if (uid)
+ if (cands && uid)
+ BBPunfix((*cands)->batCacheid);
+ else if (uid)
BBPreclaim(uid);
}
} else {
@@ -682,9 +701,8 @@ la_bat_updates(logger *lg, logaction *la
if (b == NULL)
return GDK_FAIL;
}
+ BUN cnt = 0;
if (la->type == LOG_UPDATE_BULK) {
- BUN cnt = 0;
-
if (!lg->flushing) {
cnt = BATcount(b);
int is_msk = (b->ttype == TYPE_msk);
@@ -732,27 +750,16 @@ la_bat_updates(logger *lg, logaction *la
bat_iterator_end(&vi);
}
}
- cnt = (BUN)(la->offset + la->nr);
- if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
- if (b)
- logbat_destroy(b);
+ } else if (la->type == LOG_UPDATE) {
+ if (!lg->flushing && BATupdate(b, la->uid, la->b, true) !=
GDK_SUCCEED) {
return GDK_FAIL;
}
- } else if (!lg->flushing && la->type == LOG_UPDATE) {
- BATiter vi = bat_iterator(la->b);
- BUN p, q;
-
- BATloop(la->b, p, q) {
- oid h = BUNtoid(la->uid, p);
- const void *t = BUNtail(vi, p);
-
- if (BUNreplace(b, h, t, true) != GDK_SUCCEED) {
- logbat_destroy(b);
- bat_iterator_end(&vi);
- return GDK_FAIL;
- }
- }
- bat_iterator_end(&vi);
+ }
+ cnt = (BUN)(la->offset + la->nr);
+ if (la_bat_update_count(lg, la->cid, cnt, tid) != GDK_SUCCEED) {
+ if (b)
+ logbat_destroy(b);
+ return GDK_FAIL;
}
if (b)
logbat_destroy(b);
@@ -1165,12 +1172,7 @@ log_read_transaction(logger *lg)
if (!lg->flushing)
GDKdebug &= ~CHECKMASK;
- // START variables used in case of LOG_TABLE
- log_id tid = 0; /*sql table id in case of LOG_TABLE*/
- struct offset* offsets = NULL;
- lng nr_offsets = 0;
- lng cap_offsets = 0;
- // END variables used in case of LOG_TABLE
+ BAT* cands = NULL; // used in case of LOG_TABLE
while (err == LOG_OK && (ok=log_read_format(lg, &l))) {
if (l.flag == 0 && l.id == 0) {
@@ -1223,11 +1225,7 @@ log_read_transaction(logger *lg)
if (tr == NULL)
err = LOG_EOF;
else {
- err = log_read_updates(lg, tr, &l, l.id,
offsets, &nr_offsets);
- if (l.flag == LOG_UPDATE_CONST && offsets &&
nr_offsets == cap_offsets) {
- cap_offsets <<= 1;
- offsets = GDKrealloc(offsets,
sizeof(*offsets) * cap_offsets);
- }
+ err = log_read_updates(lg, tr, &l, l.id,
cands?&cands:NULL);
}
break;
case LOG_CREATE:
@@ -1254,21 +1252,14 @@ log_read_transaction(logger *lg)
else {
if (l.id > 0) {
// START OF LOG_TABLE
- assert(tid == 0 && offsets == NULL &&
nr_offsets == 0);
- tid = l.id;
- cap_offsets = 10;
- offsets = GDKzalloc(sizeof(*offsets) *
cap_offsets);
- if (!offsets)
+ cands = COLnew(0, TYPE_void, 0,
TRANSIENT);
+ if (!cands)
err = LOG_ERR;
}
else {
// END OF LOG_TABLE
- assert(tid == -l.id);
- tid = 0;
- GDKfree(offsets);
- offsets = NULL;
- cap_offsets = 0;
- nr_offsets = 0;
+ BBPunfix(cands->batCacheid);
+ cands = NULL;
}
}
break;
@@ -1286,8 +1277,8 @@ log_read_transaction(logger *lg)
if (!lg->flushing)
GDKdebug = dbg;
- if (offsets)
- GDKfree(offsets);
+ if (cands)
+ GDKfree(cands);
if (!ok)
return LOG_EOF;
return err;
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -3418,22 +3418,15 @@ log_segment(sql_trans *tr, segment *s, s
}
static int
-log_segments(sql_trans *tr, segments *segs, sqlid id, size_t* nr_appends)
+log_segments(sql_trans *tr, segments *segs, sqlid id)
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]