Changeset: 1ec3c9e06dcf for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=1ec3c9e06dcf
Modified Files:
gdk/gdk.h
gdk/gdk_mosaic.c
gdk/gdk_private.h
gdk/gdk_search.c
monetdb5/modules/mal/mosaic.c
monetdb5/modules/mal/mosaic.h
sql/backends/monet5/sql.c
Branch: mosaic
Log Message:
Bring mosaic in line with orderidx
The existence of the mosaic is detected as an auxiliary heap
diffs (truncated from 402 to 300 lines):
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -2015,8 +2015,10 @@ gdk_export oid OIDnew(oid inc);
gdk_export gdk_return BAThash(BAT *b, BUN masksize);
/* support routines for the mosaic approach */
-gdk_export gdk_return MOSalloc(BAT *b);
+#define MOSAIC_VERSION 20140808
+gdk_export gdk_return MOSalloc(BAT *b, BUN cap);
gdk_export void MOSdestroy(BAT *b);
+gdk_export int BATcheckmosaic(BAT *b);
/*
* @- Column Imprints Functions
diff --git a/gdk/gdk_mosaic.c b/gdk/gdk_mosaic.c
--- a/gdk/gdk_mosaic.c
+++ b/gdk/gdk_mosaic.c
@@ -15,18 +15,18 @@
#include "gdk_private.h"
gdk_return
-MOSalloc(BAT *bn)
+MOSalloc(BAT *bn, BUN cap)
{
const char *nme = BBP_physical(bn->batCacheid);
- if ( (bn->T->mosaic = (Heap*)GDKzalloc(sizeof(Heap))) == NULL ||
- (bn->T->mosaic->filename = GDKfilepath(NOFARM, NULL, nme, "mosaic"))
== NULL)
+ if ( (bn->tmosaic = (Heap*)GDKzalloc(sizeof(Heap))) == NULL ||
+ (bn->tmosaic->filename = GDKfilepath(NOFARM, NULL, nme, "mosaic")) ==
NULL)
return GDK_FAIL;
- if( HEAPalloc(bn->T->mosaic, BATcapacity(bn) + 25*1024 , Tsize(bn)) !=
GDK_SUCCEED)
+ if( HEAPalloc(bn->tmosaic, cap, Tsize(bn)) != GDK_SUCCEED)
return GDK_FAIL;
- bn->T->mosaic->parentid = bn->batCacheid;
- bn->T->mosaic->farmid = BBPselectfarm(bn->batRole, bn->ttype, varheap);
+ bn->tmosaic->parentid = bn->batCacheid;
+ bn->tmosaic->farmid = BBPselectfarm(bn->batRole, bn->ttype, varheap);
return GDK_SUCCEED;
}
@@ -34,12 +34,65 @@ void
MOSdestroy(BAT *bn)
{ Heap *h;
- if( bn && bn->T->mosaic && !VIEWtparent(bn)){
- h= bn->T->mosaic;
- bn->T->mosaic = NULL;
+ if( bn && bn->tmosaic && !VIEWtparent(bn)){
+ h= bn->tmosaic;
+ bn->tmosaic = NULL;
if( HEAPdelete(h, BBP_physical(bn->batCacheid), "mosaic"))
IODEBUG fprintf(stderr,"#MOSdestroy (%s) failed",
BATgetId(bn));
- bn->T->mosaic = NULL;
+ bn->tmosaic = NULL;
GDKfree(h);
}
}
+/* return TRUE if we have a mosaic on the tail, even if we need to read
+ * one from disk */
+int
+BATcheckmosaic(BAT *b)
+{
+ int ret;
+ lng t;
+
+ assert(b->batCacheid > 0);
+ t = GDKusec();
+ MT_lock_set(&GDKhashLock(abs(b->batCacheid)));
+ t = GDKusec() - t;
+ if (b->tmosaic == NULL) {
+ Heap *hp;
+ const char *nme = BBP_physical(b->batCacheid);
+ const char *ext = "mosaic";
+ int fd;
+
+ if ((hp = GDKzalloc(sizeof(*hp))) != NULL &&
+ (hp->farmid = BBPselectfarm(b->batRole, b->ttype,
mosaicheap)) >= 0 &&
+ (hp->filename = GDKmalloc(strlen(nme) + 10)) != NULL) {
+ sprintf(hp->filename, "%s.%s", nme, ext);
+
+ /* check whether a persisted mosaic can be found */
+ if ((fd = GDKfdlocate(hp->farmid, nme, "rb+", ext)) >=
0) {
+ struct stat st;
+ int hdata;
+
+ if (BATcount(b) > 0 && read(fd, &hdata,
sizeof(hdata)) == sizeof(hdata) &&
+ hdata == MOSAIC_VERSION &&
+ fstat(fd, &st) == 0 &&
+ st.st_size >= (off_t) (hp->size = hp->free
= (oid) BATcount(b) * SIZEOF_OID) &&
+ HEAPload(hp, nme, ext, 0) == GDK_SUCCEED) {
+ close(fd);
+ b->tmosaic = hp;
+ ALGODEBUG fprintf(stderr,
"#BATcheckmosaic: reusing persisted mosaic %d\n", b->batCacheid);
+
MT_lock_unset(&GDKhashLock(abs(b->batCacheid)));
+ return 1;
+ }
+ close(fd);
+ /* unlink unusable file */
+ GDKunlink(hp->farmid, BATDIR, nme, ext);
+ }
+ GDKfree(hp->filename);
+ }
+ GDKfree(hp);
+ GDKclrerr(); /* we're not currently interested in errors */
+ }
+ ret = b->tmosaic != NULL;
+ MT_lock_unset(&GDKhashLock(abs(b->batCacheid)));
+ ALGODEBUG if (ret) fprintf(stderr, "#BATcheckmosaic: already has mosaic
%d, waited " LLFMT " usec\n", b->batCacheid, t);
+ return ret;
+}
diff --git a/gdk/gdk_private.h b/gdk/gdk_private.h
--- a/gdk/gdk_private.h
+++ b/gdk/gdk_private.h
@@ -58,8 +58,6 @@ struct BATstore {
__attribute__((__visibility__("hidden")));
__hidden gdk_return BATcheckmodes(BAT *b, int persistent)
__attribute__((__visibility__("hidden")));
-__hidden int BATcheckmosaic(BAT *b)
- __attribute__((__visibility__("hidden")));
__hidden BATstore *BATcreatedesc(int ht, int tt, int heapnames, int role)
__attribute__((__visibility__("hidden")));
__hidden void BATdelete(BAT *b)
diff --git a/gdk/gdk_search.c b/gdk/gdk_search.c
--- a/gdk/gdk_search.c
+++ b/gdk/gdk_search.c
@@ -848,6 +848,7 @@ SORTfndwhich(BAT *b, const void *v, enum
}
break;
}
+
return cur;
}
diff --git a/monetdb5/modules/mal/mosaic.c b/monetdb5/modules/mal/mosaic.c
--- a/monetdb5/modules/mal/mosaic.c
+++ b/monetdb5/modules/mal/mosaic.c
@@ -45,7 +45,7 @@ MOSinit(MOStask task, BAT *b){
if( isVIEW(b))
b= BATdescriptor(VIEWtparent(b));
assert(b);
- base = b->T->mosaic->base;
+ base = b->tmosaic->base;
assert(base);
task->type = b->ttype;
task->bsrc = b;
@@ -67,7 +67,7 @@ MOSdumpTask(Client cntxt,MOStask task)
mnstr_printf(cntxt->fdout,"# ");
mnstr_printf(cntxt->fdout,"clk "LLFMT"\tsizes "SZFMT"\t%3.0fx\t",
- task->timer, task->bsrc->T->mosaic->free, (flt)
task->bsrc->T->heap.free/task->bsrc->T->mosaic->free);
+ task->timer, task->bsrc->tmosaic->free, (flt)
task->bsrc->T->heap.free/task->bsrc->T->mosaic->free);
for ( i=0; i < MOSAIC_METHODS -1; i++)
if( task->filter[i])
mnstr_printf(cntxt->fdout, "%s["LLFMT ","LLFMT "]\t" ,
MOSfiltername[i], task->hdr->blks[i], task->hdr->elms[i]);
@@ -222,7 +222,7 @@ MOSdump(Client cntxt, MalBlkPtr mb, MalS
if ((b = BATdescriptor(bid)) == NULL)
throw(MAL,"mosaic.dump",INTERNAL_BAT_ACCESS);
- if ( !b->T->mosaic){
+ if ( !b->tmosaic){
BBPunfix(bid);
return MAL_SUCCEED;
}
@@ -383,7 +383,7 @@ MOScompressInternal(Client cntxt, bat *r
return msg;
}
- if ( isVIEWCOMBINE(bsrc) || BATcount(bsrc) < MIN_INPUT_COUNT ){
+ if ( isVIEWCOMBINE(bsrc) || BATcount(bsrc) < MIN_INPUT_COUNT ||
BATcheckmosaic(bsrc)){
/* no need to compress */
BBPkeepref(*ret = bsrc->batCacheid);
return msg;
@@ -393,18 +393,13 @@ MOScompressInternal(Client cntxt, bat *r
mnstr_printf(cntxt->fdout,"#compress bat %d \n",*bid);
#endif
- if( bsrc->T->mosaic == NULL){
+ if( bsrc->tmosaic == NULL && MOSalloc(bsrc, BATcapacity(bsrc) +
(MosaicHdrSize + MosaicBlkSize)/Tsize(bsrc)+ BATTINY) == GDK_FAIL){
// create the mosaic heap if not available.
- bsrc->T->mosaic = (Heap *) GDKzalloc(sizeof(Heap));
- if( bsrc->T->mosaic == NULL)
- throw(MAL,"mosaic.compress", MAL_MALLOC_FAIL);
- // create the heap for the compressed data
// The final size should be smaller then the original
// It may, however, be the case that we mix a lot of LITERAL
and, say, DELTA small blocks
// Then we total size may go beyond the original size and we
should terminate the process.
// This should be detected before we compress a block, in the
estimate functions
// or when we extend the non-compressed collector block
- if( MOSalloc(bsrc) == GDK_FAIL)
throw(MAL,"mosaic.compress", "heap construction
failes");
}
@@ -434,13 +429,13 @@ MOScompressInternal(Client cntxt, bat *r
while(task->start < task->stop ){
// default is to extend the non-compressed block with a single
element
cand = MOSoptimizer(cntxt, task, typewidth);
- if( task->dst >= bsrc->T->mosaic->base + bsrc->T->mosaic->size
- 16 ){
+ if( task->dst >= bsrc->tmosaic->base + bsrc->T->mosaic->size -
16 ){
MOSdestroy(bsrc);
msg= createException(MAL,"mosaic","abort compression
due to size");
task->hdr = 0;
goto finalize;
}
- assert (task->dst < bsrc->T->mosaic->base +
bsrc->T->mosaic->size );
+ assert (task->dst < bsrc->tmosaic->base + bsrc->T->mosaic->size
);
// wrapup previous block
switch(cand){
@@ -523,7 +518,7 @@ MOScompressInternal(Client cntxt, bat *r
MOSsetTag(task->blk,MOSAIC_EOL);
} else
task->dst = ((char*) task->blk)+ MosaicBlkSize;
- task->bsrc->T->mosaic->free = (task->dst - (char*)task->hdr);
+ task->bsrc->tmosaic->free = (task->dst - (char*)task->hdr);
task->timer = GDKusec() - task->timer;
if(debug)
MOSdumpTask(cntxt,task);
@@ -531,7 +526,7 @@ MOScompressInternal(Client cntxt, bat *r
// TODO
bsrc->batDirty = 1;
- task->ratio = task->hdr->ratio = (flt)task->bsrc->T->heap.free/
task->bsrc->T->mosaic->free;
+ task->ratio = task->hdr->ratio = (flt)task->bsrc->T->heap.free/
task->bsrc->tmosaic->free;
finalize:
MCexitMaintenance(cntxt);
*ret= bsrc->batCacheid;
@@ -590,7 +585,11 @@ MOSdecompressInternal(Client cntxt, bat
if ((bsrc = BATdescriptor(*bid)) == NULL)
throw(MAL, "mosaic.decompress", INTERNAL_BAT_ACCESS);
- if (!bsrc->T->mosaic) {
+ if (BATcheckmosaic(bsrc) == 0 ){
+ BBPunfix(bsrc->batCacheid);
+ throw(MAL, "mosaic.decompress", "mosaic file not available");
+ }
+ if (!bsrc->tmosaic) {
BBPkeepref(*ret = bsrc->batCacheid);
return MAL_SUCCEED;
}
@@ -599,7 +598,7 @@ MOSdecompressInternal(Client cntxt, bat
throw(MAL, "mosaic.decompress", "cannot decompress
VIEWCOMBINE");
}
- if (bsrc->T->mosaic && VIEWtparent(bsrc)) {
+ if (bsrc->tmosaic && VIEWtparent(bsrc)) {
BBPunfix(bsrc->batCacheid);
throw(MAL, "mosaic.decompress", "cannot decompress tail-VIEW");
}
@@ -720,7 +719,7 @@ isCompressed(bat bid)
if( bid == 0)
return 0;
b = BATdescriptor(bid);
- r = b->T->mosaic != NULL;
+ r = BATcheckmosaic(b);
BBPunfix(bid);
return r;
}
@@ -1196,7 +1195,7 @@ MOSjoin(Client cntxt, MalBlkPtr mb, MalS
}
// we assume one compressed argument
- if (bl->T->mosaic && br->T->mosaic){
+ if (bl->tmosaic && br->T->mosaic){
BBPunfix(bl->batCacheid);
BBPunfix(br->batCacheid);
throw(MAL,"mosaic.join","Join over generator pairs not
supported");
@@ -1214,7 +1213,7 @@ MOSjoin(Client cntxt, MalBlkPtr mb, MalS
throw(MAL,"mosaic.join",MAL_MALLOC_FAIL);
}
- if ( bl->T->mosaic){
+ if ( bl->tmosaic){
MOSinit(task,bl);
//task->elm = BATcount(br);
//task->src= Tloc(br,BUNfirst(br));
@@ -1241,7 +1240,7 @@ MOSjoin(Client cntxt, MalBlkPtr mb, MalS
// position the scan on the first mosaic block to consider
MOSinitializeScan(cntxt,task,startblk,stopblk);
- if ( bl->T->mosaic){
+ if ( bl->tmosaic){
task->elm = BATcount(br);
task->src= Tloc(br,BUNfirst(br));
} else {
@@ -1373,7 +1372,12 @@ MOSanalyseInternal(Client cntxt, int thr
BBPunfix(bid);
return 0;
}
- if ( BATcount(b) < MIN_INPUT_COUNT ){
+ if ( BATcheckmosaic(b)){
+ mnstr_printf(cntxt->fdout,"#already compressed %d %s\n",bid,
BBP_logical(bid));
+ BBPunfix(bid);
+ return 0;
+ }
+ if ( BATcount(b) < MIN_INPUT_COUNT ){
mnstr_printf(cntxt->fdout,"#ignore small %d %s\n",bid,
BBP_logical(bid));
BBPunfix(bid);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list