Changeset: 1845207c2511 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/1845207c2511
Modified Files:
sql/storage/store.c
Branch: iso
Log Message:
merged with Jul2021
diffs (truncated from 892 to 300 lines):
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -71,8 +71,18 @@ unlock_table(sqlstore *store, sqlid id)
MT_lock_unset(&store->table_locks[id&(NR_TABLE_LOCKS-1)]);
}
-#define lock_column(store, id) lock_table(store, id)
-#define unlock_column(store, id) unlock_table(store, id)
+/*
+ * Take the column lock for id.  The lock is picked from
+ * store->column_locks by masking id with NR_TABLE_LOCKS-1, so several
+ * ids can map onto the same lock.
+ */
+static void
+lock_column(sqlstore *store, sqlid id)
+{
+	const sqlid slot = id & (NR_TABLE_LOCKS - 1);
+
+	MT_lock_set(&store->column_locks[slot]);
+}
+
+/*
+ * Release the column lock for id; the slot is computed exactly as in
+ * lock_column (id masked with NR_TABLE_LOCKS-1).
+ */
+static void
+unlock_column(sqlstore *store, sqlid id)
+{
+	const sqlid slot = id & (NR_TABLE_LOCKS - 1);
+
+	MT_lock_unset(&store->column_locks[slot]);
+}
+
static int
tc_gc_seg( sql_store Store, sql_change *change, ulng oldest)
@@ -218,7 +228,7 @@ rollback_segments(segments *segs, sql_tr
seg->next = cur->next;
if (cur == segs->t)
segs->t = seg;
- mark4destroy(cur, change, oldest/* TODO somehow
get current timestamp*/);
+ mark4destroy(cur, change,
store_get_timestamp(tr->store));
cur = seg;
} else {
seg = cur; /* begin of new merge */
@@ -618,6 +628,7 @@ cs_bind_ubat( column_storage *cs, int ac
BAT *b;
assert(access == RD_UPD_ID || access == RD_UPD_VAL);
+ /* returns the updates for cs */
if (cs->uibid && cs->uvbid) {
if (access == RD_UPD_ID)
b = temp_descriptor(cs->uibid);
@@ -630,20 +641,145 @@ cs_bind_ubat( column_storage *cs, int ac
}
+/*
+ * Merge the updates of a newer delta (ids ui, values *UV) with those of
+ * an older delta (ids oi, values ov) into freshly allocated BATs.
+ *
+ * The merge walks both id BATs in lock step, so it assumes both are
+ * sorted ascending on oid.  When an oid occurs on both sides only the
+ * newer entry (from ui / *UV) is kept.  Values are optional: either both
+ * *UV and ov are given, or neither is.
+ *
+ * All four input BATs are released on every path.  On success the merged
+ * id BAT is returned and, when values were given, *UV is replaced by the
+ * merged value BAT.  On failure NULL is returned and *UV is set to NULL.
+ */
 static BAT *
-bind_ucol(sql_trans *tr, sql_column *c, int access)
+merge_updates( BAT *ui, BAT **UV, BAT *oi, BAT *ov)
+{
+	int err = 0;
+	BAT *uv = *UV;
+	BUN cnt = BATcount(ui)+BATcount(oi);
+	BAT *ni = bat_new(TYPE_oid, cnt, PERSISTENT);
+	BAT *nv = uv?bat_new(uv->ttype, cnt, PERSISTENT):NULL;
+
+	/* allocation failed, or values were supplied for one side only */
+	if (!ni || (uv && !nv) || ((uv != NULL) != (ov != NULL))) {
+		bat_destroy(ni);
+		bat_destroy(nv);
+		bat_destroy(ui);
+		bat_destroy(uv);
+		bat_destroy(oi);
+		bat_destroy(ov);
+		*UV = NULL;	/* uv was just released, don't leave it with the caller */
+		return NULL;
+	}
+	BATiter uvi;
+	BATiter ovi;
+
+	/* nv exists exactly when both value BATs do */
+	if (nv) {
+		uvi = bat_iterator(uv);
+		ovi = bat_iterator(ov);
+	}
+
+	/* handle dense (void) cases together as we need to merge updates (which is slower anyway) */
+	BUN uip = 0, uie = BATcount(ui);
+	BUN oip = 0, oie = BATcount(oi);
+
+	oid uiseqb = ui->tseqbase;
+	oid oiseqb = oi->tseqbase;
+	oid *uipt = NULL, *oipt = NULL;
+	if (!BATtdense(ui))
+		uipt = Tloc(ui, 0);
+	if (!BATtdense(oi))
+		oipt = Tloc(oi, 0);
+	while (uip < uie && oip < oie && !err) {
+		oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
+		oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
+
+		if (uiid <= oiid) {
+			if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
+			    (nv && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
+				err = 1;
+			uip++;
+			/* same row updated in both: the newer value wins, skip the older */
+			if (uiid == oiid)
+				oip++;
+		} else { /* uiid > oiid */
+			if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
+			    (nv && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED))
+				err = 1;
+			oip++;
+		}
+	}
+	/* copy whatever is left of the newer updates */
+	while (uip < uie && !err) {
+		oid uiid = (uipt)?uipt[uip]: uiseqb+uip;
+		if (BUNappend(ni, (ptr) &uiid, true) != GDK_SUCCEED ||
+		    (nv && BUNappend(nv, (ptr) BUNtail(uvi, uip), true) != GDK_SUCCEED))
+			err = 1;
+		uip++;
+	}
+	/* copy whatever is left of the older updates */
+	while (oip < oie && !err) {
+		oid oiid = (oipt)?oipt[oip]: oiseqb+oip;
+		if (BUNappend(ni, (ptr) &oiid, true) != GDK_SUCCEED ||
+		    (nv && BUNappend(nv, (ptr) BUNtail(ovi, oip), true) != GDK_SUCCEED))
+			err = 1;
+		oip++;
+	}
+	bat_destroy(ui);
+	bat_destroy(uv);
+	bat_destroy(oi);
+	bat_destroy(ov);
+	if (!err) {
+		if (nv)
+			*UV = nv;
+		return ni;
+	}
+	*UV = NULL;
+	bat_destroy(ni);
+	bat_destroy(nv);
+	return NULL;
+}
+
+/*
+ * Walk the chain behind d (starting at d->next) and return the first
+ * delta that holds updates (cs.ucnt != 0) and whose timestamp satisfies
+ * VALID_4_READ for tr.  Returns NULL when the chain has no such delta.
+ */
+static sql_delta *
+older_delta( sql_delta *d, sql_trans *tr)
+{
+	for (sql_delta *cand = d->next; cand; cand = cand->next) {
+		if (cand->cs.ucnt && VALID_4_READ(cand->cs.ts, tr))
+			return cand;
+	}
+	return NULL;
+}
+
+/*
+ * Bind the update BAT of delta d for transaction tr, folding in the
+ * updates of every older delta that older_delta() reports for tr.
+ *
+ * access is RD_UPD_ID (return the updated ids) or RD_UPD_VAL (return the
+ * updated values); type is the tail type handed to cs_bind_ubat.
+ * Returns the requested BAT, which the caller must release, or NULL on
+ * failure.  No BAT bound here stays referenced on a failure path.
+ */
+static BAT *
+bind_ubat(sql_trans *tr, sql_delta *d, int access, int type)
 {
 assert(tr->active);
- sql_delta *d = col_timestamp_delta(tr, c);
- return cs_bind_ubat(&d->cs, access, c->type.type->localtype);
+	sql_delta *o = NULL;
+	BAT *ui = NULL, *uv = NULL;
+
+	/* the ids are always needed: they drive the merge with older deltas */
+	ui = cs_bind_ubat(&d->cs, RD_UPD_ID, type);
+	if (access == RD_UPD_VAL)
+		uv = cs_bind_ubat(&d->cs, RD_UPD_VAL, type);
+	while ((o = older_delta(d, tr)) != NULL) {
+		BAT *oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type);
+		BAT *ouv = NULL;
+
+		if (access == RD_UPD_VAL)
+			ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type);
+		if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
+			/* release whatever did get bound before giving up */
+			bat_destroy(ui);
+			bat_destroy(uv);
+			bat_destroy(oui);
+			bat_destroy(ouv);
+			return NULL;
+		}
+		/* merge_updates releases all four BATs, also when it fails */
+		if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
+			return NULL;
+		d = o;
+	}
+	if (access == RD_UPD_VAL) {
+		/* values were asked for: never hand back the id BAT instead,
+		 * also not when binding the values failed (uv == NULL) */
+		bat_destroy(ui);
+		return uv;
+	}
+	return ui;
+}
+
+/*
+ * Bind the update BAT (ids or values, depending on access) of column c
+ * as seen by transaction tr.  Thin wrapper around bind_ubat using the
+ * column's delta and the local type of the column.
+ */
+static BAT *
+bind_ucol(sql_trans *tr, sql_column *c, int access)
+{
+	sql_delta *d = col_timestamp_delta(tr, c);
+	int type = c->type.type->localtype;
+
+	return bind_ubat(tr, d, access, type);
 }
+/*
+ * Bind the update BAT (ids or values, depending on access) of index i as
+ * seen by transaction tr.  The tail type is TYPE_oid when oid_index()
+ * holds for the index type and TYPE_lng otherwise.
+ */
 static BAT *
 bind_uidx(sql_trans *tr, sql_idx * i, int access)
 {
 int type = oid_index(i->type)?TYPE_oid:TYPE_lng;
- assert(tr->active);
- sql_delta *d = idx_timestamp_delta(tr, i);
- return cs_bind_ubat(&d->cs, access, type);
+	sql_delta *delta = idx_timestamp_delta(tr, i);
+
+	return bind_ubat(tr, delta, access, type);
 }
static BAT *
@@ -757,11 +893,29 @@ segments_is_append(segment *s, sql_trans
}
+/*
+ * Look up the segment in the list starting at s that covers row rid
+ * (start <= rid < end).  Returns 1 when that segment is flagged deleted
+ * and its timestamp is not below tr->ts, 0 otherwise (also when no
+ * segment covers rid).
+ */
 static int
+segments_is_deleted(segment *s, sql_trans *tr, oid rid)
+{
+	/* skip ahead to the segment that contains rid, if there is one */
+	while (s && !(s->start <= rid && s->end > rid))
+		s = s->next;
+	if (!s)
+		return 0;
+	return (s->ts >= tr->ts && s->deleted) ? 1 : 0;
+}
+
+/*
+ * Returns LOG_OK, LOG_ERR or LOG_CONFLICT
+ */
+static int
cs_update_bat( sql_trans *tr, column_storage *cs, sql_table *t, BAT *tids, BAT
*updates, int is_new)
{
storage *s = ATOMIC_PTR_GET(&t->data);
int res = LOG_OK;
- BAT *otids = tids;
+ BAT *otids = tids, *oupdates = updates;
+
if (!BATcount(tids))
return LOG_OK;
@@ -770,107 +924,225 @@ cs_update_bat( sql_trans *tr, column_sto
if (!otids)
return LOG_ERR;
}
+ /* When we go to finer-grained update structures we should check for concurrent updates on this column! */
+ /* currently only one update delta is possible */
+ if (!otids->tsorted) {
+ BAT *sorted, *order;
+ if (BATsort(&sorted, &order, NULL, otids, NULL, NULL, false,
false, false) != GDK_SUCCEED) {
+ if (otids != tids)
+ bat_destroy(otids);
+ return LOG_ERR;
+ }
+ if (otids != tids)
+ bat_destroy(otids);
+ otids = sorted;
+ oupdates = BATproject(order, oupdates);
+ bat_destroy(order);
+ }
+ assert(otids->tsorted);
if (!is_new && !cs->cleared) {
- BAT *ui, *uv;
-
- if (cs_real_update_bats(cs, &ui, &uv) != LOG_OK)
- return LOG_ERR;
+ BAT *ui = NULL, *uv = NULL;
/* handle updates on just inserted bits */
- if (count_inserts(s->segs->h, tr)) {
- segment *seg = s->segs->h;
- BUN ucnt = BATcount(otids);
- BATiter upi = bat_iterator(updates);
- BAT *b;
-
- if((b = temp_descriptor(cs->bid)) == NULL) {
- bat_destroy(ui);
- bat_destroy(uv);
- if (otids != tids)
- bat_destroy(otids);
- return LOG_ERR;
- }
-
- if (BATtdense(otids)) {
- oid start = otids->tseqbase, offset = start;
- oid end = start + ucnt;
- for(; seg; seg=seg->next) {
- if (seg->start <= start && seg->end >
start) {
- BUN lend = end <
seg->end?end:seg->end;
- if (seg->ts == tr->tid &&
!seg->deleted) {
- for (oid rid = start;
rid < lend; rid++) {
- ptr upd =
BUNtail(upi, rid-offset);
- if
(void_inplace(b, rid, upd, true) != GDK_SUCCEED) {
-
bat_destroy(b);
-
bat_destroy(ui);
-
bat_destroy(uv);
- if
(otids != tids)
-
bat_destroy(otids);
- return
LOG_ERR;
- }
+ /* handle updates on updates (within one transaction) */
+ BATiter upi = bat_iterator(oupdates);
+ BUN cnt = 0, ucnt = BATcount(otids);
+ BAT *b, *ins = NULL;
+ int *msk = NULL;
+
+ if((b = temp_descriptor(cs->bid)) == NULL)
+ res = LOG_ERR;
+
+ if (BATtdense(otids)) {
+ oid start = otids->tseqbase, offset = start;
+ oid end = start + ucnt;
+
+ for(segment *seg = s->segs->h; seg && res == LOG_OK ;
seg=seg->next) {
+ if (seg->start <= start && seg->end > start) {
+ /* check for delete conflicts */
+ if (seg->ts >= tr->ts && seg->deleted) {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list