Changeset: 22d715657720 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=22d715657720
Added Files:
        monetdb5/modules/weldudfs/Makefile.ag
        monetdb5/modules/weldudfs/weld_udfs.c
        monetdb5/modules/weldudfs/weld_udfs.h
Modified Files:
        monetdb5/modules/Makefile.ag
        monetdb5/modules/mal/Makefile.ag
        monetdb5/modules/mal/mal_weld.c
        monetdb5/modules/mal/mal_weld.h
        monetdb5/modules/mal/mal_weld.mal
        monetdb5/modules/mal/mal_weld.mal.sh
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
        monetdb5/optimizer/opt_weld.c
        monetdb5/tools/Makefile.ag
Branch: mal-weld
Log Message:

weld impl for algebra.join through UDF

On my Fedora machine, calling a UDF only works if it comes from another library.
For this reason I had to create the monetdb5/modules/weldudfs lib.


diffs (truncated from 468 to 300 lines):

diff --git a/monetdb5/modules/Makefile.ag b/monetdb5/modules/Makefile.ag
--- a/monetdb5/modules/Makefile.ag
+++ b/monetdb5/modules/Makefile.ag
@@ -4,4 +4,4 @@
 #
 # Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
 
-SUBDIRS = atoms kernel mal 
+SUBDIRS = atoms kernel mal weldudfs
diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag
--- a/monetdb5/modules/mal/Makefile.ag
+++ b/monetdb5/modules/mal/Makefile.ag
@@ -4,7 +4,7 @@
 #
 # Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
 
-INCLUDES = ../../mal ../atoms ../kernel \
+INCLUDES = ../../mal ../atoms ../kernel ../weldudfs \
        ../../../clients/mapilib \
        ../../../common/options \
        ../../../common/stream \
diff --git a/monetdb5/modules/mal/mal_weld.c b/monetdb5/modules/mal/mal_weld.c
--- a/monetdb5/modules/mal/mal_weld.c
+++ b/monetdb5/modules/mal/mal_weld.c
@@ -13,11 +13,9 @@
 #include "mal_instruction.h"
 #include "mal_weld.h"
 #include "weld.h"
+#include "weld_udfs.h"
 
 #define STR_SIZE_INC 4096
-#define OP_GET 0
-#define OP_SET 1
-
 /* Variables in Weld will be named vXX - e.g. v19 
  * */
 
@@ -86,7 +84,7 @@ static str getWeldUTypeFromWidth(int wid
                return "u64";
 }
 
-static void getOrSetStructMember(char **addr, int type, const void *value, int 
op) {
+void getOrSetStructMember(char **addr, int type, const void *value, int op) {
        if (type == TYPE_bte) {
                getOrSetStructMemberImpl(addr, char, value, op);
        } else if (type == TYPE_int) {
@@ -159,6 +157,7 @@ WeldInitState(Client cntxt, MalBlkPtr mb
        wstate->programMaxLen = 1;
        wstate->program = calloc(wstate->programMaxLen, sizeof(char));
        wstate->groupDeps = calloc(mb->vtop, sizeof(InstrPtr));
+       wstate->cudfOutputs = calloc(mb->vtop, sizeof(bit));
        *getArgReference_ptr(stk, pci, 0) = wstate;;
        return MAL_SUCCEED;
 }
@@ -222,6 +221,10 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
                        /* Also return the string column base ptr */
                        outputLen += sprintf(outputStmt + outputLen, " 
v%dstr,", getArg(pci, i));
                }
+               if (wstate->cudfOutputs[getArg(pci, i)]) {
+                       /* Output bat produced by an UDF */
+                       outputLen += sprintf(outputStmt + outputLen, " 
v%dbat,", getArg(pci, i));
+               }
        }
 
        outputStmt[0] = '{';
@@ -239,9 +242,6 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
 #endif
        weld_module_t m = weld_module_compile(wstate->program, conf, e);
        weld_conf_free(conf);
-       free(wstate->program);
-       free(wstate->groupDeps);
-       free(wstate);
        if (weld_error_code(e)) {
                throw(MAL, "weld.run", PROGRAM_GENERAL ": %s", 
weld_error_message(e));
        }
@@ -259,7 +259,13 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
                        bat bid = *getArgReference_bat(stk, pci, i);
                        BAT *b = BATdescriptor(bid);
                        if (b == NULL) throw(MAL, "weld.run", SQLSTATE(HY002) 
RUNTIME_OBJECT_MISSING);
-                       getOrSetStructMember(&inputPtr, TYPE_ptr, 
&b->theap.base, OP_SET);
+                       if (BATtdense(b)) {
+                               /* Hack: store -b->seqbase instead: udfs will 
check if it's a dense bat */
+                               lng seqbase = -b->tseqbase;
+                               getOrSetStructMember(&inputPtr, TYPE_lng, 
&seqbase, OP_SET);
+                       } else {
+                               getOrSetStructMember(&inputPtr, TYPE_ptr, 
&b->theap.base, OP_SET);
+                       }
                        getOrSetStructMember(&inputPtr, TYPE_lng, &b->batCount, 
OP_SET);
                        getOrSetStructMember(&inputPtr, TYPE_lng, &b->hseqbase, 
OP_SET);
                        if (getBatType(type) == TYPE_str) {
@@ -301,7 +307,7 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
        char *outputStruct = weld_value_data(result);
        for (i = 0; i < pci->retc; i++) {
                int type = getArgType(mb, pci, i);
-               if (isaBatType(type)) {
+               if (isaBatType(type) && !wstate->cudfOutputs[getArg(pci, i)]) {
                        BAT *b = COLnew(0, getBatType(type), 0, TRANSIENT);
                        getOrSetStructMember(&outputStruct, TYPE_ptr, 
&b->theap.base, OP_GET);
                        getOrSetStructMember(&outputStruct, TYPE_lng, 
&b->batCount, OP_GET);
@@ -340,6 +346,15 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
                        b->theap.size = b->batCount << b->tshift;
                        BBPkeepref(b->batCacheid);
                        *getArgReference_bat(stk, pci, i) = b->batCacheid;
+               } else if (isaBatType(type) && wstate->cudfOutputs[getArg(pci, 
i)]) {
+                       /* BAT produced by an UDF, we just get its bat cacheID 
*/
+                       char *base = NULL;
+                       lng count = -1;
+                       bat bid = -1;
+                       getOrSetStructMember(&outputStruct, TYPE_ptr, &base, 
OP_GET);  /* skip */
+                       getOrSetStructMember(&outputStruct, TYPE_lng, &count, 
OP_GET); /* skip */
+                       getOrSetStructMember(&outputStruct, TYPE_bat, &bid, 
OP_GET);
+                       *getArgReference_bat(stk, pci, i) = bid;
                } else {
                        /* TODO handle strings */
                        getOrSetStructMember(&outputStruct, type, 
getArgReference(stk, pci, i), OP_GET);
@@ -353,6 +368,10 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
        weld_conf_free(conf);
        weld_module_free(m);
        free(inputStruct);
+       free(wstate->program);
+       free(wstate->groupDeps);
+       free(wstate->cudfOutputs);
+       free(wstate);
 
        return MAL_SUCCEED;
 }
@@ -1242,6 +1261,47 @@ WeldBatcalcIdentity(Client cntxt, MalBlk
 }
 
 str
+WeldAlgebraJoin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+       (void) cntxt;
+       int leftRet = getArg(pci, 0);                                           
                           /* bat[:oid] */
+       int rightRet = getArg(pci, 1);                                          
                           /* bat[:oid] */
+       int left = getArg(pci, 2);                                              
                                   /* bat[:any_1] */
+       int right = getArg(pci, 3);                                             
                                   /* bat[:any_1] */
+       int sleft = getArg(pci, 4);                                             
                                   /* bat[:oid] */
+       int sright = getArg(pci, 5);                                            
                           /* bat[:oid] */
+       int nilMatches = getArg(pci, 6);                                        
                           /* bit */
+       int estimate = getArg(pci, 7);                                          
                           /* lng */
+       weldState *wstate = *getArgReference_ptr(stk, pci, pci->argc - 1); /* 
has value */
+
+       str any_1 = getWeldType(getBatType(getArgType(mb, pci, 2)));
+       bat sleftBat = *getArgReference_bat(stk, pci, 4);
+       bat srightBat = *getArgReference_bat(stk, pci, 5);
+       char weldStmt[STR_SIZE_INC];
+       if (is_bat_nil(sleftBat) && is_bat_nil(srightBat)) {
+               sprintf(weldStmt,
+               "let joinResult = cudf[weldJoinNoCandList%s, {vec[i64], 
vec[i64], i32, i32}](v%d, v%d, v%d, v%d);",
+               any_1, left, right, nilMatches, estimate);
+       } else {
+               sprintf(weldStmt,
+               "let joinResult = cudf[weldJoin%s, {vec[i64], vec[i64], i32, 
i32}](v%d, v%d, v%d, v%d, v%d, v%d);",
+               any_1, left, right, sleft, sright, nilMatches, estimate);
+       }
+       sprintf(weldStmt + strlen(weldStmt),
+       "let v%d = joinResult.$0;"
+       "let v%dbat = joinResult.$2;"
+       "let v%dhseqbase = 0L;"
+       "let v%d = joinResult.$1;"
+       "let v%dbat = joinResult.$3;"
+       "let v%dhseqbase = 0L;",
+       leftRet, leftRet, leftRet, rightRet, rightRet, rightRet);
+       wstate->cudfOutputs[leftRet] = 1;
+       wstate->cudfOutputs[rightRet] = 1;
+       appendWeldStmt(wstate, weldStmt);
+       return MAL_SUCCEED;
+}
+
+str
 WeldLanguagePass(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        (void) cntxt;
diff --git a/monetdb5/modules/mal/mal_weld.h b/monetdb5/modules/mal/mal_weld.h
--- a/monetdb5/modules/mal/mal_weld.h
+++ b/monetdb5/modules/mal/mal_weld.h
@@ -11,12 +11,17 @@
 #include "mal.h"
 #include "mal_interpreter.h"
 
+#define OP_GET 0
+#define OP_SET 1
+
 typedef struct {
        char *program;
        InstrPtr *groupDeps;
+       bit *cudfOutputs;
        size_t programMaxLen;
 } weldState;
 
+mal_export void getOrSetStructMember(char **addr, int type, const void *value, 
int op);
 mal_export str WeldInitState(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mal_export str WeldRun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
 mal_export str WeldAggrSum(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
pci);
@@ -47,6 +52,7 @@ mal_export str WeldBatMtimeYear(Client c
 mal_export str WeldBatMergeCand(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mal_export str WeldBatMirror(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mal_export str WeldBatcalcIdentity(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+mal_export str WeldAlgebraJoin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mal_export str WeldLanguagePass(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 
 #endif
diff --git a/monetdb5/modules/mal/mal_weld.mal 
b/monetdb5/modules/mal/mal_weld.mal
--- a/monetdb5/modules/mal/mal_weld.mal
+++ b/monetdb5/modules/mal/mal_weld.mal
@@ -109,6 +109,10 @@ pattern batcalcidentity(b:bat[:any_2], w
 address WeldBatcalcIdentity
 comment "batcalc.identity";
 
+pattern algebrajoin(l:bat[:any_1], r:bat[:any_1], sl:bat[:oid], sr:bat[:oid], 
nil_matches:bit, estimate:lng, wstate:ptr) (X_0:bat[:oid], X_1:bat[:oid])
+address WeldAlgebraJoin
+comment "algebra.join"
+
 pattern aggrsum(b:bat[:bte], wstate:ptr):bte
 address WeldAggrSum
 comment "aggr.sum";
diff --git a/monetdb5/modules/mal/mal_weld.mal.sh 
b/monetdb5/modules/mal/mal_weld.mal.sh
--- a/monetdb5/modules/mal/mal_weld.mal.sh
+++ b/monetdb5/modules/mal/mal_weld.mal.sh
@@ -119,6 +119,10 @@ pattern batcalcidentity(b:bat[:any_2], w
 address WeldBatcalcIdentity
 comment "batcalc.identity";
 
+pattern algebrajoin(l:bat[:any_1], r:bat[:any_1], sl:bat[:oid], sr:bat[:oid], 
nil_matches:bit, estimate:lng, wstate:ptr) (X_0:bat[:oid], X_1:bat[:oid])
+address WeldAlgebraJoin
+comment "algebra.join"
+
 EOF
 
 for tp in ${numeric[@]}; do
diff --git a/monetdb5/modules/weldudfs/Makefile.ag 
b/monetdb5/modules/weldudfs/Makefile.ag
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/weldudfs/Makefile.ag
@@ -0,0 +1,19 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0.  If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
+
+INCLUDES = ../../mal ../atoms ../kernel ../mal \
+       ../../../common/options \
+       ../../../common/stream \
+       ../../../common/utils \
+       ../../../gdk 
+
+MTSAFE
+
+lib_weldudfs = {
+       NOINST
+       SOURCES = \
+               weld_udfs.c weld_udfs.h
+}
diff --git a/monetdb5/modules/weldudfs/weld_udfs.c 
b/monetdb5/modules/weldudfs/weld_udfs.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/weldudfs/weld_udfs.c
@@ -0,0 +1,104 @@
+#include "monetdb_config.h"
+#include "gdk.h"
+#include "weld_udfs.h"
+#include "mal_weld.h"
+#include "algebra.h"
+
+#define denseFor(PTR, TYPE, START, END) \
+       do {                                \
+               TYPE *tptr = (TYPE *)PTR;       \
+               oid i;                          \
+               for (i = START; i < END; i++) { \
+                       tptr[i - START] = i;        \
+               }                               \
+       } while (0)
+
+static BAT *weldVecToBat(char *data, lng length, int type) {
+       lng seq = (lng)data;
+       if (seq <= 0) {
+               /* Dense BAT hack */
+               return BATdense(0, -seq, length);
+       } else {
+               BAT *b = COLnew(0, type, 0, TRANSIENT);
+               b->theap.base = data;
+               b->batCount = length;
+               if (b->batCount == 0) b->theap.base = NULL;
+               b->batCapacity = b->batCount;
+               b->theap.storage = STORE_NOWN;
+               b->tsorted = b->trevsorted = 0;
+               b->theap.free = b->batCount << b->tshift;
+               b->theap.size = b->batCount << b->tshift;
+               return b;
+       }
+}
+
+static BAT *replaceDenseBat(BAT *b, int type) {
+       if (!BATtdense(b)) return b;
+       BAT *bn = COLnew(0, type, b->batCount, TRANSIENT);
+       if (type == TYPE_bte) denseFor(Tloc(bn, 0), char, b->tseqbase, 
b->tseqbase + b->batCount);
+       if (type == TYPE_int) denseFor(Tloc(bn, 0), int, b->tseqbase, 
b->tseqbase + b->batCount);
+       if (type == TYPE_lng) denseFor(Tloc(bn, 0), lng, b->tseqbase, 
b->tseqbase + b->batCount);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to