Changeset: 5180d32892f0 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/5180d32892f0
Modified Files:
        monetdb5/optimizer/opt_dict.c
        sql/backends/monet5/dict.c
        sql/backends/monet5/dict.h
        sql/backends/monet5/sql.c
Branch: dict
Log Message:

Handle joins within the dict module, i.e. first join on the dictionaries,
then possibly renumber, then join on the offsets.


diffs (truncated from 387 to 300 lines):

diff --git a/monetdb5/optimizer/opt_dict.c b/monetdb5/optimizer/opt_dict.c
--- a/monetdb5/optimizer/opt_dict.c
+++ b/monetdb5/optimizer/opt_dict.c
@@ -138,8 +138,7 @@ OPTdictImplementation(Client cntxt, MalB
                                        /* (r1, r2) = join(col1, col2, cand1, 
cand2, ...) with
                                         *              col1 = 
dict.decompress(o1,u1), col2 = dict.decompress(o2,u2)
                                         *              iff u1 == u2
-                                        *                      (r1, r2) = 
join(o1, o2, cand1, cand2, ...)
-                                        *              else go for decompress 
*/
+                                        *                      (r1, r2) = 
algebra.join(o1, o2, cand1, cand2, ...) */
                                        int l = getArg(p, j+1);
                                        InstrPtr r = copyInstruction(p);
                                        getArg(r, j+0) = varisdict[k];
@@ -147,6 +146,27 @@ OPTdictImplementation(Client cntxt, MalB
                                        pushInstruction(mb,r);
                                        done = 1;
                                        break;
+                               } else if (j == 2 && p->argc > j+1 && 
getModuleId(p) == algebraRef && getFunctionId(p) == joinRef
+                                               && varisdict[getArg(p, j+1)] && 
vardictvalue[k] != vardictvalue[getArg(p, j+1)]) {
+                                       /* (r1, r2) = join(col1, col2, cand1, 
cand2, ...) with
+                                        *              col1 = 
dict.decompress(o1,u1), col2 = dict.decompress(o2,u2)
+                                        * (r1, r2) = dict.join(o1, u1, o2, u2, 
cand1, cand2, ...) */
+                                       int l = getArg(p, j+1);
+                                       InstrPtr r = newInstructionArgs(mb, 
dictRef, joinRef, 10);
+                                       assert(p->argc==8);
+                                       getArg(r, 0) = getArg(p, 0);
+                                       r = pushReturn(mb, r, getArg(p, 1));
+                                       r = addArgument(mb, r, varisdict[k]);
+                                       r = addArgument(mb, r, vardictvalue[k]);
+                                       r = addArgument(mb, r, varisdict[l]);
+                                       r = addArgument(mb, r, vardictvalue[l]);
+                                       r = addArgument(mb, r, getArg(p, 4));
+                                       r = addArgument(mb, r, getArg(p, 5));
+                                       r = addArgument(mb, r, getArg(p, 6));
+                                       r = addArgument(mb, r, getArg(p, 7));
+                                       pushInstruction(mb,r);
+                                       done = 1;
+                                       break;
                                } else if ((isMapOp(p) || isMap2Op(p)) && 
allConstExcept(mb, p, j)) {
                                        /* batcalc.-(1, col) with col = 
dict.decompress(o,u)
                                         * v1 = batcalc.-(1, u)
diff --git a/sql/backends/monet5/dict.c b/sql/backends/monet5/dict.c
--- a/sql/backends/monet5/dict.c
+++ b/sql/backends/monet5/dict.c
@@ -19,20 +19,20 @@ DICTcompress(Client cntxt, MalBlkPtr mb,
        sql_trans *tr = NULL;
 
        if (!sname || !tname || !cname)
-               throw(SQL, "sql.dict_compress", SQLSTATE(3F000) "dict compress: 
invalid column name");
+               throw(SQL, "dict.compress", SQLSTATE(3F000) "dict compress: 
invalid column name");
        if ((msg = getBackendContext(cntxt, &be)) != MAL_SUCCEED)
                return msg;
        tr = be->mvc->session->tr;
 
        sql_schema *s = find_sql_schema(tr, sname);
        if (!s)
-               throw(SQL, "sql.dict_compress", SQLSTATE(3F000) "schema '%s' 
unknown", sname);
+               throw(SQL, "dict.compress", SQLSTATE(3F000) "schema '%s' 
unknown", sname);
        sql_table *t = find_sql_table(tr, s, tname);
        if (!t)
-               throw(SQL, "sql.dict_compress", SQLSTATE(3F000) "table '%s.%s' 
unknown", sname, tname);
+               throw(SQL, "dict.compress", SQLSTATE(3F000) "table '%s.%s' 
unknown", sname, tname);
        sql_column *c = find_sql_column(t, cname);
        if (!c)
-               throw(SQL, "sql.dict_compress", SQLSTATE(3F000) "column 
'%s.%s.%s' unknown", sname, tname, cname);
+               throw(SQL, "dict.compress", SQLSTATE(3F000) "column '%s.%s.%s' 
unknown", sname, tname, cname);
 
        sqlstore *store = tr->store;
        BAT *b = store->storage_api.bind_col(tr, c, RDONLY);
@@ -40,7 +40,7 @@ DICTcompress(Client cntxt, MalBlkPtr mb,
        /* for now use all rows */
        BAT *u = BATunique(b, NULL);
        if (!u)
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               throw(SQL, "dict.compress", SQLSTATE(HY013) MAL_MALLOC_FAIL);
 
        BUN cnt = BATcount(u);
        /* create hash on u */
@@ -48,18 +48,18 @@ DICTcompress(Client cntxt, MalBlkPtr mb,
        if (cnt > 2L*1024*1024*1024) {
                bat_destroy(u);
                bat_destroy(b);
-               throw(SQL, "sql.dict_compress", SQLSTATE(3F000) "dict compress: 
too many values");
+               throw(SQL, "dict.compress", SQLSTATE(3F000) "dict compress: too 
many values");
        }
        BAT *uv = BATproject(u, b); /* get values */
        bat_destroy(u);
        if (!uv) {
                bat_destroy(b);
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               throw(SQL, "dict.compress", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
     BAT *uu = COLcopy(uv, uv->ttype, true, PERSISTENT);
        if (!uu) {
                bat_destroy(uv);
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               throw(SQL, "dict.compress", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
        bat_destroy(uv);
        u = uu;
@@ -67,7 +67,7 @@ DICTcompress(Client cntxt, MalBlkPtr mb,
        BAT *o = COLnew(b->hseqbase, tt, BATcount(b), PERSISTENT);
        if (!o || BAThash(u) != GDK_SUCCEED) {
                bat_destroy(u);
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               throw(SQL, "dict.compress", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
 
        BUN p, q;
@@ -89,7 +89,7 @@ DICTcompress(Client cntxt, MalBlkPtr mb,
                o->tkey = b->tkey;
                if (sql_trans_alter_storage(tr, c, "DICT") != LOG_OK || 
store->storage_api.col_dict(tr, c, o, u) != LOG_OK) {
                        bat_iterator_end(&bi);
-                       throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
"alter_storage failed");
+                       throw(SQL, "dict.compress", SQLSTATE(HY013) 
"alter_storage failed");
                }
        } else if (tt == TYPE_sht) {
                sht *op = (sht*)Tloc(o, 0);
@@ -107,7 +107,7 @@ DICTcompress(Client cntxt, MalBlkPtr mb,
                o->tkey = b->tkey;
                if (sql_trans_alter_storage(tr, c, "DICT") != LOG_OK || 
store->storage_api.col_dict(tr, c, o, u) != LOG_OK) {
                        bat_iterator_end(&bi);
-                       throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
"alter_storage failed");
+                       throw(SQL, "dict.compress", SQLSTATE(HY013) 
"alter_storage failed");
                }
        } else {
                printf("implement int cases \n");
@@ -137,7 +137,7 @@ DICTdecompress(Client cntxt, MalBlkPtr m
        if (!o || !u) {
                bat_destroy(o);
                bat_destroy(u);
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               throw(SQL, "dict.decompress", SQLSTATE(HY013) MAL_MALLOC_FAIL);
        }
 
        BAT *b = COLnew(o->hseqbase, u->ttype, BATcount(o), TRANSIENT);
@@ -153,7 +153,7 @@ DICTdecompress(Client cntxt, MalBlkPtr m
                                bat_destroy(b);
                                bat_destroy(o);
                                bat_destroy(u);
-                               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+                               throw(SQL, "dict.decompress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                        }
                }
        } else if (o->ttype == TYPE_sht) {
@@ -165,7 +165,7 @@ DICTdecompress(Client cntxt, MalBlkPtr m
                                bat_destroy(b);
                                bat_destroy(o);
                                bat_destroy(u);
-                               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+                               throw(SQL, "dict.decompress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
                        }
                }
        } else if (o->ttype == TYPE_int) {
@@ -175,7 +175,7 @@ DICTdecompress(Client cntxt, MalBlkPtr m
                bat_destroy(b);
                bat_destroy(o);
                bat_destroy(u);
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) "unknown offset 
type");
+               throw(SQL, "dict.decompress", SQLSTATE(HY013) "unknown offset 
type");
        }
        bat_iterator_end(&oi);
        BBPkeepref(*r = b->batCacheid);
@@ -195,7 +195,7 @@ DICTconvert(Client cntxt, MalBlkPtr mb, 
 
        BAT *o = BATdescriptor(O);
        if (!o)
-               throw(SQL, "sql.dict_compress", SQLSTATE(HY013) 
MAL_MALLOC_FAIL);
+               throw(SQL, "dict.convert", SQLSTATE(HY013) MAL_MALLOC_FAIL);
 
        BAT *b = COLnew(o->hseqbase, rt, BATcount(o), TRANSIENT);
 
@@ -223,3 +223,192 @@ DICTconvert(Client cntxt, MalBlkPtr mb, 
        bat_destroy(o);
        return MAL_SUCCEED;
 }
+
+static void
+BATnegateprops(BAT *b)
+{
+       /* Disable all properties here: clear the sorted / reverse-sorted and
+        * key flags of b, zero the accompanying tnosorted / tnorevsorted /
+        * tnokey fields and set tseqbase to oid_nil.
+        * DICTrenumber calls this on a freshly filled result BAT before
+        * setting the one property it carries over (tkey).
+        * NOTE(review): presumably 0 in tnosorted/tnorevsorted/tnokey means
+        * "no known position"; confirm against the GDK documentation. */
+       b->tsorted = false;
+       b->trevsorted = false;
+       b->tnosorted = 0;
+       b->tnorevsorted = 0;
+       b->tseqbase = oid_nil;
+       b->tkey = false;
+       b->tnokey[0] = 0;
+       b->tnokey[1] = 0;
+}
+
+/* Renumber the offsets in o through the mapping given by the pair (lc, rc)
+ * and return the result as a new TRANSIENT BAT (NULL when BATsort fails).
+ *
+ * - If lc is not sorted it is sorted with BATsort; the sorted BAT then
+ *   replaces lc and the order BAT returned by BATsort replaces rc.
+ * - If lc is not dense (BATtdense, or the cheap check: sorted, key,
+ *   BATcount(lc) == offcnt and last value == offcnt-1) a map of offcnt
+ *   entries is built in which the positions missing from lc (ie no match
+ *   on the right side) get the too large value offcnt.
+ * - Finally every value v of o is replaced by c[v], with c the tail of rc
+ *   read as oid; a value equal to offcnt is passed through as offcnt.
+ *
+ * Only bte and sht offsets are handled, other types hit assert(0).
+ * The lc and rc passed in by the caller are not destroyed here, only the
+ * temporaries created along the way.
+ *
+ * NOTE(review), to be confirmed:
+ * - the results of COLnew (nrc and no) are used without a NULL check;
+ * - the hole filling loops write nrc through bte/sht pointers and read
+ *   lc/rc through unsigned char/short pointers, while nrc is created with
+ *   rc->ttype and the final loops read rc as oid; these widths look
+ *   inconsistent;
+ * - lp[j] is read without bounding j by BATcount(lc);
+ * - after sorting, rc is the order BAT itself, whereas the earlier wording
+ *   of this comment spoke of reordering (projecting) rv1.
+ */
+static BAT *
+DICTrenumber( BAT *o, BAT *lc, BAT *rc, BUN offcnt)
+{
+       BAT *olc = lc, *orc = rc, *no = NULL; /* olc/orc: the caller's BATs, never destroyed here */
+       BUN cnt = BATcount(o);
+
+       if (!lc->tsorted) { /* sort lc and continue with the sorted copy and its order BAT */
+               BAT *nlc = NULL, *order = NULL;
+               int ret = BATsort(&nlc, &order, NULL, lc, NULL, NULL, false, 
false, false);
+               if (ret != GDK_SUCCEED)
+                       return no; /* no is still NULL at this point */
+               BAT *nrc = order;
+
+               if (!nlc || !nrc) {
+                       bat_destroy(nlc);
+                       bat_destroy(nrc);
+                       return no;
+               }
+               lc = nlc;
+               rc = nrc;
+       }
+       /* dense or cheap dense check; when it fails a map of offcnt entries
+        * is built below */
+       if (!BATtdense(lc) && !(lc->tsorted && lc->tkey && BATcount(lc) == 
offcnt && *(oid*)Tloc(lc, offcnt-1) == offcnt-1)) {
+               BAT *nrc = COLnew(0, rc->ttype, offcnt, TRANSIENT);
+
+               /* create map with holes filled in: walk positions 0..offcnt-1
+                * with i and the entries of lc/rc with j */
+               if (o->ttype == TYPE_bte) {
+                       bte *op = Tloc(nrc, 0);
+                       unsigned char *ip = Tloc(rc, 0);
+                       unsigned char *lp = Tloc(lc, 0);
+                       for(BUN i = 0, j = 0; i<offcnt; i++) {
+                               if (lp[j] > i) {
+                                       op[i] = offcnt; /* hole: next lc value lies beyond position i */
+                               } else {
+                                       op[i] = ip[j++]; /* take the rc entry and advance j */
+                               }
+                       }
+               } else if (o->ttype == TYPE_sht) {
+                       sht *op = Tloc(nrc, 0);
+                       unsigned short *ip = Tloc(rc, 0);
+                       unsigned short *lp = Tloc(lc, 0);
+                       for(BUN i = 0, j = 0; i<offcnt; i++) {
+                               if (lp[j] > i) {
+                                       op[i] = offcnt; /* hole: next lc value lies beyond position i */
+                               } else {
+                                       op[i] = ip[j++]; /* take the rc entry and advance j */
+                               }
+                       }
+               } else {
+                       assert(0);
+               }
+               if (orc != rc) /* rc is the order BAT created by the sort above */
+                       bat_destroy(rc);
+               rc = nrc;
+       }
+
+       no = COLnew(o->hseqbase, o->ttype, cnt, TRANSIENT); /* same hseqbase, type and capacity as o */
+       if (o->ttype == TYPE_bte) {
+               bte *op = Tloc(no, 0);
+               unsigned char *ip = Tloc(o, 0);
+               oid *c = Tloc(rc, 0);
+               for(BUN i = 0; i<cnt; i++) {
+                       op[i] = ip[i]==offcnt?offcnt:c[ip[i]]; /* offcnt passes through, the rest is mapped via c */
+               }
+               BATsetcount(no, cnt);
+               BATnegateprops(no);
+               no->tkey = o->tkey; /* BATnegateprops cleared tkey; take it over from o */
+       } else if (o->ttype == TYPE_sht) {
+               sht *op = Tloc(no, 0);
+               unsigned short *ip = Tloc(o, 0);
+               oid *c = Tloc(rc, 0);
+               for(BUN i = 0; i<cnt; i++) {
+                       op[i] = ip[i]==offcnt?offcnt:c[ip[i]]; /* offcnt passes through, the rest is mapped via c */
+               }
+               BATsetcount(no, cnt);
+               BATnegateprops(no);
+               no->tkey = o->tkey; /* BATnegateprops cleared tkey; take it over from o */
+       } else {
+               assert(0);
+       }
+       if (olc != lc) /* only destroy BATs created in this function */
+               bat_destroy(lc);
+       if (orc != rc)
+               bat_destroy(rc);
+       return no;
+}
+
+/* simple join operator with on both sides a (different) dictionary
+ * (r0, r1) = dict.join(lo, lv, ro, rv, lcand, rcand, ... ) */
+str
+DICTjoin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void)cntxt;
+       (void)mb;
+       bat *R0 = getArgReference_bat(stk, pci, 0);
+       bat *R1 = getArgReference_bat(stk, pci, 1);
+       bat LO = *getArgReference_bat(stk, pci, 2);
+       bat LV = *getArgReference_bat(stk, pci, 3);
+       bat RO = *getArgReference_bat(stk, pci, 4);
+       bat RV = *getArgReference_bat(stk, pci, 5);
+       bat LC = *getArgReference_bat(stk, pci, 6);
+       bat RC = *getArgReference_bat(stk, pci, 7);
+       BAT *lc = NULL, *rc = NULL, *r0 = NULL, *r1 = NULL;
+       bit nil_matches = *getArgReference_bit(stk, pci, 8);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to