Changeset: 7ae70245e678 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/7ae70245e678
Modified Files:
gdk/gdk_bat.c
gdk/gdk_batop.c
gdk/gdk_bbp.c
gdk/gdk_heap.c
gdk/gdk_join.c
gdk/gdk_logger.c
gdk/gdk_sample.c
gdk/gdk_system.c
sql/backends/monet5/sql_scenario.c
sql/storage/bat/bat_storage.c
sql/storage/sql_storage.h
sql/storage/store.c
Branch: Aug2024
Log Message:
Fix some data races.
diffs (truncated from 564 to 300 lines):
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1122,6 +1122,7 @@ BUNappendmulti(BAT *b, const void *value
VALcopy(&maxprop, prop) != NULL)
maxbound = VALptr(&maxprop);
const bool notnull = BATgetprop_nolock(b, GDK_NOT_NULL) != NULL;
+ bool setnil = false;
MT_lock_unset(&b->theaplock);
MT_rwlock_wrlock(&b->thashlock);
if (values && b->ttype) {
@@ -1201,8 +1202,7 @@ BUNappendmulti(BAT *b, const void *value
}
}
} else {
- b->tnil = true;
- b->tnonil = false;
+ setnil = true;
}
p++;
}
@@ -1222,8 +1222,7 @@ BUNappendmulti(BAT *b, const void *value
} else if (ATOMstorage(b->ttype) == TYPE_msk) {
bi.minpos = bi.maxpos = BUN_NONE;
minvalp = maxvalp = NULL;
- b->tnil = false;
- b->tnonil = true;
+ assert(!b->tnil);
for (BUN i = 0; i < count; i++) {
t = (void *) ((char *) values + (i <<
b->tshift));
mskSetVal(b, p, *(msk *) t);
@@ -1261,8 +1260,7 @@ BUNappendmulti(BAT *b, const void *value
}
}
} else {
- b->tnil = true;
- b->tnonil = false;
+ setnil = true;
}
p++;
}
@@ -1282,10 +1280,13 @@ BUNappendmulti(BAT *b, const void *value
p++;
}
nunique = b->thash ? b->thash->nunique : 0;
- b->tnil = b->ttype != TYPE_msk;
+ setnil |= b->ttype != TYPE_msk;
+ }
+ MT_lock_set(&b->theaplock);
+ if (setnil) {
+ b->tnil = true;
b->tnonil = false;
}
- MT_lock_set(&b->theaplock);
b->tminpos = bi.minpos;
b->tmaxpos = bi.maxpos;
if (count > BATcount(b) / gdk_unique_estimate_keep_fraction)
diff --git a/gdk/gdk_batop.c b/gdk/gdk_batop.c
--- a/gdk/gdk_batop.c
+++ b/gdk/gdk_batop.c
@@ -2004,13 +2004,16 @@ BATordered(BAT *b)
lng t0 = GDKusec();
bool sorted;
+ MT_rwlock_rdlock(&b->thashlock);
MT_lock_set(&b->theaplock);
if (b->ttype == TYPE_void || b->tsorted || BATcount(b) == 0) {
MT_lock_unset(&b->theaplock);
+ MT_rwlock_rdunlock(&b->thashlock);
return true;
}
if (b->tnosorted > 0 || !ATOMlinear(b->ttype)) {
MT_lock_unset(&b->theaplock);
+ MT_rwlock_rdunlock(&b->thashlock);
return false;
}
@@ -2121,6 +2124,7 @@ BATordered(BAT *b)
}
}
doreturn:
+ MT_rwlock_rdunlock(&b->thashlock);
sorted = b->tsorted;
bat pbid = VIEWtparent(b);
MT_lock_unset(&b->theaplock);
@@ -2185,17 +2189,21 @@ BATordered_rev(BAT *b)
if (b == NULL || !ATOMlinear(b->ttype))
return false;
+ MT_rwlock_rdlock(&b->thashlock);
MT_lock_set(&b->theaplock);
if (BATcount(b) <= 1 || b->trevsorted) {
MT_lock_unset(&b->theaplock);
+ MT_rwlock_rdunlock(&b->thashlock);
return true;
}
if (b->ttype == TYPE_void) {
MT_lock_unset(&b->theaplock);
+ MT_rwlock_rdunlock(&b->thashlock);
return is_oid_nil(b->tseqbase);
}
if (BATtdense(b) || b->tnorevsorted > 0) {
MT_lock_unset(&b->theaplock);
+ MT_rwlock_rdunlock(&b->thashlock);
return false;
}
BATiter bi = bat_iterator_nolock(b);
@@ -2240,6 +2248,7 @@ BATordered_rev(BAT *b)
TRC_DEBUG(ALGO, "Fixed revsorted for " ALGOBATFMT " (" LLFMT "
usec)\n", ALGOBATPAR(b), GDKusec() - t0);
}
doreturn:
+ MT_rwlock_rdunlock(&b->thashlock);
revsorted = b->trevsorted;
bat pbid = VIEWtparent(b);
MT_lock_unset(&b->theaplock);
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -4795,7 +4795,11 @@ BBPprintinfo(void)
}
MT_lock_unset(&GDKswapLock(i));
}
- uint32_t nfree = BBP_nfree;
+ uint32_t nfree = 0;
+ if (MT_lock_trytime(&GDKcacheLock, 1000)) {
+ nfree = BBP_nfree;
+ MT_lock_unset(&GDKcacheLock);
+ }
BBPtmunlock();
printf("BATs:\n");
if (bats[1][1][1][1][1].nr > 0)
diff --git a/gdk/gdk_heap.c b/gdk/gdk_heap.c
--- a/gdk/gdk_heap.c
+++ b/gdk/gdk_heap.c
@@ -895,12 +895,17 @@ HEAPsave(Heap *h, const char *nme, const
"(%s.%s,storage=%d,free=%zu,size=%zu,dosync=%s)\n",
nme?nme:"", ext, (int) h->newstorage, free, h->size,
dosync?"true":"false");
+ if (lock)
+ MT_lock_set(lock);
+ if (free == h->free)
+ h->dirty = false;
+ if (lock)
+ MT_lock_unset(lock);
rc = GDKsave(h->farmid, nme, ext, h->base, free, store, dosync);
if (lock)
MT_lock_set(lock);
if (rc == GDK_SUCCEED) {
h->hasfile = true;
- h->dirty = free != h->free;
h->wasempty = false;
} else {
h->dirty = true;
diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -3385,6 +3385,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1,
*cnt1 = *cnt2 = 0;
+ BAT *pb = BATdescriptor(bi.h->parentid);
+ MT_rwlock_rdlock(&pb->thashlock);
if (bi.sorted || bi.revsorted) {
const void *prev = NULL;
algomsg = "sorted";
@@ -3431,6 +3433,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1,
assert(bvars == NULL);
seen = GDKzalloc((65536 / 32) * sizeof(seen[0]));
if (seen == NULL) {
+ MT_rwlock_rdunlock(&pb->thashlock);
+ BBPreclaim(pb);
bat_iterator_end(&bi);
return GDK_FAIL;
}
@@ -3472,6 +3476,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1,
snprintf(hs.heaplink.filename,
sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs.heaplink.filename) ||
snprintf(hs.heapbckt.filename,
sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs.heapbckt.filename) ||
HASHnew(&hs, bi.type, ci.ncand, mask, BUN_NONE, false) !=
GDK_SUCCEED) {
+ MT_rwlock_rdunlock(&pb->thashlock);
+ BBPreclaim(pb);
GDKerror("cannot allocate hash table\n");
HEAPfree(&hs.heaplink, true);
HEAPfree(&hs.heapbckt, true);
@@ -3502,6 +3508,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1,
HEAPfree(&hs.heaplink, true);
HEAPfree(&hs.heapbckt, true);
}
+ MT_rwlock_rdunlock(&pb->thashlock);
+ BBPreclaim(pb);
bat_iterator_end(&bi);
TRC_DEBUG(ALGO, "b=" ALGOBATFMT ",s=" ALGOOPTBATFMT
@@ -3573,7 +3581,8 @@ BATguess_uniques(BAT *b, struct canditer
canditer_init(&lci, b, NULL);
ci = &lci;
}
- return (BUN) guess_uniques(b, ci);
+ double uniques = guess_uniques(b, ci);
+ return uniques < 0 ? 0 : (BUN) uniques;
}
/* estimate the cost of doing a hashjoin with a hash on r; return value
@@ -3633,7 +3642,7 @@ joincost(BAT *r, BUN lcount, struct cand
MT_lock_unset(&r->theaplock);
if (unique_est == 0) {
unique_est = guess_uniques(r, &(struct
canditer){.tpe=cand_dense, .ncand=BATcount(r)});
- if (unique_est < 0)
+ if (unique_est <= 0)
return -1;
}
/* we have an estimate of the number of unique
@@ -3668,7 +3677,7 @@ joincost(BAT *r, BUN lcount, struct cand
MT_lock_unset(&r->theaplock);
if (unique_est == 0) {
unique_est = guess_uniques(r, rci);
- if (unique_est < 0)
+ if (unique_est <= 0)
return -1;
}
/* we have an estimate of the number of unique
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -2555,13 +2555,13 @@ log_next_logfile(logger *lg, ulng ts)
if (ATOMIC_GET(&lg->pending->refcount) == 0 && lg->pending !=
lg->current && lg->pending != lg->flush_ranges &&
(ulng) ATOMIC_GET(&lg->pending->last_ts) == (ulng)
ATOMIC_GET(&lg->pending->flushed_ts) &&
(ulng) ATOMIC_GET(&lg->pending->flushed_ts) <= ts) {
- rotation_unlock(lg);
logged_range *p = lg->pending;
for (int i = 1;
i < m && ATOMIC_GET(&p->refcount) == 0 && p->next &&
p->next != lg->current &&
p->next != lg->flush_ranges && (ulng)
ATOMIC_GET(&p->last_ts) == (ulng) ATOMIC_GET(&p->flushed_ts)
&& (ulng) ATOMIC_GET(&p->flushed_ts) <= ts; i++)
p = p->next;
+ rotation_unlock(lg);
return p;
}
rotation_unlock(lg);
@@ -2928,8 +2928,8 @@ internal_log_bat(logger *lg, BAT *b, log
/* if offset is just for the log, but BAT is already sliced, reset
offset */
if (sliced)
offset = 0;
+ BATiter bi = bat_iterator(b);
if (b->ttype == TYPE_msk) {
- BATiter bi = bat_iterator(b);
if (offset % 32 == 0) {
if (!mnstr_writeIntArray(lg->current->output_log, (int
*) ((char *) bi.base + offset / 32),
(size_t) ((nr + 31) / 32)))
@@ -2945,26 +2945,22 @@ internal_log_bat(logger *lg, BAT *b, log
}
}
}
- bat_iterator_end(&bi);
- } else if (b->ttype < TYPE_str && !isVIEW(b)) {
- BATiter bi = bat_iterator(b);
+ } else if (b->ttype < TYPE_str && bi.h->parentid == b->batCacheid) {
const void *t = BUNtail(bi, (BUN) offset);
ok = wt(t, lg->current->output_log, (size_t) nr);
- bat_iterator_end(&bi);
} else if (b->ttype == TYPE_str) {
/* efficient string writes */
ok = string_writer(lg, b, offset, nr);
} else {
- BATiter bi = bat_iterator(b);
BUN end = (BUN) (offset + nr);
for (p = (BUN) offset; p < end && ok == GDK_SUCCEED; p++) {
const void *t = BUNtail(bi, p);
ok = wt(t, lg->current->output_log, 1);
}
- bat_iterator_end(&bi);
}
+ bat_iterator_end(&bi);
TRC_DEBUG(WAL, "Logged %d " LLFMT " inserts\n", id, nr);
diff --git a/gdk/gdk_sample.c b/gdk/gdk_sample.c
--- a/gdk/gdk_sample.c
+++ b/gdk/gdk_sample.c
@@ -200,11 +200,14 @@ BATsample(BAT *b, BUN n)
{
static random_state_engine rse;
+ MT_lock_set(&b->theaplock);
+ BUN batcount = BATcount(b);
+ MT_lock_unset(&b->theaplock);
MT_lock_set(&rse_lock);
if (rse[0] == 0 && rse[1] == 0 && rse[2] == 0 && rse[3] == 0)
init_random_state_engine(rse, (uint64_t) GDKusec());
MT_lock_unset(&rse_lock);
- BAT *bn = do_batsample(b->hseqbase, BATcount(b), n, rse, &rse_lock);
+ BAT *bn = do_batsample(b->hseqbase, batcount, n, rse, &rse_lock);
TRC_DEBUG(ALGO, ALGOBATFMT "," BUNFMT " -> " ALGOOPTBATFMT "\n",
ALGOBATPAR(b), n, ALGOOPTBATPAR(bn));
return bn;
diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c
--- a/gdk/gdk_system.c
+++ b/gdk/gdk_system.c
@@ -200,13 +200,13 @@ struct mtthread {
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]