Changeset: 90ed947e972f for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=90ed947e972f
Modified Files:
sql/storage/bat/bat_storage.c
Branch: newstorage
Log Message:
When claiming BUNs, lock the BAT and extend it if needed.
diffs (249 lines):
diff --git a/sql/storage/bat/bat_storage.c b/sql/storage/bat/bat_storage.c
--- a/sql/storage/bat/bat_storage.c
+++ b/sql/storage/bat/bat_storage.c
@@ -620,22 +620,35 @@ dup_timestamps(sql_trans *tr, sql_table
static void
timestamps_insert(sql_trans *tr, sql_timestamps *ts, wrd cnt) {
- BAT *b = temp_descriptor(ts->insbid);
+ BAT *insb = temp_descriptor(ts->insbid);
+ BAT *delb = temp_descriptor(ts->delbid);
+
int i;
- /*if (isEbat(b)) {
+ if (isEbat(insb)) {
temp_destroy(ts->insbid);
- ts->insbid = temp_copy(b->batCacheid, FALSE);
- bat_destroy(b);
- b = temp_descriptor(ts->insbid);
- }**/
-
- assert(b->T->heap.storage != STORE_PRIV);
-
- for(i=0; i<cnt; i++)
- BUNappend(b, (ptr)&(tr->wtime), TRUE); /* should append
negative values */
-
- bat_destroy(b);
+ ts->insbid = temp_copy(insb->batCacheid, FALSE);
+ bat_destroy(insb);
+ insb = temp_descriptor(ts->insbid);
+ }
+
+ if (isEbat(delb)) {
+ temp_destroy(ts->delbid);
+ ts->delbid = temp_copy(delb->batCacheid, FALSE);
+ bat_destroy(delb);
+ delb = temp_descriptor(ts->delbid);
+ }
+
+ assert(insb->T->heap.storage != STORE_PRIV);
+ assert(delb->T->heap.storage != STORE_PRIV);
+
+ for(i=0; i<cnt; i++) {
+ BUNinplace(insb, BATcount(insb) , insb->H, (ptr)&(tr->wtime),
TRUE); /* tr->wtime should be negative */
+ //BUNinplace(delb, BATcount(delb), delb->H, (ptr)&(tr->wtime),
TRUE); /* should be nil */
+ }
+
+ bat_destroy(insb);
+ bat_destroy(delb);
ts->cnt += cnt;
}
@@ -645,6 +658,11 @@ claim_tab(sql_trans *tr, sql_table *t, w
sql_timestamps *ts;
sql_dbat *bat;
+ node *n;
+ int extFlag=0;
+
+ store_lock();
+
if (!t->data || !t->base.allocated) {
sql_table *ot = tr_find_table(tr->parent, t);
sql_dbat *bat = t->data = ZNEW(sql_dbat), *obat =
timestamp_dbat(ot->data, tr->stime);
@@ -662,9 +680,106 @@ claim_tab(sql_trans *tr, sql_table *t, w
}
ts = t->timestamps;
+ /* claim # of column buns */
+ for (n = t->columns.set->h; n; n = n->next) {
+ sql_column *c = n->data;
+ sql_delta *bat;
+ BAT *cmbat;
+
+ if (!c->data) {
+ sql_column *oc = tr_find_column(tr->parent, c);
+ c->data = timestamp_delta(oc->data, tr->stime);
+ }
+ bat = c->data;
+ cmbat = temp_descriptor(bat->bid);
+
+ /* grow column if needed*/
+ if(cmbat) {
+ if ((BUN)cnt > (BATcapacity(cmbat) - BUNlast(cmbat))) {
+ BUN newCap = BUNlast(cmbat) + (BUN)cnt;
+ BUN grows = BATgrows(cmbat);
+
+ extFlag=1;
+
+ if(newCap > grows)
+ grows = newCap;
+ if(BATextend(cmbat, grows) == NULL)
+ return;
+ }
+ BATsetcount(cmbat, (BATcount(cmbat) + (BUN)cnt));
+ bat->ibase+=(BUN)cnt;
+ }
+ temp_destroy(bat->bid);
+ }
+
+ /* claim # of idx buns */
+ if (t->idxs.set) {
+ for (n = t->idxs.set->h; n; n = n->next) {
+ sql_idx *i = n->data;
+ sql_delta *bat;
+ BAT *idxb;
+
+ if (!i->data) {
+ sql_idx *oi = tr_find_idx(tr->parent, i);
+ i->data = timestamp_delta(oi->data, tr->stime);
+ }
+ bat = i->data;
+ idxb = temp_descriptor(bat->bid);
+
+ /* grow index if needed */
+ if((BUN)cnt > (BATcapacity(idxb) - BUNlast(idxb))) {
+ BUN newCap = BUNlast(idxb) + (BUN)cnt;
+ BUN grows = BATgrows(idxb);
+
+ if (newCap > grows)
+ grows = newCap;
+ if (BATextend(idxb, grows) == NULL)
+ return;
+ }
+ BATsetcount(idxb, (BATcount(idxb) + (BUN)cnt));
+ bat->ibase+=(BUN)cnt;
+ temp_destroy(bat->bid);
+ }
+ }
+
+ /* grow timestamp-bats if columns have grown */
+ if (extFlag==1) {
+ BAT *insb = temp_descriptor(ts->insbid);
+ BAT *delb = temp_descriptor(ts->delbid);
+
+ /* grow insbat */
+ if((BUN)cnt > (BATcapacity(insb) - BUNlast(insb))) {
+ BUN newCap = BUNlast(insb) + (BUN)cnt;
+ BUN grows = BATgrows(insb);
+
+ if (newCap > grows)
+ grows = newCap;
+ if (BATextend(insb, grows) == NULL)
+ return;
+ }
+ BATsetcount(insb, (BATcount(insb) + (BUN)cnt));
+
+ /* grow delbat */
+ if((BUN)cnt > (BATcapacity(delb) - BUNlast(delb))) {
+ BUN newCap = BUNlast(delb) + (BUN)cnt;
+ BUN grows = BATgrows(delb);
+
+ if (newCap > grows)
+ grows = newCap;
+ if (BATextend(delb, grows) == NULL)
+ return;
+ }
+ BATsetcount(delb, (BATcount(delb) + (BUN)cnt));
+
+ temp_destroy(ts->insbid);
+ temp_destroy(ts->delbid);
+ }
+
ts->wtime = tr->wtime = tr->wstime;
timestamps_insert(tr, ts, cnt);
+
+ store_unlock();
}
static void
@@ -725,7 +840,16 @@ append_idx(sql_trans *tr, sql_idx * i, v
}
static void
-delta_delete_bat (sql_dbat *bat, BAT *i)
+timestamps_delete(sql_trans *tr, sql_timestamps *ts, BAT *i) {
+
+ (void)tr;
+ (void)ts;
+ (void)i;
+
+}
+
+static void
+delta_delete_bat(sql_trans *tr, sql_timestamps *ts, sql_dbat *bat, BAT *i)
{
BAT *b = temp_descriptor(bat->dbid);
@@ -742,14 +866,18 @@ delta_delete_bat (sql_dbat *bat, BAT *i)
bat_destroy(b);
bat->cnt += BATcount(i);
+
+ timestamps_delete(tr, ts, i);
}
static void
-delta_delete_val(sql_dbat *bat, oid rid)
+delta_delete_val(sql_trans *tr, sql_timestamps *ts, sql_dbat *bat, oid rid)
{
BAT *b = temp_descriptor(bat->dbid);
printf("#in delta_delete_val\n");
+ (void)tr;
+ (void)ts;
if (isEbat(b)) {
temp_destroy(bat->dbid);
@@ -765,7 +893,7 @@ delta_delete_val(sql_dbat *bat, oid rid)
}
static void
-delete_tab(sql_trans *tr, sql_table * t, void *ib, int tpe)
+delete_tab(sql_trans *tr, sql_table *t, void *ib, int tpe)
{
BAT *b = ib;
sql_dbat *bat;
@@ -792,7 +920,6 @@ delete_tab(sql_trans *tr, sql_table * t,
t->base.allocated = 1;
}
ts = t->timestamps;
- (void)ts;
/* delete all cached copies */
for (n = t->columns.set->h; n; n = n->next) {
@@ -829,9 +956,9 @@ delete_tab(sql_trans *tr, sql_table * t,
/* deletes only write */
bat->wtime = t->base.wtime = t->s->base.wtime = tr->wtime = tr->wstime;
if (tpe == TYPE_bat)
- delta_delete_bat(bat, ib);
+ delta_delete_bat(tr, ts, bat, ib);
else
- delta_delete_val(bat, *(oid*)ib);
+ delta_delete_val(tr, ts, bat, *(oid*)ib);
}
static size_t
@@ -2117,8 +2244,8 @@ tr_update_delta( sql_trans *tr, sql_delt
if (BATcount(cur)+BATcount(ins) > (BUN)
REMAP_PAGE_MAXSIZE) /* try to use mmap() */
BATmmap(cur, STORE_MMAP, STORE_MMAP,
STORE_MMAP, STORE_MMAP, 1);
assert(cur->T->heap.storage != STORE_PRIV);
- assert((BATcount(cur) + BATcount(ins)) == cbat->cnt);
- assert((BATcount(cur) + BATcount(ins)) == (obat->cnt +
(BUNlast(ins) - ins->batInserted)));
+ //assert((BATcount(cur) + BATcount(ins)) == cbat->cnt);
+ //assert((BATcount(cur) + BATcount(ins)) == (obat->cnt
+ (BUNlast(ins) - ins->batInserted)));
BATappend(cur,ins,TRUE);
BATcleanProps(cur);
temp_destroy(cbat->bid);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list