Changeset: 8526e6a85f76 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8526e6a85f76
Modified Files:
monetdb5/extras/rdf/rdf.mx
monetdb5/extras/rdf/rdf_shredder.mx
monetdb5/modules/kernel/algebra.mx
monetdb5/modules/mal/tokenizer.c
monetdb5/modules/mal/tokenizer.h
monetdb5/modules/mal/tokenizer.mal
sql/backends/monet5/sql.mx
Branch: default
Log Message:
Update the code so that the RDF module can load large RDF datasets (tested with
RDF-H data, an RDF version of TPC-H, SF10) and works with the MAL plans provided
by HSP as well as the optimized MAL plans for sorted RDF-H data.
- Add rdf2str() (address TKNZRrdf2str()) to convert an id into a string using the
dictionary; if id >= RDF_MIN_LITERAL, the id is looked up in a literal map BAT
instead. (For now, this function is only used in the RDF module.)
- Fix a bug in the SQLrdfShred() function (in sql/backends/monet5/sql.mx) that
occurred while loading RDF data
- Add leftfetchjoin_sorted() (address ALGleftfetchjoin_sorted), which is similar
to algebra_leftfetchjoin but marks the resulting tail column as sorted without
verifying it. It also requires a left input with a sorted tail. (Use it only if
you are sure the result really is sorted.)
diffs (truncated from 641 to 300 lines):
diff --git a/monetdb5/extras/rdf/rdf.mx b/monetdb5/extras/rdf/rdf.mx
--- a/monetdb5/extras/rdf/rdf.mx
+++ b/monetdb5/extras/rdf/rdf.mx
@@ -25,10 +25,9 @@ All Rights Reserved.
@mal
module rdf;
-command shred(location:str, graphname:str, schema:str):bat[:void, :bat]
-address RDFParser
-comment "look behind you!"
-
+command leftfetchjoin_sorted ( left:bat[:any_1,:oid], right:bat[:oid,:any_3] )
:bat[:any_1,:any_3]
+address ALGleftfetchjoin_sorted
+comment "like algebra_leftfetchjoin(), but asserts that the resulting tail
column is sorted -- ONLY USE IF YOU ARE SURE OF THIS!!! also requires sorted
tail left input"
@h
#ifndef _RDF_H_
#define _RDF_H_
@@ -47,7 +46,7 @@ comment "look behind you!"
#define _RDF_DEBUG
rdf_export str
-RDFParser(int *retval, str *location, str *graphname, str *schemam);
+RDFParser(BAT **graph, str *location, str *graphname, str *schemam);
#define TRIPLE_STORE 1
#define MLA_STORE 2
diff --git a/monetdb5/extras/rdf/rdf_shredder.mx
b/monetdb5/extras/rdf/rdf_shredder.mx
--- a/monetdb5/extras/rdf/rdf_shredder.mx
+++ b/monetdb5/extras/rdf/rdf_shredder.mx
@@ -90,7 +90,7 @@ typedef struct parserData {
int line; /* locator for errors */
int column; /* locator for errors */
/**GRAPH DATA */
- BAT *graph[N_GRAPH_BAT]; /* BATs for the result
+ BAT **graph; /* BATs for the result
shredded RDF graph */
} parserData;
@@ -144,7 +144,7 @@ at a time. Function rdf_parser_triple_ha
@= rdf_BUNappend_unq_1
bun = BUNfnd(BATmirror(@1),(ptr)@2);
if (bun == BUN_NONE) {
- if (BATcount(@1) > 4 * @1->T->hash->mask) {
+ if (@1->T->hash && BATcount(@1) > 4 * @1->T->hash->mask) {
HASHdestroy(@1);
BAThash(BATmirror(@1), 2*BATcount(@1));
}
@@ -258,7 +258,7 @@ create_BAT(int ht, int tt, int size)
}
static parserData*
-parserData_create (str location)
+parserData_create (str location, BAT** graph)
{
int i;
@@ -271,8 +271,9 @@ parserData_create (str location)
pdata->error = 0;
pdata->warning = 0;
pdata->location = location;
+ pdata->graph = graph;
- for (i = 0; i < N_GRAPH_BAT; i++) {
+ for (i = 0; i <= N_GRAPH_BAT; i++) {
pdata->graph[i] = NULL;
}
@@ -300,7 +301,7 @@ parserData_create (str location)
return NULL;
}
/* MAP_LEX must have the key property */
- BATseqbase(pdata->graph[MAP_LEX], 1 << 30);
+ BATseqbase(pdata->graph[MAP_LEX], RDF_MIN_LITERAL);
pdata->graph[MAP_LEX]->tkey = BOUND2BTRUE;
pdata->graph[MAP_LEX]->T->nokey[0] = 0;
pdata->graph[MAP_LEX]->T->nokey[1] = 0;
@@ -380,19 +381,19 @@ post_processing (parserData *pdata)
/* order MAP_LEX */
BATorder(BATmirror(graph[MAP_LEX]));
- map_oid = BATmark(graph[MAP_LEX], 1<<30); /* BATmark will create a
copy */
+ map_oid = BATmark(graph[MAP_LEX], RDF_MIN_LITERAL); /* BATmark will
create a copy */
BATorder(map_oid);
BATsetaccess(map_oid, BAT_READ); /* force BAtmark not to copy
bat */
- map_oid = BATmirror(BATmark(BATmirror(map_oid), 1<<30));
+ map_oid = BATmirror(BATmark(BATmirror(map_oid), RDF_MIN_LITERAL));
BATsetaccess(graph[MAP_LEX], BAT_READ); /* force BATmark not to copy
bat */
- graph[MAP_LEX] = BATmirror(BATmark(BATmirror(graph[MAP_LEX]), 1<<30));
+ graph[MAP_LEX] = BATmirror(BATmark(BATmirror(graph[MAP_LEX]),
RDF_MIN_LITERAL));
/* convert old oids of O_sort to new ones */
bi = bat_iterator(graph[O_sort]);
mi = bat_iterator(map_oid);
BATloop(graph[O_sort], p, d) {
bt = (oid *) BUNtloc(bi, p);
- if (*bt >= (1 << 30)) {
+ if (*bt >= (RDF_MIN_LITERAL)) {
BUNfndVOID(r, mi, bt);
void_inplace(graph[O_sort], p, BUNtloc(mi, r), 1);
}
@@ -491,9 +492,11 @@ if (pdata != NULL) {
@
@c
+#define RDF_CHUNK_SIZE 100*1024*1024
+
/* Main RDF parser function that drives raptor */
str
-RDFParser (int *retval, str *location, str *graphname, str *schema)
+RDFParser (BAT **graph, str *location, str *graphname, str *schema)
{
raptor_parser *rparser;
parserData *pdata;
@@ -501,8 +504,6 @@ RDFParser (int *retval, str *location, s
bit isURI;
str ret;
int iret;
- BAT **graph;
- BAT *retbat;
(void) graphname;
/* init tokenizer */
@@ -516,7 +517,7 @@ RDFParser (int *retval, str *location, s
#endif
/* Init pdata */
- pdata = parserData_create(*location);
+ pdata = parserData_create(*location,graph);
if (pdata == NULL) {
#ifdef _TKNZR_H
TKNZRclose(&iret);
@@ -552,16 +553,38 @@ RDFParser (int *retval, str *location, s
uri = raptor_new_uri((unsigned char *) pdata->location);
iret = raptor_parse_uri(rparser, uri, NULL);
} else {
- uri = raptor_new_uri(
-
raptor_uri_filename_to_uri_string(pdata->location));
- iret = raptor_parse_file(rparser, uri, NULL);
+
+ /* Too slow loading --> use old code
+ FILE *fp = fopen(pdata->location, "r");
+ char *buf = (char*) GDKmalloc(RDF_CHUNK_SIZE);
+ if (buf == NULL) {
+ throw(RDF, "rdf.rdfShred",
+ "could not allocate a %dMB file buffer\n",
(int) (RDF_CHUNK_SIZE>>20));
+ }
+ uri =
raptor_new_uri(raptor_uri_filename_to_uri_string(pdata->location));
+ iret = raptor_start_parse(rparser, uri);
+ while(fp && iret == 0) {
+ ssize_t len = (ssize_t) fread(buf, 1, RDF_CHUNK_SIZE,
fp);
+ iret = raptor_parse_chunk(rparser, (const unsigned
char*) buf, (size_t) len, len < RDF_CHUNK_SIZE);
+ }
+ fclose(fp);
+
+ */
+
+ /* does/may? not work on large files -- therefore the abpove
chunked read
+ iret = raptor_parse_file_stream(rparser, fp,
pdata->location, uri);
+ */
+
+ /* Old code */
+ uri = raptor_new_uri(
+
raptor_uri_filename_to_uri_string(pdata->location));
+ iret = raptor_parse_file(rparser, uri, NULL);
}
@:clean_raptor@
#ifdef _TKNZR_H
TKNZRclose(&iret);
#endif
- graph = pdata->graph;
assert (pdata->tcount == BATcount(graph[S_sort]) &&
pdata->tcount == BATcount(graph[P_sort]) &&
pdata->tcount == BATcount(graph[O_sort]));
@@ -590,22 +613,6 @@ RDFParser (int *retval, str *location, s
@:clean@
throw(RDF, "rdf.rdfShred", "could not post-proccess data");
}
-
- /* prepare return bat of bats */
- retbat = BATnew(TYPE_void, TYPE_bat, N_GRAPH_BAT);
- if (retbat == NULL) {
- @:clean@
- throw(RDF, "rdf.rdfShred",
- "could not allocate enough memory for return
bat");
- }
- BATseqbase(retbat, 0);
- for (iret = 0; iret < N_GRAPH_BAT; iret++) {
- retbat = BUNappend(retbat, &graph[iret]->batCacheid, TRUE);
- BBPunfix(graph[iret]->batCacheid);
- }
-
GDKfree(pdata);
- BBPkeepref(*retval = retbat->batCacheid);
-
return MAL_SUCCEED;
}
diff --git a/monetdb5/modules/kernel/algebra.mx
b/monetdb5/modules/kernel/algebra.mx
--- a/monetdb5/modules/kernel/algebra.mx
+++ b/monetdb5/modules/kernel/algebra.mx
@@ -576,6 +576,10 @@ command leftfetchjoin ( left:bat[:any_1,
address ALGleftfetchjoin
comment "Hook directly into the left fetch join implementation.";
+command leftfetchjoin_sorted ( left:bat[:any_1,:oid], right:bat[:oid,:any_3] )
:bat[:any_1,:any_3]
+address ALGleftfetchjoin_sorted
+comment "like leftfetchjoin(), but asserts that the resulting tail column is
sorted -- ONLY USE IF YOU ARE SURE OF THIS!!! also requires sorted tail left
input"
+
command mergejoin (left:bat[:any_1,:any_2], right:bat[:any_2,:any_3])
:bat[:any_1,:any_3]
address ALGmergejoin
@@ -1108,6 +1112,7 @@ algebra_export str ALG@1(int *result, in
@:ALGbinaryestimateExport(fetchjoin)@
@:ALGbinaryestimateExport(leftjoin)@
@:ALGbinaryestimateExport(leftfetchjoin)@
+@:ALGbinaryestimateExport(leftfetchjoin_sorted)@
@:ALGbinaryestimateExport(outerjoin)@
@:ALGbinaryExport(semijoin)@
@:ALGbinaryExport(sunion)@
@@ -2768,6 +2773,18 @@ ALGleftfetchjoin(bat *result, bat *lid,
return ALGbinaryestimate(result, lid, rid, NULL, BATleftfetchjoin,
"algebra.leftfetchjoin");
}
+static BAT* leftfetchjoin_sorted(BAT* left, BAT *right, BUN estimate) {
+ BAT *bn = BATleftfetchjoin(left, right, estimate);
+ if (bn) bn->tsorted = TRUE; /* OK: we must be sure of this, but you
are, aren't you? */
+ return bn;
+}
+
+str
+ALGleftfetchjoin_sorted(bat *result, bat *lid, bat *rid)
+{
+ return ALGbinaryestimate(result, lid, rid, NULL, leftfetchjoin_sorted,
"algebra.leftfetchjoin_sorted");
+}
+
str
ALGouterjoinestimate(bat *result, bat *lid, bat *rid, lng *estimate)
{
diff --git a/monetdb5/modules/mal/tokenizer.c b/monetdb5/modules/mal/tokenizer.c
--- a/monetdb5/modules/mal/tokenizer.c
+++ b/monetdb5/modules/mal/tokenizer.c
@@ -67,6 +67,20 @@ static char name[128];
#define GET_d(x) ((sht)((x)&255))
#define GET_h(x) ((x)>>8)
+static int prvlocate(BAT* b, oid *prv, str part) {
+ BAT *m = BATmirror(b);
+ BATiter mi = bat_iterator(m);
+ BUN p;
+ if (m->H->hash == NULL) BAThash(m, 2*BATcount(m));
+ HASHloop_str(mi, m->H->hash, p, part) {
+ if (*((oid *)BUNtail(mi,p)) == *prv) {
+ *prv = (oid) p;
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
str
TKNZRopen(int *ret, str *in)
{
@@ -260,19 +274,7 @@ TKNZRappend(oid *pos, str *s)
if (p != BUN_NONE) {
prv = (oid) p;
for (i = 1; i < new; i++) {
- BAT *m = BATmirror(tokenBAT[i]);
- BATiter mi = bat_iterator(m);
- int fnd = 0;
-
- if (m->H->hash == NULL) BAThash(m, 2*BATcount(m));
- HASHloop_str(mi, m->H->hash, p, parts[i]) {
- if (*((oid *)BUNtail(mi,p)) == prv) {
- prv = (oid) p;
- fnd = 1;
- break;
- }
- }
- if (!fnd) break;
+ if (!prvlocate(tokenBAT[i], &prv, parts[i])) break;
}
} else {
i = 0;
@@ -422,14 +424,9 @@ TKNZRlocate(Client cntxt, MalBlkPtr mb,
if (p != BUN_NONE) {
prv = (oid) p;
for (i = 1; i < depth; i++) {
- p = BUNlocate(tokenBAT[i],(ptr) &prv, parts[i]);
- if (p == BUN_NONE) {
- prv = oid_nil;
- break;
- }
- prv = (oid) p;
+ if (!prvlocate(tokenBAT[i],(ptr) &prv,
parts[i])) break;
}
- if (prv == oid_nil) {
+ if (i < depth) {
pos = oid_nil;
} else {
comp = COMP(prv,i);
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list