Changeset: 78d7e86a78ae for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/78d7e86a78ae
Modified Files:
clients/Tests/exports.stable.out
sql/backends/monet5/vaults/odbc/odbc_loader.c
Branch: nested
Log Message:
merged from default
diffs (truncated from 929 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -365,7 +365,7 @@ gdk_return MT_alloc_tls(MT_TLS_t *newkey
int MT_check_nr_cores(void);
void MT_cond_broadcast(MT_Cond *cond);
void MT_cond_destroy(MT_Cond *cond);
-void MT_cond_init(MT_Cond *cond);
+void MT_cond_init(MT_Cond *cond, const char *name);
void MT_cond_signal(MT_Cond *cond);
void MT_cond_wait(MT_Cond *cond, MT_Lock *lock);
int MT_create_thread(MT_Id *t, void (*function)(void *), void *arg, enum
MT_thr_detach d, const char *threadname);
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -2221,16 +2221,18 @@ BBPdir_first(bool subcommit, lng logno,
static bat
BBPdir_step(bat bid, BUN size, int n, char *buf, size_t bufsize,
- FILE **obbpfp, FILE *nbbpf, BATiter *bi)
+ FILE **obbpfp, FILE *nbbpf, BATiter *bi, int *nbatp)
{
if (n < -1) /* safety catch */
return n;
+ int nbat = 0;
while (n >= 0 && n < bid) {
if (n > 0) {
if (fputs(buf, nbbpf) == EOF) {
GDKerror("Writing BBP.dir file failed.\n");
goto bailout;
}
+ nbat++;
}
if (fgets(buf, (int) bufsize, *obbpfp) == NULL) {
if (ferror(*obbpfp)) {
@@ -2254,7 +2256,9 @@ BBPdir_step(bat bid, BUN size, int n, ch
assert(BBP_status(bid) & BBPPERSISTENT);
if (new_bbpentry(nbbpf, bid, size, bi) != GDK_SUCCEED)
goto bailout;
+ nbat++;
}
+ *nbatp += nbat;
return n == -1 ? -1 : n == bid ? 0 : n;
bailout:
@@ -2885,13 +2889,10 @@ incref(bat i, bool logical, bool lock)
return 0;
if (lock) {
- for (;;) {
- MT_lock_set(&GDKswapLock(i));
- if (!(BBP_status(i) & (BBPUNSTABLE|BBPLOADING)))
- break;
+ MT_lock_set(&GDKswapLock(i));
+ while (BBP_status(i) & (BBPUNSTABLE|BBPLOADING)) {
/* the BATs is "unstable", try again */
- MT_lock_unset(&GDKswapLock(i));
- BBPspin(i, __func__, BBPUNSTABLE|BBPLOADING);
+ MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
}
}
/* we have the lock */
@@ -2954,15 +2955,12 @@ decref(bat i, bool logical, bool lock, c
if (BBPcheck(i) == 0)
return -1;
- if (lock)
+ if (lock) {
MT_lock_set(&GDKswapLock(i));
-
- while (BBP_status(i) & BBPUNLOADING) {
- if (lock)
- MT_lock_unset(&GDKswapLock(i));
+ while (BBP_status(i) & BBPUNLOADING)
+ MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
+ } else {
BBPspin(i, func, BBPUNLOADING);
- if (lock)
- MT_lock_set(&GDKswapLock(i));
}
b = (BBP_status(i) & BBPLOADED) ? BBP_desc(i) : NULL;
@@ -3080,6 +3078,7 @@ decref(bat i, bool logical, bool lock, c
BBPclear(i);
} else {
BBP_status_off(i, BBPUNLOADING);
+ MT_cond_broadcast(&GDKswapCond(i));
}
}
return refs;
@@ -3127,13 +3126,10 @@ BATdescriptor(bat i)
if (BBPcheck(i)) {
bool lock = locked_by == 0 || locked_by != MT_getpid();
if (lock) {
- for (;;) {
- MT_lock_set(&GDKswapLock(i));
- if (!(BBP_status(i) & (BBPUNSTABLE|BBPLOADING)))
- break;
+ MT_lock_set(&GDKswapLock(i));
+ while (BBP_status(i) & (BBPUNSTABLE|BBPLOADING)) {
/* the BATs is "unstable", try again */
- MT_lock_unset(&GDKswapLock(i));
- BBPspin(i, __func__, BBPUNSTABLE|BBPLOADING);
+ MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
}
}
if (incref(i, false, false) > 0) {
@@ -3175,9 +3171,7 @@ getBBPdescriptor(bat i)
b = BBP_desc(i);
if ((status & BBPLOADED) == 0 || status & BBPWAITING) {
while (BBP_status(i) & BBPWAITING) { /* wait for bat to be
loaded by other thread */
- MT_lock_unset(&GDKswapLock(i));
- BBPspin(i, __func__, BBPWAITING);
- MT_lock_set(&GDKswapLock(i));
+ MT_cond_wait(&GDKswapCond(i), &GDKswapLock(i));
}
if (BBPvalid(i)) {
if ((BBP_status(i) & BBPLOADED) == 0) {
@@ -3195,6 +3189,7 @@ getBBPdescriptor(bat i)
BBP_status_off(i, BBPLOADING);
CHECKDEBUG if (b != NULL)
BATassertProps(b);
+ MT_cond_broadcast(&GDKswapCond(i));
}
return b;
}
@@ -3228,9 +3223,13 @@ BBPsave(BAT *b)
if (BBP_status(bid) & BBPSAVING) {
/* wait until save in other thread completes */
- if (lock)
+ if (lock) {
+ while (BBP_status(bid) & BBPSAVING)
+ MT_cond_wait(&GDKswapCond(bid),
&GDKswapLock(bid));
MT_lock_unset(&GDKswapLock(bid));
- BBPspin(bid, __func__, BBPSAVING);
+ } else {
+ BBPspin(bid, __func__, BBPSAVING);
+ }
} else {
/* save it */
unsigned flags = BBPSAVING;
@@ -3257,6 +3256,7 @@ BBPsave(BAT *b)
}
/* clearing bits can be done without the lock */
BBP_status_off(bid, BBPSAVING);
+ MT_cond_broadcast(&GDKswapCond(bid));
}
return ret;
}
@@ -3314,6 +3314,7 @@ BBPfree(BAT *b)
}
TRC_DEBUG(BAT_, "turn off unloading %d\n", bid);
BBP_status_off(bid, BBPUNLOADING);
+ MT_cond_broadcast(&GDKswapCond(bid));
BBP_unload_dec();
return ret;
}
@@ -3340,8 +3341,9 @@ BBPquickdesc(bat bid)
}
return NULL;
}
- BBPspin(bid, __func__, BBPWAITING);
+// BBPspin(bid, __func__, BBPWAITING);
b = BBP_desc(bid);
+ MT_lock_set(&b->theaplock);
if (b->ttype < 0) {
const char *aname = ATOMunknown_name(b->ttype);
int tt = ATOMindex(aname);
@@ -3352,6 +3354,7 @@ BBPquickdesc(bat bid)
b->ttype = tt;
}
}
+ MT_lock_unset(&b->theaplock);
return b;
}
@@ -3359,25 +3362,31 @@ BBPquickdesc(bat bid)
* @+ Global Commit
*/
static BAT *
-dirty_bat(bat *i, bool subcommit)
+dirty_bat(bat *i, bool subcommit, bool lock)
{
- if (BBPvalid(*i)) {
+ const bat bid = *i;
+ if (BBPvalid(bid)) {
BAT *b;
- BBPspin(*i, __func__, BBPSAVING);
- if (BBP_status(*i) & BBPLOADED) {
- b = BBP_desc(*i);
+ if (lock) {
+ while (BBP_status(bid) & BBPSAVING)
+ MT_cond_wait(&GDKswapCond(bid),
&GDKswapLock(bid));
+ } else {
+ BBPspin(bid, __func__, BBPSAVING);
+ }
+ if (BBP_status(bid) & BBPLOADED) {
+ b = BBP_desc(bid);
MT_lock_set(&b->theaplock);
- if ((BBP_status(*i) & BBPNEW) &&
+ if ((BBP_status(bid) & BBPNEW) &&
BATcheckmodes(b, false) != GDK_SUCCEED) /* check
mmap modes */
- *i = -*i; /* error */
- else if ((BBP_status(*i) & BBPPERSISTENT) &&
+ *i = -bid; /* error */
+ else if ((BBP_status(bid) & BBPPERSISTENT) &&
(subcommit || BATdirty(b))) {
MT_lock_unset(&b->theaplock);
return b; /* the bat is loaded,
persistent and dirty */
}
MT_lock_unset(&b->theaplock);
} else if (subcommit)
- return BBP_desc(*i);
+ return BBP_desc(bid);
}
return NULL;
}
@@ -3790,6 +3799,7 @@ BBPsync(int cnt, bat *restrict subcommit
char buf[3000];
int n = subcommit ? 0 : -1;
FILE *obbpf, *nbbpf;
+ int nbats = 0;
TRC_INFO(TM, "Committing %d bats\n", cnt - 1);
@@ -3824,12 +3834,11 @@ BBPsync(int cnt, bat *restrict subcommit
BBP_status_on(bid, BBPSYNCING);
/* wait until unloading is finished before
* attempting to make a backup */
- while (BBP_status(bid) & BBPUNLOADING) {
- if (lock)
- MT_lock_unset(&GDKswapLock(bid));
+ if (lock) {
+ while (BBP_status(bid) & BBPUNLOADING)
+ MT_cond_wait(&GDKswapCond(bid),
&GDKswapLock(bid));
+ } else {
BBPspin(bid, __func__, BBPUNLOADING);
- if (lock)
- MT_lock_set(&GDKswapLock(bid));
}
BAT *b = BBP_desc(bid);
if (subcommit && b->ttype != TYPE_void) {
@@ -3866,7 +3875,7 @@ BBPsync(int cnt, bat *restrict subcommit
fname, BAKDIR, SUBDIR);
}
}
- b = dirty_bat(&i, subcommit != NULL);
+ b = dirty_bat(&i, subcommit != NULL, lock);
if (i <= 0)
ret = GDK_FAIL;
else if (BBP_status(bid) & BBPEXISTING &&
@@ -3907,13 +3916,11 @@ BBPsync(int cnt, bat *restrict subcommit
* can set it, wait for
* BBPUNLOADING before
* attempting to save */
- for (;;) {
- if (lock)
- MT_lock_set(&GDKswapLock(i));
- if (!(BBP_status(i) &
(BBPSAVING|BBPUNLOADING)))
- break;
- if (lock)
- MT_lock_unset(&GDKswapLock(i));
+ if (lock) {
+ MT_lock_set(&GDKswapLock(i));
+ while (BBP_status(i) &
(BBPSAVING|BBPUNLOADING))
+ MT_cond_wait(&GDKswapCond(i),
&GDKswapLock(i));
+ } else {
BBPspin(i, __func__,
BBPSAVING|BBPUNLOADING);
}
BBP_status_on(i, BBPSAVING);
@@ -3921,13 +3928,14 @@ BBPsync(int cnt, bat *restrict subcommit
MT_lock_unset(&GDKswapLock(i));
ret = BATsave_iter(b, &bi, size);
BBP_status_off(i, BBPSAVING);
+ MT_cond_broadcast(&GDKswapCond(i));
}
bip = &bi;
} else {
bip = NULL;
}
if (ret == GDK_SUCCEED) {
- n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf,
nbbpf, bip);
+ n = BBPdir_step(i, size, n, buf, sizeof(buf), &obbpf,
nbbpf, bip, &nbats);
if (n < -1)
ret = GDK_FAIL;
}
@@ -3959,6 +3967,7 @@ BBPsync(int cnt, bat *restrict subcommit
if (ret != GDK_SUCCEED)
GDKsyserror("rename(%s,%s) failed\n", bakdir, deldir);
TRC_DEBUG(IO_, "rename %s %s = %d\n", bakdir, deldir, (int)
ret);
+ TRC_INFO(TM, "%d bats written to BBP.dir\n", nbats);
}
/* AFTERMATH */
@@ -4001,6 +4010,7 @@ BBPsync(int cnt, bat *restrict subcommit
for (int idx = 1; idx < cnt; idx++) {
bat i = subcommit ? subcommit[idx] : idx;
BBP_status_off(i, BBPSYNCING);
+ MT_cond_broadcast(&GDKswapCond(i));
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]