Changeset: d13cef9b7320 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/d13cef9b7320
Modified Files:
gdk/gdk_bat.c
gdk/gdk_batop.c
gdk/gdk_hash.c
gdk/gdk_private.h
gdk/gdk_storage.c
gdk/gdk_system.c
sql/storage/store.c
Branch: Dec2025
Log Message:
Fix some data races.
diffs (truncated from 310 to 300 lines):
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1009,6 +1009,7 @@ BUNappendmulti(BAT *b, const void *value
if (bi.maxpos != BUN_NONE)
maxvalp = BUNtvar(bi, bi.maxpos);
const void *vbase = b->tvheap->base;
+ Heap *oldheap = b->theap;
for (BUN i = 0; i < count; i++) {
t = ((void **) values)[i];
bool isnil = atomeq(t, atomnil);
@@ -1040,12 +1041,11 @@ BUNappendmulti(BAT *b, const void *value
VALclear(&maxprop);
return rc;
}
- if (vbase != b->tvheap->base) {
- /* tvheap changed location, so
+ if (vbase != b->tvheap->base ||
+ oldheap != b->theap) {
+ /* a heap changed location, so
* pointers may need to be
- * updated (not if they were
- * initialized from t below, but
- * we don't know) */
+ * updated */
BUN minpos = bi.minpos;
BUN maxpos = bi.maxpos;
MT_lock_set(&b->theaplock);
@@ -1054,6 +1054,7 @@ BUNappendmulti(BAT *b, const void *value
bi.minpos = minpos;
bi.maxpos = maxpos;
vbase = b->tvheap->base;
+ oldheap = b->theap;
if (bi.minpos != BUN_NONE)
minvalp = BUNtvar(bi,
bi.minpos);
if (bi.maxpos != BUN_NONE)
@@ -1085,10 +1086,11 @@ BUNappendmulti(BAT *b, const void *value
if (maxbound)
VALclear(&maxprop);
if (b->thash) {
+ bi.vh = b->tvheap;
p -= count;
for (BUN i = 0; i < count; i++) {
t = ((void **) values)[i];
- HASHappend_locked(b, p, t);
+ HASHappend_locked(&bi, p, t);
p++;
}
nunique = b->thash ? b->thash->nunique : 0;
@@ -1115,7 +1117,7 @@ BUNappendmulti(BAT *b, const void *value
return rc;
}
if (b->thash) {
- HASHappend_locked(b, p, t);
+ HASHappend_locked(&bi, p, t);
}
if (!atomeq(t, atomnil)) {
if (p == 0) {
@@ -1149,7 +1151,8 @@ BUNappendmulti(BAT *b, const void *value
return rc;
}
if (b->thash) {
- HASHappend_locked(b, p, t);
+ bi.vh = b->tvheap;
+ HASHappend_locked(&bi, p, t);
}
p++;
}
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -383,10 +383,11 @@ insert_string_bat(BAT *b, BATiter *ni, s
MT_lock_set(&b->theaplock);
BATsetcount(b, oldcnt + ci->ncand);
assert(b->batCapacity >= b->batCount);
+ BATiter bi = bat_iterator_nolock(b);
MT_lock_unset(&b->theaplock);
/* maintain hash */
for (r = oldcnt, cnt = BATcount(b); b->thash && r < cnt; r++) {
- HASHappend_locked(b, r, b->tvheap->base + VarHeapVal(Tloc(b,
0), r, b->twidth));
+ HASHappend_locked(&bi, r, b->tvheap->base + VarHeapVal(Tloc(b,
0), r, b->twidth));
}
BUN nunique = b->thash ? b->thash->nunique : 0;
MT_rwlock_wrunlock(&b->thashlock);
@@ -461,12 +462,13 @@ append_varsized_bat(BAT *b, BATiter *ni,
MT_rwlock_wrlock(&b->thashlock);
MT_lock_set(&b->theaplock);
BATsetcount(b, BATcount(b) + ci->ncand);
+ BATiter bi = bat_iterator_nolock(b);
MT_lock_unset(&b->theaplock);
/* maintain hash table */
for (BUN i = BATcount(b) - ci->ncand;
b->thash && i < BATcount(b);
i++) {
- HASHappend_locked(b, i, b->tvheap->base + *(var_t *)
Tloc(b, i));
+ HASHappend_locked(&bi, i, b->tvheap->base + *(var_t *)
Tloc(b, i));
}
BUN nunique = b->thash ? b->thash->nunique : 0;
MT_rwlock_wrunlock(&b->thashlock);
@@ -550,7 +552,7 @@ append_varsized_bat(BAT *b, BATiter *ni,
BATiter bi = bat_iterator_nolock(b);
for (BUN i = 0; i < cnt; i++) {
const void *t = BUNtvar(bi, r);
- HASHappend_locked(b, r, t);
+ HASHappend_locked(&bi, r, t);
r++;
}
}
@@ -1018,6 +1020,9 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool
goto bailout;
}
}
+ MT_lock_set(&b->theaplock);
+ BATiter bi = bat_iterator_nolock(b);
+ MT_lock_unset(&b->theaplock);
MT_rwlock_wrlock(&b->thashlock);
hlocked = true;
if (b->ttype != TYPE_void &&
@@ -1028,7 +1033,7 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool
(const char *) ni.base + ((ci.seq - hseq) <<
ni.shift),
ci.ncand << ni.shift);
for (BUN i = 0; b->thash && i < ci.ncand; i++) {
- HASHappend_locked(b, r, Tloc(b, r));
+ HASHappend_locked(&bi, r, Tloc(b, r));
r++;
}
} else {
@@ -1056,8 +1061,10 @@ BATappend2(BAT *b, BAT *n, BAT *s, bool
} else if (tfastins_nocheck(b, r, t) !=
GDK_SUCCEED) {
goto bailout;
}
- if (b->thash)
- HASHappend_locked(b, r, t);
+ if (b->thash) {
+ bi.vh = b->tvheap;
+ HASHappend_locked(&bi, r, t);
+ }
r++;
}
TIMEOUT_CHECK(qry_ctx,
GOTO_LABEL_TIMEOUT_HANDLER(bailout, qry_ctx));
diff --git a/gdk/gdk_hash.c b/gdk/gdk_hash.c
--- a/gdk/gdk_hash.c
+++ b/gdk/gdk_hash.c
@@ -1100,8 +1100,9 @@ HASHprobe(const Hash *h, const void *v)
}
void
-HASHappend_locked(BAT *b, BUN i, const void *v)
+HASHappend_locked(BATiter *bi, BUN i, const void *v)
{
+ BAT *b = bi->b;
Hash *h = b->thash;
if (h == NULL) {
return;
@@ -1148,12 +1149,11 @@ HASHappend_locked(BAT *b, BUN i, const v
h->heaplink.free += h->width;
BUN hb = HASHget(h, c);
BUN hb2;
- BATiter bi = bat_iterator_nolock(b);
bool (*atomeq)(const void *, const void *) = ATOMequal(h->type);
for (hb2 = hb;
hb2 != BUN_NONE;
hb2 = HASHgetlink(h, hb2)) {
- if (atomeq(v, BUNtail(bi, hb2)))
+ if (atomeq(v, BUNtail(*bi, hb2)))
break;
}
h->nheads += hb == BUN_NONE;
@@ -1168,8 +1168,11 @@ BUN
HASHappend(BAT *b, BUN i, const void *v)
{
BUN nunique;
+ MT_lock_set(&b->theaplock);
+ BATiter bi = bat_iterator_nolock(b);
+ MT_lock_unset(&b->theaplock);
MT_rwlock_wrlock(&b->thashlock);
- HASHappend_locked(b, i, v);
+ HASHappend_locked(&bi, i, v);
nunique = b->thash ? b->thash->nunique : 0;
MT_rwlock_wrunlock(&b->thashlock);
return nunique;
diff --git a/gdk/gdk_private.h b/gdk/gdk_private.h
--- a/gdk/gdk_private.h
+++ b/gdk/gdk_private.h
@@ -170,7 +170,7 @@ lng getBBPlogno(void)
__attribute__((__visibility__("hidden")));
BUN HASHappend(BAT *b, BUN i, const void *v)
__attribute__((__visibility__("hidden")));
-void HASHappend_locked(BAT *b, BUN i, const void *v)
+void HASHappend_locked(BATiter *bi, BUN i, const void *v)
__attribute__((__visibility__("hidden")));
void HASHfree(BAT *b)
__attribute__((__visibility__("hidden")));
diff --git a/gdk/gdk_storage.c b/gdk/gdk_storage.c
--- a/gdk/gdk_storage.c
+++ b/gdk/gdk_storage.c
@@ -674,7 +674,8 @@ BATsave_iter(BAT *b, BATiter *bi, BUN si
assert(!GDKinmemory(bi->h->farmid));
/* views cannot be saved, but make an exception for
* force-remapped views */
- if (isVIEW(b)) {
+ if ((bi->h != NULL && bi->h->parentid != b->batCacheid) ||
+ (bi->vh != NULL && bi->vh->parentid != b->batCacheid)) {
if (locked)
MT_rwlock_rdunlock(&b->thashlock);
GDKerror("%s is a view on %s; cannot be saved\n", BATgetId(b),
BBP_logical(VIEWtparent(b)));
@@ -687,6 +688,8 @@ BATsave_iter(BAT *b, BATiter *bi, BUN si
}
/* start saving data */
+ if (bi->type == TYPE_msk)
+ MT_lock_set(&b->theaplock);
if (bi->type != TYPE_void && bi->base == NULL) {
assert(BBP_status(b->batCacheid) & BBPSWAPPED);
if (dosync && !(ATOMIC_GET(&GDKdebug) & NOSYNCMASK)) {
@@ -735,17 +738,18 @@ BATsave_iter(BAT *b, BATiter *bi, BUN si
if ((!bi->copiedtodisk || bi->hdirty)
&& (err == GDK_SUCCEED && bi->type)) {
const char *tail = strchr(bi->h->filename, '.') + 1;
- err = HEAPsave(bi->h, nme, tail, dosync, bi->hfree,
&b->theaplock);
+ err = HEAPsave(bi->h, nme, tail, dosync, bi->hfree,
bi->type == TYPE_msk ? NULL : &b->theaplock);
}
if (bi->vh
&& (!bi->copiedtodisk || bi->vhdirty)
&& ATOMvarsized(bi->type)
&& err == GDK_SUCCEED)
- err = HEAPsave(bi->vh, nme, "theap", dosync,
bi->vhfree, &b->theaplock);
+ err = HEAPsave(bi->vh, nme, "theap", dosync,
bi->vhfree, bi->type == TYPE_msk ? NULL : &b->theaplock);
}
if (err == GDK_SUCCEED) {
- MT_lock_set(&b->theaplock);
+ if (bi->type != TYPE_msk)
+ MT_lock_set(&b->theaplock);
if (b->theap != bi->h) {
assert(b->theap->dirty);
b->theap->wasempty = bi->h->wasempty;
@@ -767,7 +771,8 @@ BATsave_iter(BAT *b, BATiter *bi, BUN si
MT_lock_unset(&b->theaplock);
if (locked && b->thash && b->thash != (Hash *) 1)
BAThashsave(b, dosync);
- }
+ } else if (bi->type == TYPE_msk)
+ MT_lock_unset(&b->theaplock);
if (locked)
MT_rwlock_rdunlock(&b->thashlock);
return err;
diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c
--- a/gdk/gdk_system.c
+++ b/gdk/gdk_system.c
@@ -199,9 +199,9 @@ struct mtthread {
void *data; /* and its data */
struct thread_funcs *thread_funcs; /* callback funcs */
int nthread_funcs;
- MT_Lock *lockwait; /* lock we're waiting for */
- ATOMIC_PTR_TYPE semawait;/* semaphore we're waiting for */
- MT_Cond *condwait; /* condition variable we're waiting for */
+ ATOMIC_PTR_TYPE lockwait; /* lock we're waiting for */
+ ATOMIC_PTR_TYPE semawait; /* semaphore we're waiting for */
+ ATOMIC_PTR_TYPE condwait; /* condition variable we're waiting for */
#ifdef LOCK_OWNER
MT_Lock *mylocks; /* locks we're holding */
#endif
@@ -316,9 +316,9 @@ dump_threads(void)
if (!GDK_TRACER_TEST(M_DEBUG, THRD))
printf("Threads:\n");
for (struct mtthread *t = mtthreads; t; t = t->next) {
- MT_Lock *lk = t->lockwait;
+ MT_Lock *lk = ATOMIC_PTR_GET(&t->lockwait);
MT_Sema *sm = ATOMIC_PTR_GET(&t->semawait);
- MT_Cond *cn = t->condwait;
+ MT_Cond *cn = ATOMIC_PTR_GET(&t->condwait);
struct mtthread *jn = t->joinwait;
const char *working = ATOMIC_PTR_GET(&t->working);
char mabuf[300];
@@ -643,7 +643,7 @@ MT_thread_setlockwait(MT_Lock *lock)
struct mtthread *self = thread_self();
if (self)
- self->lockwait = lock;
+ ATOMIC_PTR_SET(&self->lockwait, lock);
}
void
@@ -665,7 +665,7 @@ MT_thread_setcondwait(MT_Cond *cond)
struct mtthread *self = thread_self();
if (self)
- self->condwait = cond;
+ ATOMIC_PTR_SET(&self->condwait, cond);
}
#ifdef LOCK_OWNER
diff --git a/sql/storage/store.c b/sql/storage/store.c
--- a/sql/storage/store.c
+++ b/sql/storage/store.c
@@ -7868,8 +7868,11 @@ store_printinfo(sqlstore *store)
printf("WAL is currently locked, so no WAL information\n");
return;
}
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]