Changeset: 58f0bc5ca6e3 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/58f0bc5ca6e3
Modified Files:
sql/storage/bat/bat_storage.c
sql/storage/store.c
Branch: iso
Log Message:
Merged with Jul2021
diffs (truncated from 2202 to 300 lines):
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1253,18 +1253,22 @@ BUNappendmulti(BAT *b, const void *value
BATrmprop(b, GDK_UNIQUE_ESTIMATE);
b->theap->dirty |= count > 0;
+ MT_rwlock_wrlock(&b->thashlock);
for (BUN i = 0; i < count; i++) {
void *t = b->ttype && b->tvarsized ? ((void **) values)[i] :
(void *) ((char *) values + i * Tsize(b));
setcolprops(b, t);
gdk_return rc = bunfastapp_nocheck(b, p, t, Tsize(b));
- if (rc != GDK_SUCCEED)
+ if (rc != GDK_SUCCEED) {
+ MT_rwlock_wrunlock(&b->thashlock);
return rc;
+ }
if (b->thash) {
- HASHappend(b, p, t);
+ HASHappend_locked(b, p, t);
}
p++;
}
+ MT_rwlock_wrunlock(&b->thashlock);
IMPSdestroy(b); /* no support for inserts in imprints yet */
OIDXdestroy(b);
@@ -1405,7 +1409,6 @@ BUNinplacemulti(BAT *b, const oid *posit
* clear it */
b->tnil = false;
}
- HASHdelete(b, p, val); /* first delete old value from hash */
if (b->ttype != TYPE_void && ATOMlinear(b->ttype)) {
const ValRecord *prop;
@@ -1449,6 +1452,9 @@ BUNinplacemulti(BAT *b, const oid *posit
}
OIDXdestroy(b);
IMPSdestroy(b);
+
+ MT_rwlock_wrlock(&b->thashlock);
+ HASHdelete_locked(b, p, val); /* first delete old value from hash */
if (b->tvarsized && b->ttype) {
var_t _d;
ptr _ptr;
@@ -1469,13 +1475,17 @@ BUNinplacemulti(BAT *b, const oid *posit
break;
#endif
}
- if (ATOMreplaceVAR(b, &_d, t) != GDK_SUCCEED)
+ if (ATOMreplaceVAR(b, &_d, t) != GDK_SUCCEED) {
+ MT_rwlock_wrunlock(&b->thashlock);
return GDK_FAIL;
+ }
if (b->twidth < SIZEOF_VAR_T &&
(b->twidth <= 2 ? _d - GDK_VAROFFSET : _d) >=
((size_t) 1 << (8 * b->twidth))) {
/* doesn't fit in current heap, upgrade it */
- if (GDKupgradevarheap(b, _d, 0, false) != GDK_SUCCEED)
+ if (GDKupgradevarheap(b, _d, 0, false) != GDK_SUCCEED) {
+ MT_rwlock_wrunlock(&b->thashlock);
return GDK_FAIL;
+ }
}
_ptr = BUNtloc(bi, p);
switch (b->twidth) {
@@ -1498,10 +1508,11 @@ BUNinplacemulti(BAT *b, const oid *posit
mskSetVal(b, p, * (msk *) t);
} else {
assert(BATatoms[b->ttype].atomPut == NULL);
- if (ATOMfix(b->ttype, t) != GDK_SUCCEED)
+ if (ATOMfix(b->ttype, t) != GDK_SUCCEED ||
+ ATOMunfix(b->ttype, BUNtloc(bi, p)) != GDK_SUCCEED) {
+ MT_rwlock_wrunlock(&b->thashlock);
return GDK_FAIL;
- if (ATOMunfix(b->ttype, BUNtloc(bi, p)) != GDK_SUCCEED)
- return GDK_FAIL;
+ }
switch (ATOMsize(b->ttype)) {
case 0: /* void */
break;
@@ -1530,7 +1541,8 @@ BUNinplacemulti(BAT *b, const oid *posit
}
}
- HASHinsert(b, p, t); /* insert new value into hash */
+ HASHinsert_locked(b, p, t); /* insert new value into hash */
+ MT_rwlock_wrunlock(&b->thashlock);
tt = b->ttype;
prv = p > 0 ? p - 1 : BUN_NONE;
@@ -1704,40 +1716,63 @@ BUNfnd(BAT *b, const void *v)
if (BATordered(b) || BATordered_rev(b))
return SORTfnd(b, v);
}
- bi = bat_iterator(b);
- switch (ATOMbasetype(b->ttype)) {
- case TYPE_bte:
- HASHfnd_bte(r, bi, v);
- break;
- case TYPE_sht:
- HASHfnd_sht(r, bi, v);
- break;
- case TYPE_int:
- HASHfnd_int(r, bi, v);
- break;
- case TYPE_flt:
- HASHfnd_flt(r, bi, v);
- break;
- case TYPE_dbl:
- HASHfnd_dbl(r, bi, v);
- break;
- case TYPE_lng:
- HASHfnd_lng(r, bi, v);
- break;
+ if (BAThash(b) == GDK_SUCCEED) {
+ MT_rwlock_rdlock(&b->thashlock);
+ if (b->thash == NULL) {
+ MT_rwlock_rdunlock(&b->thashlock);
+ goto hashfnd_failed;
+ }
+ bi = bat_iterator(b);
+ switch (ATOMbasetype(b->ttype)) {
+ case TYPE_bte:
+ HASHloop_bte(bi, b->thash, r, v)
+ break;
+ break;
+ case TYPE_sht:
+ HASHloop_sht(bi, b->thash, r, v)
+ break;
+ break;
+ case TYPE_int:
+ HASHloop_int(bi, b->thash, r, v)
+ break;
+ break;
+ case TYPE_flt:
+ HASHloop_flt(bi, b->thash, r, v)
+ break;
+ break;
+ case TYPE_dbl:
+ HASHloop_dbl(bi, b->thash, r, v)
+ break;
+ break;
+ case TYPE_lng:
+ HASHloop_lng(bi, b->thash, r, v)
+ break;
+ break;
#ifdef HAVE_HGE
- case TYPE_hge:
- HASHfnd_hge(r, bi, v);
- break;
+ case TYPE_hge:
+ HASHloop_hge(bi, b->thash, r, v)
+ break;
+ break;
#endif
- case TYPE_str:
- HASHfnd_str(r, bi, v);
- break;
- default:
- HASHfnd(r, bi, v);
+ case TYPE_uuid:
+ HASHloop_uuid(bi, b->thash, r, v)
+ break;
+ break;
+ case TYPE_str:
+ HASHloop_str(bi, b->thash, r, v)
+ break;
+ break;
+ default:
+ HASHloop(bi, b->thash, r, v)
+ break;
+ break;
+ }
+ MT_rwlock_rdunlock(&b->thashlock);
+ return r;
}
- return r;
hashfnd_failed:
/* can't build hash table, search the slow way */
+ GDKclrerr();
return slowfnd(b, v);
}
@@ -2036,44 +2071,41 @@ BATroles(BAT *b, const char *tnme)
/* rather than deleting X.new, we comply with the commit protocol and
* move it to backup storage */
static gdk_return
-backup_new(Heap *hp, int lockbat)
+backup_new(Heap *hp)
{
- int batret, bakret, xx, ret = 0;
+ int batret, bakret, ret = -1;
char *batpath, *bakpath;
struct stat st;
- /* file actions here interact with the global commits */
- for (xx = 0; xx <= lockbat; xx++)
- MT_lock_set(&GDKtrimLock(xx));
-
/* check for an existing X.new in BATDIR, BAKDIR and SUBDIR */
batpath = GDKfilepath(hp->farmid, BATDIR, hp->filename, ".new");
bakpath = GDKfilepath(hp->farmid, BAKDIR, hp->filename, ".new");
- if (batpath == NULL || bakpath == NULL) {
- ret = -1;
- goto bailout;
- }
- batret = MT_stat(batpath, &st);
- bakret = MT_stat(bakpath, &st);
+ if (batpath != NULL && bakpath != NULL) {
+ /* file actions here interact with the global commits */
+ MT_lock_set(&GDKtmLock);
+
+ batret = MT_stat(batpath, &st);
+ bakret = MT_stat(bakpath, &st);
- if (batret == 0 && bakret) {
- /* no backup yet, so move the existing X.new there out
- * of the way */
- if ((ret = MT_rename(batpath, bakpath)) < 0)
- GDKsyserror("backup_new: rename %s to %s failed\n",
- batpath, bakpath);
- TRC_DEBUG(IO_, "rename(%s,%s) = %d\n", batpath, bakpath, ret);
- } else if (batret == 0) {
- /* there is a backup already; just remove the X.new */
- if ((ret = MT_remove(batpath)) != 0)
- GDKsyserror("backup_new: remove %s failed\n", batpath);
- TRC_DEBUG(IO_, "remove(%s) = %d\n", batpath, ret);
+ if (batret == 0 && bakret) {
+ /* no backup yet, so move the existing X.new there out
+ * of the way */
+ if ((ret = MT_rename(batpath, bakpath)) < 0)
+ GDKsyserror("backup_new: rename %s to %s failed\n",
+ batpath, bakpath);
+ TRC_DEBUG(IO_, "rename(%s,%s) = %d\n", batpath, bakpath, ret);
+ } else if (batret == 0) {
+ /* there is a backup already; just remove the X.new */
+ if ((ret = MT_remove(batpath)) != 0)
+ GDKsyserror("backup_new: remove %s failed\n", batpath);
+ TRC_DEBUG(IO_, "remove(%s) = %d\n", batpath, ret);
+ } else {
+ ret = 0;
+ }
+ MT_lock_unset(&GDKtmLock);
}
- bailout:
GDKfree(batpath);
GDKfree(bakpath);
- for (xx = lockbat; xx >= 0; xx--)
- MT_lock_unset(&GDKtrimLock(xx));
return ret ? GDK_FAIL : GDK_SUCCEED;
}
@@ -2093,7 +2125,7 @@ HEAPchangeaccess(Heap *hp, int dstmode,
}
if (hp->storage == STORE_MMAP) { /* 6=>4 */
hp->dirty = true;
- return backup_new(hp, BBP_THREADMASK) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */
+ return backup_new(hp) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */
}
return hp->storage; /* 7=>5 */
}
@@ -2105,7 +2137,7 @@ HEAPcommitpersistence(Heap *hp, bool wri
if (existing) { /* existing, ie will become transient */
if (hp->storage == STORE_MMAP && hp->newstorage == STORE_PRIV
&& writable) { /* 6=>2 */
hp->dirty = true;
- return backup_new(hp, -1) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */
+ return backup_new(hp) != GDK_SUCCEED ? STORE_INVALID : STORE_MMAP; /* only called for existing bats */
}
return hp->newstorage; /* 4=>0,5=>1,7=>3,c=>a no change */
}
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -899,6 +899,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool
if (BATextend(b, grows) != GDK_SUCCEED)
return GDK_FAIL;
}
+ MT_rwlock_wrlock(&b->thashlock);
if (BATatoms[b->ttype].atomFix == NULL &&
b->ttype != TYPE_void &&
n->ttype != TYPE_void &&
@@ -908,7 +909,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool
Tloc(n, ci.seq - hseq),
cnt * Tsize(n));
for (BUN i = 0; b->thash && i < cnt; i++) {
- HASHappend(b, r, Tloc(b, r));
+ HASHappend_locked(b, r, Tloc(b, r));
r++;
}
BATsetcount(b, BATcount(b) + cnt);
@@ -919,13 +920,16 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool
cnt--;
BUN p = canditer_next(&ci) - hseq;
const void *t = BUNtail(ni, p);
- if (bunfastapp_nocheck(b, r, t, Tsize(b)) != GDK_SUCCEED)
+ if (bunfastapp_nocheck(b, r, t, Tsize(b)) != GDK_SUCCEED) {
+ MT_rwlock_wrunlock(&b->thashlock);
return GDK_FAIL;
+ }
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list