Changeset: 47c685b65a8a for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=47c685b65a8a
Modified Files:
gdk/gdk_select.c
Branch: Feb2013
Log Message:
Reduce the number of different cases for scanselect significantly.
We do this by noting that for each value v in any of the numeric
domains that we deal with, there is a value v' that can be calculated
simply (using a standard library function for float and double) so
that the expressions x < v and x <= v' are equivalent (similarly for x
> v).
diffs (truncated from 609 to 300 lines):
diff --git a/gdk/gdk_select.c b/gdk/gdk_select.c
--- a/gdk/gdk_select.c
+++ b/gdk/gdk_select.c
@@ -20,6 +20,7 @@
#include "monetdb_config.h"
#include "gdk.h"
#include "gdk_private.h"
+#include <math.h>
#define buninsfix(B,C,A,I,T,V,G,M,R) \
do { \
@@ -199,172 +200,244 @@ do {
\
} \
} while (0)
-/* scan select predicate switch */
-#define scantest(CAND,READ,COMP,NIL1,NIL2) \
-do { \
- if (equi ) { \
- scanloop(CAND,READ, COMP(v,==,vl) ); \
- } else \
- if ( anti && b->T->nonil && lval && !li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,<=,vl) || COMP(v,>=,vh)) ); \
- } else \
- if ( anti && b->T->nonil && lval && !li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,<=,vl) || COMP(v,> ,vh)) ); \
- } else \
- if ( anti && b->T->nonil && lval && li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,< ,vl) || COMP(v,>=,vh)) ); \
- } else \
- if ( anti && b->T->nonil && lval && li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,< ,vl) || COMP(v,> ,vh)) ); \
- } else \
- if ( anti && b->T->nonil && lval && !li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,<=,vl) ) ); \
- } else \
- if ( anti && b->T->nonil && lval && li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,< ,vl) ) ); \
- } else \
- if ( anti && b->T->nonil && !lval && hval && !hi) { \
- scanloop(CAND,READ,( COMP(v,>=,vh)) ); \
- } else \
- if ( anti && b->T->nonil && !lval && hval && hi) { \
- scanloop(CAND,READ,( COMP(v,> ,vh)) ); \
- } else \
- if ( anti && !b->T->nonil && lval && !li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,<=,vl) || COMP(v,>=,vh)) NIL1(v)); \
- } else \
- if ( anti && !b->T->nonil && lval && !li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,<=,vl) || COMP(v,> ,vh)) NIL1(v)); \
- } else \
- if ( anti && !b->T->nonil && lval && li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,< ,vl) || COMP(v,>=,vh)) NIL1(v)); \
- } else \
- if ( anti && !b->T->nonil && lval && li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,< ,vl) || COMP(v,> ,vh)) NIL1(v)); \
- } else \
- if ( anti && !b->T->nonil && lval && !li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,<=,vl) ) NIL1(v)); \
- } else \
- if ( anti && !b->T->nonil && lval && li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,< ,vl) ) NIL1(v)); \
- } else \
- if ( anti && !b->T->nonil && !lval && hval && !hi) { \
- scanloop(CAND,READ,( COMP(v,>=,vh)) NIL2(v)); \
- } else \
- if ( anti && !b->T->nonil && !lval && hval && hi) { \
- scanloop(CAND,READ,( COMP(v,> ,vh)) NIL2(v)); \
- } else \
- if (!anti && b->T->nonil && lval && !li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,> ,vl) && COMP(v,< ,vh)) ); \
- } else \
- if (!anti && b->T->nonil && lval && !li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,> ,vl) && COMP(v,<=,vh)) ); \
- } else \
- if (!anti && b->T->nonil && lval && li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,>=,vl) && COMP(v,< ,vh)) ); \
- } else \
- if (!anti && b->T->nonil && lval && li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,>=,vl) && COMP(v,<=,vh)) ); \
- } else \
- if (!anti && b->T->nonil && lval && !li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,> ,vl) ) ); \
- } else \
- if (!anti && b->T->nonil && lval && li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,>=,vl) ) ); \
- } else \
- if (!anti && b->T->nonil && !lval && hval && !hi) { \
- scanloop(CAND,READ,( COMP(v,< ,vh)) ); \
- } else \
- if (!anti && b->T->nonil && !lval && hval && hi) { \
- scanloop(CAND,READ,( COMP(v,<=,vh)) ); \
- } else \
- if (!anti && !b->T->nonil && lval && !li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,> ,vl) && COMP(v,< ,vh)) NIL2(v)); \
- } else \
- if (!anti && !b->T->nonil && lval && !li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,> ,vl) && COMP(v,<=,vh)) NIL2(v)); \
- } else \
- if (!anti && !b->T->nonil && lval && li && hval && !hi) { \
- scanloop(CAND,READ,(COMP(v,>=,vl) && COMP(v,< ,vh)) NIL2(v)); \
- } else \
- if (!anti && !b->T->nonil && lval && li && hval && hi) { \
- scanloop(CAND,READ,(COMP(v,>=,vl) && COMP(v,<=,vh)) NIL2(v)); \
- } else \
- if (!anti && !b->T->nonil && lval && !li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,> ,vl) ) NIL2(v)); \
- } else \
- if (!anti && !b->T->nonil && lval && li && !hval ) { \
- scanloop(CAND,READ,(COMP(v,>=,vl) ) NIL2(v)); \
- } else \
- if (!anti && !b->T->nonil && !lval && hval && !hi) { \
- scanloop(CAND,READ,( COMP(v,< ,vh)) NIL1(v)); \
- } else \
- if (!anti && !b->T->nonil && !lval && hval && hi) { \
- scanloop(CAND,READ,( COMP(v,<=,vh)) NIL1(v)); \
- } else \
- if (!anti && !b->T->nonil && !lval && !hval ) { \
- scanloop(CAND,READ, COMP(v,!=,nil) ); \
- } else \
- assert(0); \
-} while (0)
-
-/* local variables for known fixed-width types */
-#define scaninit_fix(TYPE) \
- TYPE vl = *(TYPE *) tl; \
- TYPE vh = *(TYPE *) th; \
- TYPE v; \
- TYPE nil = TYPE##_nil; \
- const TYPE *src = (const TYPE *) Tloc(b, 0);
-
-/* local variables for generic types */
-#define scaninit_var(TYPE) \
- const void *vl = tl; \
- const void *vh = th; \
- const void *v; \
- const void *nil = ATOMnilptr(b->ttype); \
- int (*cmp)(const void *, const void *) = BATatoms[b->ttype].atomCmp;\
- BATiter bi = bat_iterator(b); \
-
-/* various comparison calls for known fixed-width types */
-#define scancomp_fix(l,o,r) (l) o (r)
-#define scannil1_fix(v) && scancomp_fix(v,!=,nil)
-#define scannil2_fix(v)
-
-/* various comparison calls for generic types */
-#define scancomp_var(l,o,r) (*cmp)((l),(r)) o 0
-#define scannil1_var(v) && scancomp_var(v,!=,nil)
-#define scannil2_var(v) scannil1_var(v)
-
/* argument list for type-specific core scan select function call */
#define scanargs \
b, s, bn, tl, th, li, hi, equi, anti, lval, hval, \
p, q, cnt, off, dst, candlist, maximum
+#define PREVVALUEbte(x) ((x) - 1)
+#define PREVVALUEsht(x) ((x) - 1)
+#define PREVVALUEint(x) ((x) - 1)
+#define PREVVALUElng(x) ((x) - 1)
+#define PREVVALUEoid(x) ((x) - 1)
+#define PREVVALUEflt(x) nextafterf((x), -HUGE_VALF)
+#define PREVVALUEdbl(x) nextafter((x), -HUGE_VAL)
+
+#define NEXTVALUEbte(x) ((x) + 1)
+#define NEXTVALUEsht(x) ((x) + 1)
+#define NEXTVALUEint(x) ((x) + 1)
+#define NEXTVALUElng(x) ((x) + 1)
+#define NEXTVALUEoid(x) ((x) + 1)
+#define NEXTVALUEflt(x) nextafterf((x), HUGE_VALF)
+#define NEXTVALUEdbl(x) nextafter((x), HUGE_VAL)
+
+#define MINVALUEbte NEXTVALUEbte(GDK_bte_min)
+#define MINVALUEsht NEXTVALUEsht(GDK_sht_min)
+#define MINVALUEint NEXTVALUEint(GDK_int_min)
+#define MINVALUElng NEXTVALUElng(GDK_lng_min)
+#define MINVALUEoid GDK_oid_min
+#define MINVALUEflt NEXTVALUEflt(GDK_flt_min)
+#define MINVALUEdbl NEXTVALUEdbl(GDK_dbl_min)
+
+#define MAXVALUEbte GDK_bte_max
+#define MAXVALUEsht GDK_sht_max
+#define MAXVALUEint GDK_int_max
+#define MAXVALUElng GDK_lng_max
+#define MAXVALUEoid GDK_oid_max
+#define MAXVALUEflt GDK_flt_max
+#define MAXVALUEdbl GDK_dbl_max
+
+
/* definition of type-specific core scan select function */
-#define scanfunc(NAME,WHAT,TYPE,CAND,READ) \
+#define scanfunc(NAME, TYPE, CAND) \
static BUN \
NAME##_##TYPE (BAT *b, BAT *s, BAT *bn, const void *tl, const void *th,
\
int li, int hi, int equi, int anti, int lval, int hval, \
BUN r, BUN q, BUN cnt, wrd off, oid *dst, \
const oid *candlist, BUN maximum) \
{ \
- scaninit_##WHAT ( TYPE ) \
+ TYPE vl = *(const TYPE *) tl; \
+ TYPE vh = *(const TYPE *) th; \
+ TYPE v; \
+ TYPE nil = TYPE##_nil; \
+ const TYPE *src = (const TYPE *) Tloc(b, 0); \
oid o; \
BUN p = r; \
(void) candlist; \
- scantest(CAND, READ, scancomp_##WHAT, \
- scannil1_##WHAT, scannil2_##WHAT); \
+ (void) li; \
+ (void) hi; \
+ (void) lval; \
+ (void) hval; \
+ if (equi) { \
+ scanloop(CAND,v = src[o-off], v == vl); \
+ } else if (anti) { \
+ if (b->T->nonil) { \
+ scanloop(CAND,v = src[o-off],(v <= vl || v >= vh)); \
+ } else { \
+ scanloop(CAND,v = src[o-off],(v <= vl || v >= vh) && v
!= nil); \
+ } \
+ } else if (b->T->nonil && vl == MINVALUE##TYPE) { \
+ scanloop(CAND,v = src[o-off],v <= vh); \
+ } else if (vh == MAXVALUE##TYPE) { \
+ scanloop(CAND,v = src[o-off],v >= vl); \
+ } else { \
+ scanloop(CAND,v = src[o-off],v >= vl && v <= vh); \
+ } \
return cnt; \
}
+static BUN
+candscan_any (BAT *b, BAT *s, BAT *bn, const void *tl, const void *th,
+ int li, int hi, int equi, int anti, int lval, int hval,
+ BUN r, BUN q, BUN cnt, wrd off, oid *dst,
+ const oid *candlist, BUN maximum)
+{
+ const void *v;
+ const void *nil = ATOMnilptr(b->ttype);
+ int (*cmp)(const void *, const void *) = BATatoms[b->ttype].atomCmp;
+ BATiter bi = bat_iterator(b);
+ oid o;
+ BUN p = r;
+ int c;
+
+ (void) maximum;
+ if (equi) {
+ ALGODEBUG fprintf(stderr,
+ "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): "
+ "scanselect equi\n", BATgetId(b), BATcount(b),
+ BATgetId(s), anti);
+ while (p < q) {
+ o = *candlist++;
+ v = BUNtail(bi,o-off);
+ buninsfix(bn, T, dst, cnt, oid, o,
+ (BUN) ((dbl) cnt / (dbl) (p-r)
+ * (dbl) (q-p) * 1.1 + 1024),
+ BATcapacity(bn) + q - p, BUN_NONE);
+ cnt += ((*cmp)(tl, v) == 0);
+ p++;
+ }
+ } else if (anti) {
+ ALGODEBUG fprintf(stderr,
+ "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): "
+ "scanselect anti\n", BATgetId(b), BATcount(b),
+ BATgetId(s), anti);
+ while (p < q) {
+ o = *candlist++;
+ v = BUNtail(bi,o-off);
+ buninsfix(bn, T, dst, cnt, oid, o,
+ (BUN) ((dbl) cnt / (dbl) (p-r)
+ * (dbl) (q-p) * 1.1 + 1024),
+ BATcapacity(bn) + q - p, BUN_NONE);
+ cnt += ((nil == NULL || (*cmp)(v, nil) != 0) &&
+ ((lval &&
+ ((c = (*cmp)(tl, v)) > 0 ||
+ (!li && c == 0))) ||
+ (hval &&
+ ((c = (*cmp)(th, v)) < 0 ||
+ (!hi && c == 0)))));
+ p++;
+ }
+ } else {
+ ALGODEBUG fprintf(stderr,
+ "#BATsubselect(b=%s#"BUNFMT",s=%s,anti=%d): "
+ "scanselect range\n", BATgetId(b),
BATcount(b),
+ BATgetId(s), anti);
+ while (p < q) {
+ o = *candlist++;
+ v = BUNtail(bi,o-off);
+ buninsfix(bn, T, dst, cnt, oid, o,
+ (BUN) ((dbl) cnt / (dbl) (p-r)
+ * (dbl) (q-p) * 1.1 + 1024),
+ BATcapacity(bn) + q - p, BUN_NONE);
+ cnt += ((nil == NULL || (*cmp)(v, nil) != 0) &&
+ ((!lval ||
+ (c = cmp(tl, v)) < 0 ||
+ (li && c == 0)) &&
+ (!hval ||
+ (c = cmp(th, v)) > 0 ||
+ (hi && c == 0))));
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list