Changeset: 53601147f2e2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/53601147f2e2
Modified Files:
sql/storage/bat/bat_storage.c
Branch: dict
Log Message:
initial steps for updating columns with dict compression
diffs (truncated from 365 to 300 lines):
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -1076,6 +1076,65 @@ segments_is_deleted(segment *s, sql_tran
return 0;
}
+/*
+ * Translate the values in 'i' into dictionary offsets so that they can be
+ * stored in the dict-compressed column 'cs' (cs->ebid is the dictionary
+ * bat, cs->bid the bat holding the offsets).
+ *
+ * Returns
+ *  - the offsets bat produced by DICTprepare4append() when the column stays
+ *    dict compressed (if needed the stored offsets in cs->bid are first
+ *    replaced by the result of DICTenlarge());
+ *  - 'i' itself when the column was decompressed; cs->bid then holds the
+ *    result of DICTdecompress_(), cs->ebid is dropped and cs->st is set to
+ *    ST_DEFAULT, so the plain values can be stored as is;
+ *  - NULL on failure.
+ * Callers compare the result with 'i' to find out whether a new bat was
+ * handed out.
+ */
+static BAT *
+dict_append_bat(column_storage *cs, BAT *i)
+{
+	BAT *newoffsets = NULL;
+	BAT *u = temp_descriptor(cs->ebid);
+
+	if (!u)
+		return NULL;
+	/* limit is derived from the dictionary size before the new values are
+	 * prepared; presumably 256 and 64K match the 1 and 2 byte offset
+	 * types -- TODO confirm against the DICT implementation */
+	BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
+	if (DICTprepare4append(&newoffsets, i, u) < 0) {
+		/* with NDEBUG the assert is compiled out; do not fall through
+		 * and return 'i', the caller would treat the unencoded values
+		 * as valid offsets */
+		assert(0);
+		bat_destroy(u);
+		return NULL;
+	}
+	/* newoffsets is the new offset bat (ie to be appended), possibly with
+	 * a larger type ! */
+	if (BATcount(u) < max_cnt) { /* append */
+		bat_destroy(u);
+		return newoffsets;
+	}
+	if (max_cnt == 64*1024) { /* decompress */
+		BAT *b = temp_descriptor(cs->bid);
+		BAT *n = b?DICTdecompress_(b , u):NULL;
+		/* TODO also decompress updates if any */
+		bat_destroy(b);
+		assert(newoffsets == NULL);
+		if (!n) {
+			bat_destroy(u);
+			return NULL;
+		}
+		/* TODO change storage type */
+		if (cs->bid)
+			temp_destroy(cs->bid);
+		cs->bid = temp_create(n);
+		bat_destroy(n);
+		if (cs->ebid)
+			temp_destroy(cs->ebid);
+		cs->ebid = 0;
+		cs->st = ST_DEFAULT;
+		cs->cleared = true;
+		/* 'i' is returned unchanged: the column now stores plain values */
+	} else { /* enlarge the stored offsets, then append the new offsets */
+		BAT *b = temp_descriptor(cs->bid);
+		BAT *n = b?DICTenlarge(b, BATcount(b), BATcount(b) + BATcount(i)):NULL;
+		bat_destroy(b);
+		if (!n) {
+			bat_destroy(newoffsets);
+			bat_destroy(u);
+			return NULL;
+		}
+		if (cs->bid)
+			temp_destroy(cs->bid);
+		cs->bid = temp_create(n);
+		/* release our descriptor reference, as the decompress branch
+		 * does; without this the reference on n is leaked */
+		bat_destroy(n);
+		cs->cleared = true;
+		i = newoffsets;
+	}
+	bat_destroy(u);
+	return i;
+}
+
/*
* Returns LOG_OK, LOG_ERR or LOG_CONFLICT
*/
@@ -1109,6 +1168,17 @@ cs_update_bat( sql_trans *tr, column_sto
return LOG_ERR;
}
}
+
+ if (cs->st == ST_DICT) {
+ /* possibly a new array is returned */
+ updates = dict_append_bat(cs, updates);
+ if (oupdates != updates)
+ bat_destroy(oupdates);
+ oupdates = updates;
+ if (!updates)
+ return LOG_ERR;
+ }
+
/* When we go to smaller grained update structures we should check for
concurrent updates on this column ! */
/* currently only one update delta is possible */
lock_table(tr->store, t->base.id);
@@ -1392,29 +1462,104 @@ delta_update_bat( sql_trans *tr, sql_del
return cs_update_bat(tr, &bat->cs, t, tids, updates, is_new);
}
+/*
+ * Translate the 'cnt' values in array 'i' into dictionary offsets so that
+ * they can be stored in the dict-compressed column 'cs' (cs->ebid is the
+ * dictionary bat, cs->bid the bat holding the offsets).
+ *
+ * Returns
+ *  - the offsets array produced by DICTprepare4append_vals() when the
+ *    column stays dict compressed (if needed the stored offsets in cs->bid
+ *    are first replaced by the result of DICTenlarge()); this array is
+ *    released with GDKfree();
+ *  - 'i' itself when the column was decompressed; cs->bid then holds the
+ *    result of DICTdecompress_(), cs->ebid is dropped and cs->st is set to
+ *    ST_DEFAULT, so the plain values can be stored as is;
+ *  - NULL on failure.
+ * Callers compare the result with 'i' to find out whether a new array was
+ * handed out and has to be freed.
+ */
+static void *
+dict_append_val(column_storage *cs, void *i, BUN cnt)
+{
+	void *newoffsets = NULL;
+	BAT *u = temp_descriptor(cs->ebid);
+
+	if (!u)
+		return NULL;
+	/* limit is derived from the dictionary size before the new values are
+	 * prepared; presumably 256 and 64K match the 1 and 2 byte offset
+	 * types -- TODO confirm against the DICT implementation */
+	BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
+	if (DICTprepare4append_vals(&newoffsets, i, cnt, u) < 0) {
+		/* with NDEBUG the assert is compiled out; do not fall through
+		 * and return 'i', the caller would treat the unencoded values
+		 * as valid offsets */
+		assert(0);
+		bat_destroy(u);
+		return NULL;
+	}
+	/* newoffsets holds the new offsets (ie to be appended), possibly with
+	 * a larger type ! */
+	if (BATcount(u) < max_cnt) { /* append */
+		bat_destroy(u);
+		return newoffsets;
+	}
+	if (max_cnt == 64*1024) { /* decompress */
+		BAT *b = temp_descriptor(cs->bid);
+		BAT *n = b?DICTdecompress_(b , u):NULL;
+		/* TODO also decompress updates if any */
+		bat_destroy(b);
+		assert(newoffsets == NULL);
+		if (!n) {
+			bat_destroy(u);
+			return NULL;
+		}
+		/* TODO change storage type */
+		if (cs->bid)
+			temp_destroy(cs->bid);
+		cs->bid = temp_create(n);
+		bat_destroy(n);
+		if (cs->ebid)
+			temp_destroy(cs->ebid);
+		cs->ebid = 0;
+		cs->st = ST_DEFAULT;
+		cs->cleared = true;
+		/* 'i' is returned unchanged: the column now stores plain values */
+	} else { /* enlarge the stored offsets, then append the new offsets */
+		BAT *b = temp_descriptor(cs->bid);
+		BAT *n = b?DICTenlarge(b, BATcount(b), BATcount(b) + cnt):NULL;
+		bat_destroy(b);
+		if (!n) {
+			GDKfree(newoffsets);
+			bat_destroy(u);
+			return NULL;
+		}
+		if (cs->bid)
+			temp_destroy(cs->bid);
+		cs->bid = temp_create(n);
+		/* release our descriptor reference, as the decompress branch
+		 * does; without this the reference on n is leaked */
+		bat_destroy(n);
+		cs->cleared = true;
+		i = newoffsets;
+	}
+	bat_destroy(u);
+	return i;
+}
+
static int
cs_update_val( sql_trans *tr, column_storage *cs, sql_table *t, oid rid, void
*upd, int is_new)
{
+ void *oupd = upd;
storage *s = ATOMIC_PTR_GET(&t->data);
assert(!is_oid_nil(rid));
int inplace = is_new || cs->cleared || segments_is_append (s->segs->h,
tr, rid);
+ if (cs->st == ST_DICT) {
+ /* possibly a new array is returned */
+ upd = dict_append_val(cs, upd, 1);
+ if (!upd)
+ return LOG_ERR;
+ }
+
/* check if rid is insert ? */
if (!inplace) {
/* check conflict */
- if (segments_is_deleted(s->segs->h, tr, rid))
+ if (segments_is_deleted(s->segs->h, tr, rid)) {
+ if (oupd != upd)
+ GDKfree(upd);
return LOG_CONFLICT;
+ }
BAT *ui, *uv;
/* When we go to smaller grained update structures we should
check for concurrent updates on this column ! */
/* currently only one update delta is possible */
- if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
+ if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK) {
+ if (oupd != upd)
+ GDKfree(upd);
return LOG_ERR;
+ }
assert(uv->ttype);
assert(BATcount(ui) == BATcount(uv));
if (BUNappend(ui, (ptr) &rid, true) != GDK_SUCCEED ||
BUNappend(uv, (ptr) upd, true) != GDK_SUCCEED) {
+ if (oupd != upd)
+ GDKfree(upd);
bat_destroy(ui);
bat_destroy(uv);
return LOG_ERR;
@@ -1426,14 +1571,21 @@ cs_update_val( sql_trans *tr, column_sto
} else {
BAT *b = NULL;
- if((b = temp_descriptor(cs->bid)) == NULL)
+ if((b = temp_descriptor(cs->bid)) == NULL) {
+ if (oupd != upd)
+ GDKfree(upd);
return LOG_ERR;
+ }
if (void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
+ if (oupd != upd)
+ GDKfree(upd);
bat_destroy(b);
return LOG_ERR;
}
bat_destroy(b);
}
+ if (oupd != upd)
+ GDKfree(upd);
return LOG_OK;
}
@@ -1631,64 +1783,6 @@ update_idx(sql_trans *tr, sql_idx * i, v
return update_col_execute(tr, delta, i->t, isNew(i), tids, upd, tpe ==
TYPE_bat);
}
-static BAT *
-dict_append_bat(sql_delta *bat, BAT *i)
-{
- BAT *newoffsets = NULL;
- BAT *u = temp_descriptor(bat->cs.ebid);
-
- if (!u)
- return NULL;
- BUN max_cnt = (BATcount(u) < 256)?256:64*1024;
- if (DICTprepare4append(&newoffsets, i, u) < 0) {
- assert(0);
- } else {
- /* returns new offset bat (ie to be appended), possibly with
larger type ! */
- if (BATcount(u) >= max_cnt) {
- if (max_cnt == 64*1024) { /* decompress */
- BAT *b = temp_descriptor(bat->cs.bid);
- BAT *n = b?DICTdecompress_(b , u):NULL;
- bat_destroy(b);
- assert(newoffsets == NULL);
- if (!n) {
- bat_destroy(u);
- bat_destroy(n);
- return NULL;
- }
- /* TODO change storage type */
- if (bat->cs.bid)
- temp_destroy(bat->cs.bid);
- bat->cs.bid = temp_create(n);
- bat_destroy(n);
- if (bat->cs.ebid)
- temp_destroy(bat->cs.ebid);
- bat->cs.ebid = 0;
- bat->cs.st = ST_DEFAULT;
- bat->cs.cleared = true;
- } else {
- BAT *b = temp_descriptor(bat->cs.bid);
- BAT *n = b?DICTenlarge(b, BATcount(b),
BATcount(b) + BATcount(i)):NULL;
- bat_destroy(b);
- if (!n) {
- bat_destroy(newoffsets);
- bat_destroy(u);
- bat_destroy(n);
- return NULL;
- }
- if (bat->cs.bid)
- temp_destroy(bat->cs.bid);
- bat->cs.bid = temp_create(n);
- bat->cs.cleared = true;
- i = newoffsets;
- }
- } else { /* append */
- i = newoffsets;
- }
- }
- bat_destroy(u);
- return i;
-}
-
static int
delta_append_bat(sql_trans *tr, sql_delta *bat, sqlid id, BUN offset, BAT
*offsets, BAT *i)
{
@@ -1703,7 +1797,7 @@ delta_append_bat(sql_trans *tr, sql_delt
lock_column(tr->store, id);
if (bat->cs.st == ST_DICT) {
- oi = i = dict_append_bat(bat, i);
+ oi = i = dict_append_bat(&bat->cs, i);
if (!oi) {
unlock_column(tr->store, id);
return LOG_ERR;
@@ -1737,64 +1831,6 @@ delta_append_bat(sql_trans *tr, sql_delt
return (err)?LOG_ERR:LOG_OK;
}
-static void *
-dict_append_val(sql_delta *bat, void *i, BUN cnt)
-{
- void *newoffsets = NULL;
- BAT *u = temp_descriptor(bat->cs.ebid);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list