Changeset: f0b0c39ddf9d for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f0b0c39ddf9d
Modified Files:
gdk/gdk.h
gdk/gdk_bat.c
gdk/gdk_bbp.c
gdk/gdk_group.c
gdk/gdk_imprints.c
gdk/gdk_join.c
gdk/gdk_logger.c
gdk/gdk_private.h
gdk/gdk_search.c
gdk/gdk_select.c
gdk/gdk_setop.c
gdk/gdk_unique.c
monetdb5/modules/mal/cluster.c
sql/backends/monet5/sql.c
Branch: default
Log Message:
Implemented persistent hashes.
diffs (truncated from 693 to 300 lines):
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -2073,20 +2073,13 @@ gdk_export oid OIDnew(oid inc);
* BAThash (BAT *b, BUN masksize)
* @end multitable
*
- * The current BAT implementation supports one search accelerator:
- * hashing. The routine BAThash makes sure that a hash accelerator on
- * the head of the BAT exists. A zero is returned upon failure to
- * create the supportive structures.
- *
- * The hash data structures are currently maintained during update
- * operations.
+ * The current BAT implementation supports two search accelerators:
+ * hashing and imprints. The routine BAThash makes sure that a hash
+ * accelerator on the tail of the BAT exists. GDK_FAIL is returned
+ * upon failure to create the supportive structures.
*/
gdk_export gdk_return BAThash(BAT *b, BUN masksize);
-/* low level functions */
-
-#define BATprepareHash(X) (BAThash((X), 0) == GDK_FAIL)
-
/*
* @- Column Imprints Functions
*
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -1869,9 +1869,9 @@ BUNlocate(BAT *b, const void *x, const v
* BUNlocate). Other threads might then crash.
*/
if (dohash(v->H))
- (void) BATprepareHash(BATmirror(v));
+ (void) BAThash(BATmirror(v), 0);
if (dohash(v->T))
- (void) BATprepareHash(v);
+ (void) BAThash(v, 0);
if (v->H->hash && v->T->hash) { /* we can choose
between two hash tables */
BUN hcnt = 0, tcnt = 0;
BUN i;
@@ -2902,7 +2902,7 @@ BATassertHeadProps(BAT *b)
if ((hp->farmid = BBPselectfarm(TRANSIENT, b->htype,
hashheap)) < 0 ||
(hs = HASHnew(hp, b->htype, BUNlast(b),
- mask)) == NULL) {
+ mask, BUN_NONE)) == NULL) {
GDKfree(ext);
GDKfree(hp->filename);
GDKfree(hp);
@@ -3116,7 +3116,7 @@ BATderiveHeadProps(BAT *b, int expensive
snprintf(hp->filename, nmelen + 30,
"%s.hash" SZFMT, nme, MT_getpid()) < 0 ||
(ext = GDKstrdup(hp->filename + nmelen + 1)) == NULL ||
- (hs = HASHnew(hp, b->htype, BUNlast(b), mask)) == NULL) {
+ (hs = HASHnew(hp, b->htype, BUNlast(b), mask, BUN_NONE)) == NULL) {
if (hp) {
if (hp->filename)
GDKfree(hp->filename);
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -3840,10 +3840,10 @@ BBPdiskscan(const char *parent)
delete = (b == NULL || !b->T->vheap ||
b->batCopiedtodisk == 0);
} else if (strncmp(p + 1, "hhash", 5) == 0) {
BAT *b = getdesc(bid);
- delete = (b == NULL || !b->H->hash);
+ delete = b == NULL;
} else if (strncmp(p + 1, "thash", 5) == 0) {
BAT *b = getdesc(bid);
- delete = (b == NULL || !b->T->hash);
+ delete = b == NULL;
} else if (strncmp(p + 1, "himprints", 9) == 0) {
BAT *b = getdesc(bid);
delete = b == NULL;
diff --git a/gdk/gdk_group.c b/gdk/gdk_group.c
--- a/gdk/gdk_group.c
+++ b/gdk/gdk_group.c
@@ -734,11 +734,11 @@ BATgroup_internal(BAT **groups, BAT **ex
cnts[v]++;
}
GDKfree(sgrps);
- } else if (b->T->hash ||
+ } else if (BATcheckhash(b) ||
(b->batPersistence == PERSISTENT &&
- !BATprepareHash(b)) ||
+ BAThash(b, 0) == GDK_SUCCEED) ||
((parent = VIEWtparent(b)) != 0 &&
- BBPdescriptor(-parent)->T->hash)) {
+ BATcheckhash(BBPdescriptor(-parent)))) {
BUN lo, hi;
/* we already have a hash table on b, or b is
@@ -849,7 +849,7 @@ BATgroup_internal(BAT **groups, BAT **ex
"%s.hash" SZFMT, nme, MT_getpid()) < 0 ||
(ext = GDKstrdup(hp->filename + nmelen + 1)) == NULL ||
(hs = HASHnew(hp, b->ttype, BUNlast(b),
- MAX(HASHmask(b->batCount), 1 << 16))) == NULL) {
+ MAX(HASHmask(b->batCount), 1 << 16), BUN_NONE)) == NULL) {
if (hp) {
if (hp->filename)
GDKfree(hp->filename);
diff --git a/gdk/gdk_imprints.c b/gdk/gdk_imprints.c
--- a/gdk/gdk_imprints.c
+++ b/gdk/gdk_imprints.c
@@ -658,6 +658,7 @@ BATimprints(BAT *b)
imprints->dict = (void *) ((uintptr_t) ((char *) imprints->imps + pages * (imprints->bits / 8) + sizeof(uint64_t)) & ~(sizeof(uint64_t) - 1));
b->T->imprints = imprints;
close(fd);
+ ALGODEBUG fprintf(stderr, "#BATimprints: reusing persisted imprints\n");
goto do_return;
}
close(fd);
@@ -795,23 +796,26 @@ BATimprints(BAT *b)
((size_t *) imprints->imprints->base)[1] = (size_t)
imprints->impcnt;
((size_t *) imprints->imprints->base)[2] = (size_t)
imprints->dictcnt;
((size_t *) imprints->imprints->base)[3] = (size_t) BATcount(b);
- if (b->batRole == PERSISTENT &&
+ if ((BBP_status(b->batCacheid) & BBPEXISTING) &&
HEAPsave(imprints->imprints, nme, b->batCacheid > 0 ?
"timprints" : "himprints") == 0 &&
(fd = GDKfdlocate(imprints->imprints->farmid, nme, "rb+",
b->batCacheid > 0 ? "timprints" :
"himprints")) >= 0) {
+ ALGODEBUG fprintf(stderr, "#BATimprints: persisting imprints\n");
/* add version number */
((size_t *) imprints->imprints->base)[0] |= (size_t)
IMPRINTS_VERSION << 8;
/* sync-on-disk checked bit */
((size_t *) imprints->imprints->base)[0] |= (size_t) 1
<< 16;
if (write(fd, imprints->imprints->base, sizeof(size_t))
< 0)
perror("write imprints");
+ if (!(GDKdebug & FORCEMITOMASK)) {
#if defined(NATIVE_WIN32)
- _commit(fd);
+ _commit(fd);
#elif defined(HAVE_FDATASYNC)
- fdatasync(fd);
+ fdatasync(fd);
#elif defined(HAVE_FSYNC)
- fsync(fd);
+ fsync(fd);
#endif
+ }
close(fd);
}
b->T->imprints = imprints;
@@ -918,6 +922,8 @@ IMPSremove(BAT *b)
if ((imprints = b->T->imprints) != NULL) {
b->T->imprints = NULL;
+ if (* (size_t *) imprints->imprints->base & (1 << 16))
+ ALGODEBUG fprintf(stderr, "#IMPSremove: removing persisted imprints\n");
if (HEAPdelete(imprints->imprints, BBP_physical(b->batCacheid),
b->batCacheid > 0 ? "timprints" : "himprints"))
IODEBUG fprintf(stderr, "#IMPSremove(%s): imprints heap\n", BATgetId(b));
diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -1349,7 +1349,7 @@ hashjoin(BAT *r1, BAT *r2, BAT *l, BAT *
return GDK_SUCCEED;
}
- if (BATprepareHash(r))
+ if (BAThash(r, 0) == GDK_FAIL)
goto bailout;
ri = bat_iterator(r);
nrcand = (BUN) (rcandend - rcand);
diff --git a/gdk/gdk_logger.c b/gdk/gdk_logger.c
--- a/gdk/gdk_logger.c
+++ b/gdk/gdk_logger.c
@@ -101,6 +101,30 @@ typedef struct logformat_t {
static int bm_commit(logger *lg);
static int tr_grow(trans *tr);
+static BUN
+logbat_find_int(BAT *b, int val)
+{
+ BUN p, q;
+ int *t = (int *) Tloc(b, BUNfirst(b));
+
+ for (p = 0, q = BATcount(b); p < q; p++)
+ if (t[p] == val)
+ return p + BUNfirst(b);
+ return BUN_NONE;
+}
+
+static BUN
+logbat_find_bid(BAT *b, log_bid val)
+{
+ BUN p, q;
+ log_bid *t = (log_bid *) Tloc(b, BUNfirst(b));
+
+ for (p = 0, q = BATcount(b); p < q; p++)
+ if (t[p] == val)
+ return p + BUNfirst(b);
+ return BUN_NONE;
+}
+
static void
logbat_destroy(BAT *b)
{
@@ -210,7 +234,7 @@ la_bat_clear(logger *lg, logaction *la)
fprintf(stderr, "#la_bat_clear %s\n", la->name);
/* do we need to skip these old updates */
if (BATcount(lg->snapshots_bid)) {
- BUN p = BUNfnd(lg->snapshots_bid, &bid);
+ BUN p = logbat_find_bid(lg->snapshots_bid, bid);
if (p != BUN_NONE) {
int tid = *(int *) Tloc(lg->snapshots_tid, p);
@@ -243,7 +267,7 @@ log_read_seq(logger *lg, logformat *l)
return LOG_ERR;
}
- if ((p = BUNfnd(lg->seqs_id, &seq)) != BUN_NONE) {
+ if ((p = logbat_find_int(lg->seqs_id, seq)) != BUN_NONE) {
BUNdelete(lg->seqs_id, p, FALSE);
BUNdelete(lg->seqs_val, p, FALSE);
}
@@ -402,7 +426,7 @@ la_bat_updates(logger *lg, logaction *la
/* do we need to skip these old updates */
if (BATcount(lg->snapshots_bid)) {
- BUN p = BUNfnd(lg->snapshots_bid, &bid);
+ BUN p = logbat_find_bid(lg->snapshots_bid, bid);
if (p != BUN_NONE) {
int tid = *(int *) Tloc(lg->snapshots_tid, p);
@@ -479,7 +503,7 @@ la_bat_destroy(logger *lg, logaction *la
BUN p;
logger_del_bat(lg, bid);
- if ((p = BUNfnd(lg->snapshots_bid, &bid)) != BUN_NONE) {
+ if ((p = logbat_find_bid(lg->snapshots_bid, bid)) != BUN_NONE) {
#ifndef NDEBUG
assert(BBP_desc(bid)->S.role == PERSISTENT);
assert(0 <= BBP_desc(bid)->H.heap.farmid &&
BBP_desc(bid)->H.heap.farmid < MAXFARMS);
@@ -593,7 +617,7 @@ la_bat_use(logger *lg, logaction *la)
return;
}
logger_add_bat(lg, b, la->name);
- if ((p = BUNfnd(lg->snapshots_bid, &b->batCacheid)) != BUN_NONE) {
+ if ((p = logbat_find_bid(lg->snapshots_bid, b->batCacheid)) != BUN_NONE) {
BUNdelete(lg->snapshots_bid, p, FALSE);
BUNdelete(lg->snapshots_tid, p, FALSE);
}
@@ -946,7 +970,7 @@ logger_commit(logger *lg)
if (lg->debug & 1)
fprintf(stderr, "#logger_commit\n");
- p = BUNfnd(lg->seqs_id, &id);
+ p = logbat_find_int(lg->seqs_id, id);
BUNdelete(lg->seqs_id, p, FALSE);
BUNdelete(lg->seqs_val, p, FALSE);
BUNappend(lg->seqs_id, &id, FALSE);
@@ -1292,7 +1316,7 @@ logger_new(int debug, const char *fn, co
if (lg->seqs_val == 0)
logger_fatal("Logger_new: inconsistent database, seqs_val does not exist", 0, 0, 0);
if (BATcount(lg->seqs_id)) {
- BUN p = BUNfnd(lg->seqs_id, &id);
+ BUN p = logbat_find_int(lg->seqs_id, id);
lg->id = *(lng *) Tloc(lg->seqs_val, p);
} else {
if (BUNappend(lg->seqs_id, &id, FALSE) == GDK_FAIL ||
@@ -1611,7 +1635,7 @@ logger_changes(logger *lg)
int
logger_sequence(logger *lg, int seq, lng *id)
{
- BUN p = BUNfnd(lg->seqs_id, &seq);
+ BUN p = logbat_find_int(lg->seqs_id, seq);
if (p != BUN_NONE) {
*id = *(lng *) Tloc(lg->seqs_val, p);
@@ -1676,7 +1700,7 @@ log_bat_persists(logger *lg, BAT *b, con
assert(b->T->heap.farmid == 0);
assert(b->T->vheap == NULL ||
BBPfarms[b->T->vheap->farmid].roles & (1 << PERSISTENT));
- if ((p = BUNfnd(lg->snapshots_bid, &b->batCacheid)) != BUN_NONE){
+ if ((p = logbat_find_bid(lg->snapshots_bid, b->batCacheid)) != BUN_NONE){
BUNdelete(lg->snapshots_bid, p, FALSE);
BUNdelete(lg->snapshots_tid, p, FALSE);
}
@@ -1721,7 +1745,7 @@ log_bat_transient(logger *lg, const char
lg->changes++;
/* if this is a snapshot bat, we need to skip all changes */
- if ((p = BUNfnd(lg->snapshots_bid, &bid)) != BUN_NONE) {
+ if ((p = logbat_find_bid(lg->snapshots_bid, bid)) != BUN_NONE) {
#ifndef NDEBUG
assert(BBP_desc(bid)->S.role == PERSISTENT);
assert(0 <= BBP_desc(bid)->H.heap.farmid &&
BBP_desc(bid)->H.heap.farmid < MAXFARMS);
@@ -2035,7 +2059,7 @@ log_sequence(logger *lg, int seq, lng va
if (lg->debug & 1)
fprintf(stderr, "#log_sequence (%d," LLFMT ")\n", seq, val);
- if ((p = BUNfnd(lg->seqs_id, &seq)) != BUN_NONE) {
+ if ((p = logbat_find_int(lg->seqs_id, seq)) != BUN_NONE) {
BUNdelete(lg->seqs_id, p, FALSE);
BUNdelete(lg->seqs_val, p, FALSE);
}
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list