Changeset: 860a4dec7dde for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=860a4dec7dde
Modified Files:
gdk/gdk_hash.c
gdk/gdk_orderidx.c
Branch: Jul2017
Log Message:
We need to use locks when persisting indexes in the background.
diffs (253 lines):
diff --git a/gdk/gdk_hash.c b/gdk/gdk_hash.c
--- a/gdk/gdk_hash.c
+++ b/gdk/gdk_hash.c
@@ -273,44 +273,42 @@ BATcheckhash(BAT *b)
}
#ifdef PERSISTENTHASH
-struct hashsync {
- Heap *hp;
- bat id;
-};
-
static void
BAThashsync(void *arg)
{
- struct hashsync *hs = arg;
- Heap *hp = hs->hp;
+ BAT *b = arg;
+ Heap *hp;
int fd;
lng t0 = 0;
const char *failed = " failed";
ALGODEBUG t0 = GDKusec();
- if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED &&
- (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
- ((size_t *) hp->base)[0] |= 1 << 24;
- if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) {
- failed = ""; /* not failed */
- if (!(GDKdebug & FORCEMITOMASK)) {
+ MT_lock_set(&GDKhashLock(b->batCacheid));
+ if (b->thash != NULL && (hp = b->thash->heap) != NULL) {
+ if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED &&
+ (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
+ ((size_t *) hp->base)[0] |= 1 << 24;
+ if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) {
+ failed = ""; /* not failed */
+ if (!(GDKdebug & FORCEMITOMASK)) {
#if defined(NATIVE_WIN32)
- _commit(fd);
+ _commit(fd);
#elif defined(HAVE_FDATASYNC)
- fdatasync(fd);
+ fdatasync(fd);
#elif defined(HAVE_FSYNC)
- fsync(fd);
+ fsync(fd);
#endif
+ }
+ } else {
+ perror("write hash");
}
- } else {
- perror("write hash");
+ close(fd);
}
- close(fd);
+ ALGODEBUG fprintf(stderr, "#BAThash: persisting hash %s (" LLFMT " usec)%s\n", hp->filename, GDKusec() - t0, failed);
}
- BBPunfix(hs->id);
- GDKfree(arg);
- ALGODEBUG fprintf(stderr, "#BAThash: persisting hash %s (" LLFMT " usec)%s\n", hp->filename, GDKusec() - t0, failed);
+ MT_lock_unset(&GDKhashLock(b->batCacheid));
+ BBPunfix(b->batCacheid);
}
#endif
@@ -500,25 +498,19 @@ BAThash(BAT *b, BUN masksize)
memset((char *) h->Link + q * h->width, 0, (h->lim - q) * h->width);
#endif
hp->parentid = b->batCacheid;
+ b->thash = h;
#ifdef PERSISTENTHASH
if (BBP_status(b->batCacheid) & BBPEXISTING) {
MT_Id tid;
- struct hashsync *hs = GDKmalloc(sizeof(*hs));
- if (hs != NULL) {
- BBPfix(b->batCacheid);
- hs->id = b->batCacheid;
- hs->hp = hp;
- if (MT_create_thread(&tid, BAThashsync, hs,
- MT_THR_DETACHED) < 0) {
- /* couldn't start thread: clean up */
- BBPunfix(b->batCacheid);
- GDKfree(hs);
- }
+ BBPfix(b->batCacheid);
+ if (MT_create_thread(&tid, BAThashsync, b,
+ MT_THR_DETACHED) < 0) {
+ /* couldn't start thread: clean up */
+ BBPunfix(b->batCacheid);
}
} else
ALGODEBUG fprintf(stderr, "#BAThash: NOT persisting hash %d\n", b->batCacheid);
#endif
- b->thash = h;
ALGODEBUG {
t1 = GDKusec();
fprintf(stderr, "#BAThash: hash construction " LLFMT " usec\n", t1 - t0);
@@ -575,27 +567,28 @@ void
HASHdestroy(BAT *b)
{
if (b) {
- if (b->thash == (Hash *) 1) {
+ Hash *hs = b->thash;
+ b->thash = NULL;
+ if (hs == (Hash *) 1) {
GDKunlink(BBPselectfarm(b->batRole, b->ttype, hashheap),
BATDIR,
BBP_physical(b->batCacheid),
"thash");
- } else if (b->thash) {
+ } else if (hs) {
bat p = VIEWtparent(b);
BAT *hp = NULL;
if (p)
hp = BBP_cache(p);
- if ((!hp || b->thash != hp->thash) && b->thash != (Hash *) -1) {
- ALGODEBUG if (*(size_t *) b->thash->heap->base & (1 << 24))
+ if ((!hp || hs != hp->thash) && hs != (Hash *) -1) {
+ ALGODEBUG if (*(size_t *) hs->heap->base & (1 << 24))
fprintf(stderr, "#HASHdestroy: removing persisted hash %d\n", b->batCacheid);
- HEAPfree(b->thash->heap, 1);
- GDKfree(b->thash->heap);
- GDKfree(b->thash);
+ HEAPfree(hs->heap, 1);
+ GDKfree(hs->heap);
+ GDKfree(hs);
}
}
- b->thash = NULL;
}
}
diff --git a/gdk/gdk_orderidx.c b/gdk/gdk_orderidx.c
--- a/gdk/gdk_orderidx.c
+++ b/gdk/gdk_orderidx.c
@@ -13,44 +13,40 @@
#define ORDERIDX_VERSION ((oid) 3)
#ifdef PERSISTENTIDX
-struct idxsync {
- Heap *hp;
- bat id;
- const char *func;
-};
-
static void
BATidxsync(void *arg)
{
- struct idxsync *hs = arg;
- Heap *hp = hs->hp;
+ BAT *b = arg;
+ Heap *hp;
int fd;
lng t0 = 0;
ALGODEBUG t0 = GDKusec();
- if (HEAPsave(hp, hp->filename, NULL) != GDK_SUCCEED ||
- (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) < 0) {
- BBPunfix(hs->id);
- GDKfree(arg);
- return;
- }
- ((oid *) hp->base)[0] |= (oid) 1 << 24;
- if (write(fd, hp->base, SIZEOF_SIZE_T) < 0)
- perror("write orderidx");
- if (!(GDKdebug & FORCEMITOMASK)) {
+ MT_lock_set(&GDKhashLock(b->batCacheid));
+ if ((hp = b->torderidx) != NULL) {
+ if (HEAPsave(hp, hp->filename, NULL) == GDK_SUCCEED &&
+ (fd = GDKfdlocate(hp->farmid, hp->filename, "rb+", NULL)) >= 0) {
+ ((oid *) hp->base)[0] |= (oid) 1 << 24;
+ if (write(fd, hp->base, SIZEOF_SIZE_T) >= 0) {
+ if (!(GDKdebug & FORCEMITOMASK)) {
#if defined(NATIVE_WIN32)
- _commit(fd);
+ _commit(fd);
#elif defined(HAVE_FDATASYNC)
- fdatasync(fd);
+ fdatasync(fd);
#elif defined(HAVE_FSYNC)
- fsync(fd);
+ fsync(fd);
#endif
+ }
+ } else {
+ perror("write orderidx");
+ }
+ close(fd);
+ }
+ ALGODEBUG fprintf(stderr, "#BATidxsync: persisting orderidx %s (" LLFMT " usec)\n", hp->filename, GDKusec() - t0);
}
- close(fd);
- BBPunfix(hs->id);
- ALGODEBUG fprintf(stderr, "#%s: persisting orderidx %s (" LLFMT " usec)\n", hs->func, hp->filename, GDKusec() - t0);
- GDKfree(arg);
+ MT_lock_unset(&GDKhashLock(b->batCacheid));
+ BBPunfix(b->batCacheid);
}
#endif
@@ -153,14 +149,9 @@ persistOIDX(BAT *b)
if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
b->batInserted == b->batCount) {
MT_Id tid;
- struct idxsync *hs = GDKmalloc(sizeof(*hs));
- if (hs != NULL) {
- BBPfix(b->batCacheid);
- hs->id = b->batCacheid;
- hs->hp = b->torderidx;
- hs->func = "BATorderidx";
- MT_create_thread(&tid, BATidxsync, hs, MT_THR_DETACHED);
- }
+ BBPfix(b->batCacheid);
+ if (MT_create_thread(&tid, BATidxsync, b, MT_THR_DETACHED) < 0)
+ BBPunfix(b->batCacheid);
} else
ALGODEBUG fprintf(stderr, "#BATorderidx: NOT persisting index %d\n", b->batCacheid);
#else
@@ -479,24 +470,19 @@ GDKmergeidx(BAT *b, BAT**a, int n_ar)
GDKfree(q);
}
+ b->torderidx = m;
#ifdef PERSISTENTIDX
if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
b->batInserted == b->batCount) {
MT_Id tid;
- struct idxsync *hs = GDKmalloc(sizeof(*hs));
- if (hs != NULL) {
- BBPfix(b->batCacheid);
- hs->id = b->batCacheid;
- hs->hp = m;
- hs->func = "GDKmergeidx";
- MT_create_thread(&tid, BATidxsync, hs, MT_THR_DETACHED);
- }
+ BBPfix(b->batCacheid);
+ if (MT_create_thread(&tid, BATidxsync, b, MT_THR_DETACHED) < 0)
+ BBPunfix(b->batCacheid);
} else
ALGODEBUG fprintf(stderr, "#GDKmergeidx: NOT persisting index %d\n", b->batCacheid);
#endif
b->batDirtydesc = TRUE;
- b->torderidx = m;
MT_lock_unset(&GDKhashLock(b->batCacheid));
return GDK_SUCCEED;
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list