Changeset: bafd08bb1cf1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=bafd08bb1cf1
Modified Files:
        gdk/gdk_join.c
        gdk/gdk_private.h
        gdk/gdk_search.c
Branch: default
Log Message:

Reduce number of binary search implementations.
The one we use now doesn't do any scan over equal values; instead, it
continues the binary search until the first/last value is found.


diffs (truncated from 1090 to 300 lines):

diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -182,251 +182,6 @@ joininitresults(BAT **r1p, BAT **r2p, BU
                         s##vals + ((x) * s##width))
 #define FVALUE(s, x)   (s##vals + ((x) * s##width))
 
-#define BINSEARCHFUNC(TYPE)                                            \
-static inline BUN                                                      \
-binsearch_##TYPE(const oid *rcand, oid offset, const TYPE *rvals,      \
-                BUN lo, BUN hi, TYPE v, int ordering, int last)        \
-{                                                                      \
-       BUN mid;                                                        \
-       TYPE x;                                                         \
-                                                                       \
-       assert(ordering == 1 || ordering == -1);                        \
-       assert(lo <= hi);                                               \
-                                                                       \
-       if (ordering > 0) {                                             \
-               if (rcand) {                                            \
-                       if (last) {                                     \
-                               if ((x = rvals[rcand[lo] - offset]) > v) \
-                                       return lo;                      \
-                               if ((x = rvals[rcand[hi] - offset]) < v || \
-                                   x == v)                             \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo <= v < value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[rcand[mid] - offset] > v) \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       } else {                                        \
-                               if ((x = rvals[rcand[lo] - offset]) > v || \
-                                   x == v)                             \
-                                       return lo;                      \
-                               if ((x = rvals[rcand[hi] - offset]) < v) \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo < v <= value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[rcand[mid] - offset] >= v) \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       }                                               \
-               } else {                                                \
-                       if (last) {                                     \
-                               if ((x = rvals[lo]) > v)                \
-                                       return lo;                      \
-                               if ((x = rvals[hi]) < v || x == v)      \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo <= v < value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[mid] > v)             \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       } else {                                        \
-                               if ((x = rvals[lo]) > v || x == v)      \
-                                       return lo;                      \
-                               if ((x = rvals[hi]) < v)                \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo < v <= value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[mid] >= v)            \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       }                                               \
-               }                                                       \
-       } else {                                                        \
-               if (rcand) {                                            \
-                       if (last) {                                     \
-                               if ((x = rvals[rcand[lo] - offset]) < v) \
-                                       return lo;                      \
-                               if ((x = rvals[rcand[hi] - offset]) > v || \
-                                   x == v)                             \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo >= v > value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[rcand[mid] - offset] < v) \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       } else {                                        \
-                               if ((x = rvals[rcand[lo] - offset]) < v || \
-                                   x == v)                             \
-                                       return lo;                      \
-                               if ((x = rvals[rcand[hi] - offset]) > v) \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo > v >= value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[rcand[mid] - offset] <= v) \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       }                                               \
-               } else {                                                \
-                       if (last) {                                     \
-                               if ((x = rvals[lo]) < v)                \
-                                       return lo;                      \
-                               if ((x = rvals[hi]) > v || x == v)      \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo >= v > value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[mid] < v)             \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       } else {                                        \
-                               if ((x = rvals[lo]) < v || x == v)      \
-                                       return lo;                      \
-                               if ((x = rvals[hi]) > v)                \
-                                       return hi + 1;                  \
-                                                                       \
-                               /* loop invariant: */                   \
-                               /* value@lo > v >= value@hi */          \
-                               while (hi - lo > 1) {                   \
-                                       mid = (hi + lo) / 2;            \
-                                       if (rvals[mid] <= v)            \
-                                               hi = mid;               \
-                                       else                            \
-                                               lo = mid;               \
-                               }                                       \
-                       }                                               \
-               }                                                       \
-       }                                                               \
-       return hi;                                                      \
-}
-
-BINSEARCHFUNC(bte)
-BINSEARCHFUNC(sht)
-BINSEARCHFUNC(int)
-BINSEARCHFUNC(lng)
-#ifdef HAVE_HGE
-BINSEARCHFUNC(hge)
-#endif
-BINSEARCHFUNC(flt)
-BINSEARCHFUNC(dbl)
-#if SIZEOF_OID == SIZEOF_INT
-#define binsearch_oid(rcand, offset, rvals, lo, hi, v, ordering, last) binsearch_int(rcand, offset, (const int *) rvals, lo, hi, (int) (v), ordering, last)
-#endif
-#if SIZEOF_OID == SIZEOF_LNG
-#define binsearch_oid(rcand, offset, rvals, lo, hi, v, ordering, last) binsearch_lng(rcand, offset, (const lng *) rvals, lo, hi, (lng) (v), ordering, last)
-#endif
-
-/* Do a binary search for the first/last occurrence of v between lo and hi
- * (lo inclusive, hi not inclusive) in rvals/rvars.
- * If last is set, return the index of the first value > v; if last is
- * not set, return the index of the first value >= v.
- * If ordering is -1, the values are sorted in reverse order and hence
- * all comparisons are reversed.
- */
-static BUN
-binsearch(const oid *rcand, oid offset,
-         int type, const char *rvals, const char *rvars,
-         int rwidth, BUN lo, BUN hi, const char *v,
-         int (*cmp)(const void *, const void *), int ordering, int last)
-{
-       BUN mid;
-       int c;
-
-       assert(ordering == 1 || ordering == -1);
-       assert(lo < hi);
-
-       --hi;                   /* now hi is inclusive */
-
-       switch (type) {
-       case TYPE_bte:
-               return binsearch_bte(rcand, offset, (const bte *) rvals,
-                                    lo, hi, *(const bte *) v, ordering, last);
-       case TYPE_sht:
-               return binsearch_sht(rcand, offset, (const sht *) rvals,
-                                    lo, hi, *(const sht *) v, ordering, last);
-       case TYPE_int:
-#if SIZEOF_OID == SIZEOF_INT
-       case TYPE_oid:
-#endif
-               return binsearch_int(rcand, offset, (const int *) rvals,
-                                    lo, hi, *(const int *) v, ordering, last);
-       case TYPE_lng:
-#if SIZEOF_OID == SIZEOF_LNG
-       case TYPE_oid:
-#endif
-               return binsearch_lng(rcand, offset, (const lng *) rvals,
-                                    lo, hi, *(const lng *) v, ordering, last);
-#ifdef HAVE_HGE
-       case TYPE_hge:
-               return binsearch_hge(rcand, offset, (const hge *) rvals,
-                                    lo, hi, *(const hge *) v, ordering, last);
-#endif
-       case TYPE_flt:
-               return binsearch_flt(rcand, offset, (const flt *) rvals,
-                                    lo, hi, *(const flt *) v, ordering, last);
-       case TYPE_dbl:
-               return binsearch_dbl(rcand, offset, (const dbl *) rvals,
-                                    lo, hi, *(const dbl *) v, ordering, last);
-       }
-
-       if ((c = ordering * cmp(VALUE(r, rcand ? rcand[lo] - offset : lo), v)) > 0 ||
-           (!last && c == 0))
-               return lo;
-       if ((c = ordering * cmp(VALUE(r, rcand ? rcand[hi] - offset : hi), v)) < 0 ||
-           (last && c == 0))
-               return hi + 1;
-
-       /* loop invariant:
-        * last ? value@lo <= v < value@hi : value@lo < v <= value@hi
-        *
-        * This version does some more work in the inner loop than the
-        * type-expanded versions (ordering and rcand checks) but is
-        * slow due to the function call and the needed check for
-        * rvars (in VALUE()) already, so we're beyond caring. */
-       while (hi - lo > 1) {
-               mid = (hi + lo) / 2;
-               if ((c = ordering * cmp(VALUE(r, rcand ? rcand[mid] - offset : mid), v)) > 0 ||
-                   (!last && c == 0))
-                       hi = mid;
-               else
-                       lo = mid;
-       }
-       return hi;
-}
-
 #define APPEND(b, o)           (((oid *) b->theap.base)[b->batCount++] = (o))
 
 static gdk_return
@@ -520,6 +275,13 @@ nomatch(BAT *r1, BAT *r2, BAT *l, BAT *r
        return GDK_FAIL;
 }
 
+#if SIZEOF_OID == SIZEOF_INT
+#define binsearch_oid(indir, offset, vals, lo, hi, v, ordering, last) binsearch_int(indir, offset, (const int *) vals, lo, hi, (int) (v), ordering, last)
+#endif
+#if SIZEOF_OID == SIZEOF_LNG
+#define binsearch_oid(indir, offset, vals, lo, hi, v, ordering, last) binsearch_lng(indir, offset, (const lng *) vals, lo, hi, (lng) (v), ordering, last)
+#endif
+
 static gdk_return
 mergejoin_void(BAT *r1, BAT *r2, BAT *l, BAT *r, BAT *sl, BAT *sr,
               int nil_on_miss, int only_misses, lng t0)
@@ -1832,7 +1594,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                        l->ttype, lvals, lvars,
                                                        lwidth, lscan,
                                                        (BUN) (lcandend - lcand), v,
-                                                       cmp, lordering, 0);
+                                                       lordering, 0);
                                        lcand += nlx;
                                        if (lcand == lcandend)
                                                v = NULL;
@@ -1847,7 +1609,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                                           lwidth,
                                                           lstart + lscan,
                                                           lend, v,
-                                                          cmp, lordering, 0);
+                                                          lordering, 0);
                                        nlx = lstart - nlx;
                                        if (lstart == lend)
                                                v = NULL;
@@ -1943,7 +1705,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                   cmp(v, VALUE(l, lcand[lscan] - l->hseqbase)) == 0) {
                                /* lots of equal values: use binary
                                 * search to find end */
-                               nl = binsearch(lcand, l->hseqbase, l->ttype, lvals, lvars, lwidth, lscan, (BUN) (lcandend - lcand), v, cmp, lordering, 1);
+                               nl = binsearch(lcand, l->hseqbase, l->ttype, lvals, lvars, lwidth, lscan, (BUN) (lcandend - lcand), v, lordering, 1);
                                lcand += nl;
                        } else {
                                while (++lcand < lcandend &&
@@ -1972,7 +1734,7 @@ mergejoin(BAT *r1, BAT *r2, BAT *l, BAT 
                                         * end */
                                        nl = binsearch(NULL, 0, l->ttype, lvals, lvars,
                                                       lwidth, lstart + lscan,
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to