Changeset: f330e27cf4b2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/f330e27cf4b2
Modified Files:
sql/storage/bat/bat_storage.c
sql/storage/sql_storage.h
sql/storage/store.c
sql/test/BugTracker-2020/Tests/global_table_propagation.Bug-6846.py
Branch: Jul2021
Log Message:
one access of global temps, instantiate them on the 'localtmp' change set. This
way they are implemented
into transaction isolated areas. The data usage becomes equal to local temps.
The meta data stays equal
to persistent tables.
diffs (truncated from 401 to 300 lines):
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -508,9 +508,51 @@ timestamp_delta( sql_trans *tr, sql_delt
return d;
}
+static sql_table *
+find_tmp_table(sql_trans *tr, sql_table *t)
+{
+ assert(isGlobal(t));
+ assert(tr->tmp == t->s);
+ node *n = cs_find_id(&tr->localtmps, t->base.id);
+ sql_table *lt = NULL;
+
+ if (n)
+ lt = (sql_table*)n->data;
+ if (!lt) {
+ lt = globaltmp_instantiate(tr, t);
+ /* TODO prepend to not mark as new */
+ if (lt)
+ cs_add(&tr->localtmps, lt, true);
+ }
+ return lt;
+}
+
+static sql_column *
+find_tmp_column(sql_trans *tr, sql_column *c)
+{
+ assert(isGlobal(c->t));
+ sql_table *lt = find_tmp_table(tr, c->t);
+ if (lt)
+ return find_sql_column(lt, c->base.name);
+ return NULL;
+}
+
+static sql_idx *
+find_tmp_idx(sql_trans *tr, sql_idx *i)
+{
+ assert(isGlobal(i->t));
+ sql_table *lt = find_tmp_table(tr, i->t);
+ if (lt)
+ return find_sql_idx(lt, i->base.name);
+ return NULL;
+}
+
static sql_delta *
temp_col_timestamp_delta( sql_trans *tr, sql_column *c)
{
+ if (isGlobal(c->t))
+ c = find_tmp_column(tr, c);
+ assert (!isGlobal(c->t));
assert(isTempTable(c->t));
sql_delta *d = temp_delta(ATOMIC_PTR_GET(&c->data), tr->tid);
if (!d) {
@@ -569,6 +611,9 @@ timestamp_storage( sql_trans *tr, storag
static storage *
temp_tab_timestamp_storage( sql_trans *tr, sql_table *t)
{
+ if (isGlobal(t))
+ t = find_tmp_table(tr, t);
+ assert(!isGlobal(t));
assert(isTempTable(t));
storage *d = temp_storage(ATOMIC_PTR_GET(&t->data), tr->tid);
if (!d) {
@@ -857,7 +902,7 @@ older_delta( sql_delta *d, sql_trans *tr
}
static BAT *
-bind_ubat(sql_trans *tr, sql_delta *d, int access, int type, size_t cnt)
+bind_ubat(sql_trans *tr, sql_delta *d, bool temp, int access, int type, size_t
cnt)
{
assert(tr->active);
sql_delta *o = NULL;
@@ -871,22 +916,24 @@ bind_ubat(sql_trans *tr, sql_delta *d, i
return NULL;
}
}
- while ((o = older_delta(d, tr)) != NULL) {
- BAT *oui = NULL, *ouv = NULL;
- if (!oui)
- oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type, cnt);
- if (access == RD_UPD_VAL)
- ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type, cnt);
- if (!ui || !oui || (access == RD_UPD_VAL && (!uv || !ouv))) {
- bat_destroy(ui);
- bat_destroy(uv);
- bat_destroy(oui);
- bat_destroy(ouv);
- return NULL;
+ if (!temp) {
+ while ((o = older_delta(d, tr)) != NULL) {
+ BAT *oui = NULL, *ouv = NULL;
+ if (!oui)
+ oui = cs_bind_ubat(&o->cs, RD_UPD_ID, type,
cnt);
+ if (access == RD_UPD_VAL)
+ ouv = cs_bind_ubat(&o->cs, RD_UPD_VAL, type,
cnt);
+ if (!ui || !oui || (access == RD_UPD_VAL && (!uv ||
!ouv))) {
+ bat_destroy(ui);
+ bat_destroy(uv);
+ bat_destroy(oui);
+ bat_destroy(ouv);
+ return NULL;
+ }
+ if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
+ return NULL;
+ d = o;
}
- if ((ui = merge_updates(ui, &uv, oui, ouv)) == NULL)
- return NULL;
- d = o;
}
if (uv) {
bat_destroy(ui);
@@ -902,7 +949,7 @@ bind_ucol(sql_trans *tr, sql_column *c,
if (!d)
return NULL;
- return bind_ubat(tr, d, access, c->type.type->localtype, cnt);
+ return bind_ubat(tr, d, isTempTable(c->t), access,
c->type.type->localtype, cnt);
}
static BAT *
@@ -913,7 +960,7 @@ bind_uidx(sql_trans *tr, sql_idx * i, in
if (!d)
return NULL;
- return bind_ubat(tr, d, access, type, cnt);
+ return bind_ubat(tr, d, isTempTable(i->t), access, type, cnt);
}
static BAT *
@@ -1572,6 +1619,9 @@ update_col(sql_trans *tr, sql_column *c,
bool update_conflict = false;
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
+ if (isTempTable(c->t) && isGlobal(c->t))
+ c = find_tmp_column(tr, c);
+
if (tpe == TYPE_bat) {
BAT *t = tids;
if (!BATcount(t))
@@ -1629,6 +1679,9 @@ update_idx(sql_trans *tr, sql_idx * i, v
bool update_conflict = false;
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
+ if (isTempTable(i->t) && isGlobal(i->t))
+ i = find_tmp_idx(tr, i);
+
if (tpe == TYPE_bat) {
BAT *t = tids;
if (!BATcount(t))
@@ -1762,6 +1815,9 @@ append_col(sql_trans *tr, sql_column *c,
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&c->data);
+ if (isTempTable(c->t) && isGlobal(c->t))
+ c = find_tmp_column(tr, c);
+
if ((delta = bind_col_data(tr, c, NULL)) == NULL)
return LOG_ERR;
@@ -1778,6 +1834,9 @@ append_idx(sql_trans *tr, sql_idx * i, B
{
sql_delta *delta, *odelta = ATOMIC_PTR_GET(&i->data);
+ if (isTempTable(i->t) && isGlobal(i->t))
+ i = find_tmp_idx(tr, i);
+
if ((delta = bind_idx_data(tr, i, NULL)) == NULL)
return LOG_ERR;
@@ -2054,6 +2113,9 @@ delete_tab(sql_trans *tr, sql_table * t,
BAT *b = ib;
storage *bat;
+ if (isTempTable(t) && isGlobal(t))
+ t = find_tmp_table(tr, t);
+
if (tpe == TYPE_bat && !BATcount(b))
return ok;
@@ -2292,7 +2354,7 @@ create_col(sql_trans *tr, sql_column *c)
size_t cnt = 0;
/* alter ? */
- if (ol_first_node(c->t->columns) && (fc =
ol_first_node(c->t->columns)->data) != NULL) {
+ if (!isTempTable(c->t) && ol_first_node(c->t->columns) && (fc =
ol_first_node(c->t->columns)->data) != NULL) {
storage *s = ATOMIC_PTR_GET(&fc->t->data);
cnt = segs_end(s->segs, tr, c->t);
}
@@ -3893,6 +3955,9 @@ claim_tab(sql_trans *tr, sql_table *t, s
{
storage *s;
+ if (isTempTable(t) && isGlobal(t))
+ t = find_tmp_table(tr, t);
+
/* we have a single segment structure for each persistent table
* for temporary tables each has its own */
if ((s = bind_del_data(tr, t, NULL)) == NULL)
@@ -3908,6 +3973,9 @@ key_claim_tab(sql_trans *tr, sql_table *
storage *s;
int res = 0;
+ if (isTempTable(t) && isGlobal(t))
+ t = find_tmp_table(tr, t);
+
/* we have a single segment structure for each persistent table
* for temporary tables each has its own */
if ((s = bind_del_data(tr, t, NULL)) == NULL)
@@ -4061,51 +4129,6 @@ bind_cands(sql_trans *tr, sql_table *t,
return segments2cands(s->segs->h, tr, t, start, end);
}
-static void
-temp_del_tab(sql_trans *tr, sql_table *t)
-{
- ulng tid = tr->tid;
- lock_table(tr->store, t->base.id);
- table_retry:
- for (storage *d = ATOMIC_PTR_GET(&t->data), *p = NULL, *n = NULL; d; d
= n) {
- n = d->next;
- if (d->cs.ts == tid) {
- if (p == NULL) {
- if (!ATOMIC_PTR_CAS(&t->data, (void **) &d, n))
- goto table_retry;
- } else {
- p->next = n;
- }
- d->next = NULL;
- destroy_storage(d);
- } else {
- p = d;
- }
- }
- unlock_table(tr->store, t->base.id);
- for (node *nd = t->columns->l->h; nd; nd = nd->next) {
- sql_column *c = nd->data;
- lock_column(tr->store, c->base.id);
- column_retry:
- for (sql_delta *d = ATOMIC_PTR_GET(&c->data), *p = NULL, *n =
NULL; d; d = n) {
- n = d->next;
- if (d->cs.ts == tid) {
- if (p == NULL) {
- if (!ATOMIC_PTR_CAS(&c->data, (void **)
&d, n))
- goto column_retry;
- } else {
- p->next = n;
- }
- d->next = NULL;
- destroy_delta(d, false);
- } else {
- p = d;
- }
- }
- unlock_column(tr->store, c->base.id);
- }
-}
-
void
bat_storage_init( store_functions *sf)
{
@@ -4150,8 +4173,6 @@ bat_storage_init( store_functions *sf)
sf->drop_del = &drop_del;
sf->clear_table = &clear_table;
-
- sf->temp_del_tab = &temp_del_tab;
}
#if 0
diff --git a/sql/storage/sql_storage.h b/sql/storage/sql_storage.h
--- a/sql/storage/sql_storage.h
+++ b/sql/storage/sql_storage.h
@@ -198,8 +198,6 @@ typedef BUN (*clear_table_fptr) (sql_tra
*/
typedef int (*update_table_fptr) (sql_trans *tr, sql_table *ft, sql_table *tt);
-typedef void (*temp_del_tab_fptr) (sql_trans *tr, sql_table *ft);
-
/* backing struct for this interface */
typedef struct store_functions {
@@ -247,8 +245,6 @@ typedef struct store_functions {
upgrade_col_fptr upgrade_col;
upgrade_idx_fptr upgrade_idx;
upgrade_del_fptr upgrade_del;
-
- temp_del_tab_fptr temp_del_tab;
} store_functions;
typedef int (*logger_create_fptr) (struct sqlstore *store, int debug, const
char *logdir, int catalog_version);
@@ -441,6 +437,7 @@ extern int sql_trans_copy_column(sql_tra
extern int sql_trans_copy_key(sql_trans *tr, sql_table *t, sql_key *k, sql_key
**kres);
extern int sql_trans_copy_idx(sql_trans *tr, sql_table *t, sql_idx *i, sql_idx
**ires);
extern int sql_trans_copy_trigger(sql_trans *tr, sql_table *t, sql_trigger
*tri, sql_trigger **tres);
+extern sql_table *globaltmp_instantiate(sql_trans *tr, sql_table *t);
#define NR_TABLE_LOCKS 64
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]