Changeset: 22d715657720 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=22d715657720
Added Files:
monetdb5/modules/weldudfs/Makefile.ag
monetdb5/modules/weldudfs/weld_udfs.c
monetdb5/modules/weldudfs/weld_udfs.h
Modified Files:
monetdb5/modules/Makefile.ag
monetdb5/modules/mal/Makefile.ag
monetdb5/modules/mal/mal_weld.c
monetdb5/modules/mal/mal_weld.h
monetdb5/modules/mal/mal_weld.mal
monetdb5/modules/mal/mal_weld.mal.sh
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
monetdb5/optimizer/opt_weld.c
monetdb5/tools/Makefile.ag
Branch: mal-weld
Log Message:
Weld implementation of algebra.join through a UDF
On my Fedora machine, calling a UDF only works if the UDF is defined in a separate library.
For this reason, I created the monetdb5/modules/weldudfs library.
diffs (truncated from 468 to 300 lines):
diff --git a/monetdb5/modules/Makefile.ag b/monetdb5/modules/Makefile.ag
--- a/monetdb5/modules/Makefile.ag
+++ b/monetdb5/modules/Makefile.ag
@@ -4,4 +4,4 @@
#
# Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
-SUBDIRS = atoms kernel mal
+SUBDIRS = atoms kernel mal weldudfs
diff --git a/monetdb5/modules/mal/Makefile.ag b/monetdb5/modules/mal/Makefile.ag
--- a/monetdb5/modules/mal/Makefile.ag
+++ b/monetdb5/modules/mal/Makefile.ag
@@ -4,7 +4,7 @@
#
# Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
-INCLUDES = ../../mal ../atoms ../kernel \
+INCLUDES = ../../mal ../atoms ../kernel ../weldudfs \
../../../clients/mapilib \
../../../common/options \
../../../common/stream \
diff --git a/monetdb5/modules/mal/mal_weld.c b/monetdb5/modules/mal/mal_weld.c
--- a/monetdb5/modules/mal/mal_weld.c
+++ b/monetdb5/modules/mal/mal_weld.c
@@ -13,11 +13,9 @@
#include "mal_instruction.h"
#include "mal_weld.h"
#include "weld.h"
+#include "weld_udfs.h"
#define STR_SIZE_INC 4096
-#define OP_GET 0
-#define OP_SET 1
-
/* Variables in Weld will be named vXX - e.g. v19
* */
@@ -86,7 +84,7 @@ static str getWeldUTypeFromWidth(int wid
return "u64";
}
-static void getOrSetStructMember(char **addr, int type, const void *value, int
op) {
+void getOrSetStructMember(char **addr, int type, const void *value, int op) {
if (type == TYPE_bte) {
getOrSetStructMemberImpl(addr, char, value, op);
} else if (type == TYPE_int) {
@@ -159,6 +157,7 @@ WeldInitState(Client cntxt, MalBlkPtr mb
wstate->programMaxLen = 1;
wstate->program = calloc(wstate->programMaxLen, sizeof(char));
wstate->groupDeps = calloc(mb->vtop, sizeof(InstrPtr));
+ wstate->cudfOutputs = calloc(mb->vtop, sizeof(bit));
*getArgReference_ptr(stk, pci, 0) = wstate;;
return MAL_SUCCEED;
}
@@ -222,6 +221,10 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
/* Also return the string column base ptr */
outputLen += sprintf(outputStmt + outputLen, "
v%dstr,", getArg(pci, i));
}
+ if (wstate->cudfOutputs[getArg(pci, i)]) {
+ /* Output bat produced by an UDF */
+ outputLen += sprintf(outputStmt + outputLen, "
v%dbat,", getArg(pci, i));
+ }
}
outputStmt[0] = '{';
@@ -239,9 +242,6 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
#endif
weld_module_t m = weld_module_compile(wstate->program, conf, e);
weld_conf_free(conf);
- free(wstate->program);
- free(wstate->groupDeps);
- free(wstate);
if (weld_error_code(e)) {
throw(MAL, "weld.run", PROGRAM_GENERAL ": %s",
weld_error_message(e));
}
@@ -259,7 +259,13 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
bat bid = *getArgReference_bat(stk, pci, i);
BAT *b = BATdescriptor(bid);
if (b == NULL) throw(MAL, "weld.run", SQLSTATE(HY002)
RUNTIME_OBJECT_MISSING);
- getOrSetStructMember(&inputPtr, TYPE_ptr,
&b->theap.base, OP_SET);
+ if (BATtdense(b)) {
+ /* Hack: store -b->seqbase instead: udfs will
check if it's a dense bat */
+ lng seqbase = -b->tseqbase;
+ getOrSetStructMember(&inputPtr, TYPE_lng,
&seqbase, OP_SET);
+ } else {
+ getOrSetStructMember(&inputPtr, TYPE_ptr,
&b->theap.base, OP_SET);
+ }
getOrSetStructMember(&inputPtr, TYPE_lng, &b->batCount,
OP_SET);
getOrSetStructMember(&inputPtr, TYPE_lng, &b->hseqbase,
OP_SET);
if (getBatType(type) == TYPE_str) {
@@ -301,7 +307,7 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
char *outputStruct = weld_value_data(result);
for (i = 0; i < pci->retc; i++) {
int type = getArgType(mb, pci, i);
- if (isaBatType(type)) {
+ if (isaBatType(type) && !wstate->cudfOutputs[getArg(pci, i)]) {
BAT *b = COLnew(0, getBatType(type), 0, TRANSIENT);
getOrSetStructMember(&outputStruct, TYPE_ptr,
&b->theap.base, OP_GET);
getOrSetStructMember(&outputStruct, TYPE_lng,
&b->batCount, OP_GET);
@@ -340,6 +346,15 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
b->theap.size = b->batCount << b->tshift;
BBPkeepref(b->batCacheid);
*getArgReference_bat(stk, pci, i) = b->batCacheid;
+ } else if (isaBatType(type) && wstate->cudfOutputs[getArg(pci,
i)]) {
+ /* BAT produced by an UDF, we just get its bat cacheID
*/
+ char *base = NULL;
+ lng count = -1;
+ bat bid = -1;
+ getOrSetStructMember(&outputStruct, TYPE_ptr, &base,
OP_GET); /* skip */
+ getOrSetStructMember(&outputStruct, TYPE_lng, &count,
OP_GET); /* skip */
+ getOrSetStructMember(&outputStruct, TYPE_bat, &bid,
OP_GET);
+ *getArgReference_bat(stk, pci, i) = bid;
} else {
/* TODO handle strings */
getOrSetStructMember(&outputStruct, type,
getArgReference(stk, pci, i), OP_GET);
@@ -353,6 +368,10 @@ WeldRun(Client cntxt, MalBlkPtr mb, MalS
weld_conf_free(conf);
weld_module_free(m);
free(inputStruct);
+ free(wstate->program);
+ free(wstate->groupDeps);
+ free(wstate->cudfOutputs);
+ free(wstate);
return MAL_SUCCEED;
}
@@ -1242,6 +1261,47 @@ WeldBatcalcIdentity(Client cntxt, MalBlk
}
str
+WeldAlgebraJoin(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+{
+ (void) cntxt;
+ int leftRet = getArg(pci, 0);
/* bat[:oid] */
+ int rightRet = getArg(pci, 1);
/* bat[:oid] */
+ int left = getArg(pci, 2);
/* bat[:any_1] */
+ int right = getArg(pci, 3);
/* bat[:any_1] */
+ int sleft = getArg(pci, 4);
/* bat[:oid] */
+ int sright = getArg(pci, 5);
/* bat[:oid] */
+ int nilMatches = getArg(pci, 6);
/* bit */
+ int estimate = getArg(pci, 7);
/* lng */
+ weldState *wstate = *getArgReference_ptr(stk, pci, pci->argc - 1); /*
has value */
+
+ str any_1 = getWeldType(getBatType(getArgType(mb, pci, 2)));
+ bat sleftBat = *getArgReference_bat(stk, pci, 4);
+ bat srightBat = *getArgReference_bat(stk, pci, 5);
+ char weldStmt[STR_SIZE_INC];
+ if (is_bat_nil(sleftBat) && is_bat_nil(srightBat)) {
+ sprintf(weldStmt,
+ "let joinResult = cudf[weldJoinNoCandList%s, {vec[i64],
vec[i64], i32, i32}](v%d, v%d, v%d, v%d);",
+ any_1, left, right, nilMatches, estimate);
+ } else {
+ sprintf(weldStmt,
+ "let joinResult = cudf[weldJoin%s, {vec[i64], vec[i64], i32,
i32}](v%d, v%d, v%d, v%d, v%d, v%d);",
+ any_1, left, right, sleft, sright, nilMatches, estimate);
+ }
+ sprintf(weldStmt + strlen(weldStmt),
+ "let v%d = joinResult.$0;"
+ "let v%dbat = joinResult.$2;"
+ "let v%dhseqbase = 0L;"
+ "let v%d = joinResult.$1;"
+ "let v%dbat = joinResult.$3;"
+ "let v%dhseqbase = 0L;",
+ leftRet, leftRet, leftRet, rightRet, rightRet, rightRet);
+ wstate->cudfOutputs[leftRet] = 1;
+ wstate->cudfOutputs[rightRet] = 1;
+ appendWeldStmt(wstate, weldStmt);
+ return MAL_SUCCEED;
+}
+
+str
WeldLanguagePass(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
(void) cntxt;
diff --git a/monetdb5/modules/mal/mal_weld.h b/monetdb5/modules/mal/mal_weld.h
--- a/monetdb5/modules/mal/mal_weld.h
+++ b/monetdb5/modules/mal/mal_weld.h
@@ -11,12 +11,17 @@
#include "mal.h"
#include "mal_interpreter.h"
+#define OP_GET 0
+#define OP_SET 1
+
typedef struct {
char *program;
InstrPtr *groupDeps;
+ bit *cudfOutputs;
size_t programMaxLen;
} weldState;
+mal_export void getOrSetStructMember(char **addr, int type, const void *value,
int op);
mal_export str WeldInitState(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mal_export str WeldRun(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
mal_export str WeldAggrSum(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
@@ -47,6 +52,7 @@ mal_export str WeldBatMtimeYear(Client c
mal_export str WeldBatMergeCand(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mal_export str WeldBatMirror(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mal_export str WeldBatcalcIdentity(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+mal_export str WeldAlgebraJoin(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mal_export str WeldLanguagePass(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
#endif
diff --git a/monetdb5/modules/mal/mal_weld.mal
b/monetdb5/modules/mal/mal_weld.mal
--- a/monetdb5/modules/mal/mal_weld.mal
+++ b/monetdb5/modules/mal/mal_weld.mal
@@ -109,6 +109,10 @@ pattern batcalcidentity(b:bat[:any_2], w
address WeldBatcalcIdentity
comment "batcalc.identity";
+pattern algebrajoin(l:bat[:any_1], r:bat[:any_1], sl:bat[:oid], sr:bat[:oid], nil_matches:bit, estimate:lng, wstate:ptr) (X_0:bat[:oid], X_1:bat[:oid])
+address WeldAlgebraJoin
+comment "algebra.join";
+
pattern aggrsum(b:bat[:bte], wstate:ptr):bte
address WeldAggrSum
comment "aggr.sum";
diff --git a/monetdb5/modules/mal/mal_weld.mal.sh
b/monetdb5/modules/mal/mal_weld.mal.sh
--- a/monetdb5/modules/mal/mal_weld.mal.sh
+++ b/monetdb5/modules/mal/mal_weld.mal.sh
@@ -119,6 +119,10 @@ pattern batcalcidentity(b:bat[:any_2], w
address WeldBatcalcIdentity
comment "batcalc.identity";
+pattern algebrajoin(l:bat[:any_1], r:bat[:any_1], sl:bat[:oid], sr:bat[:oid], nil_matches:bit, estimate:lng, wstate:ptr) (X_0:bat[:oid], X_1:bat[:oid])
+address WeldAlgebraJoin
+comment "algebra.join";
+
EOF
for tp in ${numeric[@]}; do
diff --git a/monetdb5/modules/weldudfs/Makefile.ag
b/monetdb5/modules/weldudfs/Makefile.ag
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/weldudfs/Makefile.ag
@@ -0,0 +1,19 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+#
+# Copyright 1997 - July 2008 CWI, August 2008 - 2018 MonetDB B.V.
+
+INCLUDES = ../../mal ../atoms ../kernel ../mal \
+ ../../../common/options \
+ ../../../common/stream \
+ ../../../common/utils \
+ ../../../gdk
+
+MTSAFE
+
+lib_weldudfs = {
+ NOINST
+ SOURCES = \
+ weld_udfs.c weld_udfs.h
+}
diff --git a/monetdb5/modules/weldudfs/weld_udfs.c
b/monetdb5/modules/weldudfs/weld_udfs.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/modules/weldudfs/weld_udfs.c
@@ -0,0 +1,104 @@
+#include "monetdb_config.h"
+#include "gdk.h"
+#include "weld_udfs.h"
+#include "mal_weld.h"
+#include "algebra.h"
+
+#define denseFor(PTR, TYPE, START, END) \
+ do { \
+ TYPE *tptr = (TYPE *)PTR; \
+ oid i; \
+ for (i = START; i < END; i++) { \
+ tptr[i - START] = i; \
+ } \
+ } while (0)
+
+static BAT *weldVecToBat(char *data, lng length, int type) {
+ lng seq = (lng)data;
+ if (seq <= 0) {
+ /* Dense BAT hack */
+ return BATdense(0, -seq, length);
+ } else {
+ BAT *b = COLnew(0, type, 0, TRANSIENT);
+ b->theap.base = data;
+ b->batCount = length;
+ if (b->batCount == 0) b->theap.base = NULL;
+ b->batCapacity = b->batCount;
+ b->theap.storage = STORE_NOWN;
+ b->tsorted = b->trevsorted = 0;
+ b->theap.free = b->batCount << b->tshift;
+ b->theap.size = b->batCount << b->tshift;
+ return b;
+ }
+}
+
+static BAT *replaceDenseBat(BAT *b, int type) {
+ if (!BATtdense(b)) return b;
+ BAT *bn = COLnew(0, type, b->batCount, TRANSIENT);
+ if (type == TYPE_bte) denseFor(Tloc(bn, 0), char, b->tseqbase,
b->tseqbase + b->batCount);
+ if (type == TYPE_int) denseFor(Tloc(bn, 0), int, b->tseqbase,
b->tseqbase + b->batCount);
+ if (type == TYPE_lng) denseFor(Tloc(bn, 0), lng, b->tseqbase,
b->tseqbase + b->batCount);
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list