Changeset: 9e50bda62d61 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/9e50bda62d61
Modified Files:
        monetdb5/extras/rapi/rapi.c
        sql/backends/monet5/UDF/pyapi3/pyapi3.c
Branch: sequences_7184
Log Message:

Implement cardinality-based bulk operations for rapi and pyapi3.


diffs (truncated from 345 to 300 lines):

diff --git a/monetdb5/extras/rapi/rapi.c b/monetdb5/extras/rapi/rapi.c
--- a/monetdb5/extras/rapi/rapi.c
+++ b/monetdb5/extras/rapi/rapi.c
@@ -598,8 +598,6 @@ bailout:
 
 static str RAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, 
bit grouped) {
        sql_func * sqlfun = NULL;
-       str exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
-
        SEXP x, env, retval;
        SEXP varname = R_NilValue;
        SEXP varvalue = R_NilValue;
@@ -621,6 +619,19 @@ static str RAPIeval(Client cntxt, MalBlk
 
        rapiClient = cntxt;
 
+       // If the first input argument is of type lng, this is a 
cardinality-only bulk operation.
+       int has_card_arg = 0;
+       lng card; // cardinality of non-bat inputs
+       if (getArgType(mb, pci, pci->retc) == TYPE_lng) {
+               has_card_arg=1;
+               card = *getArgReference_lng(stk, pci, pci->retc);
+       }
+       else {
+               has_card_arg=0;
+               card = 1;
+       }
+       str exprStr = *getArgReference_str(stk, pci, pci->retc + 1 + 
has_card_arg);
+
        if (!RAPIEnabled()) {
                throw(MAL, "rapi.eval",
                          "Embedded R has not been enabled. Start server with 
--set %s=true",
@@ -632,10 +643,10 @@ static str RAPIeval(Client cntxt, MalBlk
        }
 
        if (!grouped) {
-               sql_subfunc *sqlmorefun = (*(sql_subfunc**) 
getArgReference(stk, pci, pci->retc));
-               if (sqlmorefun) sqlfun = (*(sql_subfunc**) getArgReference(stk, 
pci, pci->retc))->func;
+               sql_subfunc *sqlmorefun = (*(sql_subfunc**) 
getArgReference(stk, pci, pci->retc+has_card_arg));
+               if (sqlmorefun) sqlfun = sqlmorefun->func;
        } else {
-               sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc);
+               sqlfun = *(sql_func**) getArgReference(stk, pci, 
pci->retc+has_card_arg);
        }
 
        args = (str*) GDKzalloc(sizeof(str) * pci->argc);
@@ -653,7 +664,7 @@ static str RAPIeval(Client cntxt, MalBlk
        // NEW macro temporarily renamed to MNEW to allow including 
sql_catalog.h
 
        if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
-               int carg = pci->retc + 2;
+               int carg = pci->retc + 2 + has_card_arg;
                argnode = sqlfun->ops->h;
                while (argnode) {
                        char* argname = ((sql_arg*) argnode->data)->name;
@@ -664,7 +675,7 @@ static str RAPIeval(Client cntxt, MalBlk
        }
        // the first unknown argument is the group, we don't really care for 
the rest.
        argnameslen = 2;
-       for (i = pci->retc + 2; i < pci->argc; i++) {
+       for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
                if (args[i] == NULL) {
                        if (!seengrp && grouped) {
                                args[i] = GDKstrdup("aggr_group");
@@ -679,30 +690,16 @@ static str RAPIeval(Client cntxt, MalBlk
 
        // install the MAL variables into the R environment
        // we can basically map values to int ("INTEGER") or double ("REAL")
-       for (i = pci->retc + 2; i < pci->argc; i++) {
+       for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
                int bat_type = getBatType(getArgType(mb,pci,i));
                // check for BAT or scalar first, keep code left
                if (!isaBatType(getArgType(mb,pci,i))) {
-                       b = COLnew(0, getArgType(mb, pci, i), 0, TRANSIENT);
+                       const ValRecord *v = &stk->stk[getArg(pci, i)];
+                       b = BATconstant(0, v->vtype, VALptr(v), card, 
TRANSIENT);
                        if (b == NULL) {
                                msg = createException(MAL, "rapi.eval", 
SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                goto wrapup;
                        }
-                       if ( getArgType(mb,pci,i) == TYPE_str) {
-                               if (BUNappend(b, *getArgReference_str(stk, pci, 
i), false) != GDK_SUCCEED) {
-                                       BBPreclaim(b);
-                                       b = NULL;
-                                       msg = createException(MAL, "rapi.eval", 
SQLSTATE(HY013) MAL_MALLOC_FAIL);
-                                       goto wrapup;
-                               }
-                       } else {
-                               if (BUNappend(b, getArgReference(stk, pci, i), 
false) != GDK_SUCCEED) {
-                                       BBPreclaim(b);
-                                       b = NULL;
-                                       msg = createException(MAL, "rapi.eval", 
SQLSTATE(HY013) MAL_MALLOC_FAIL);
-                                       goto wrapup;
-                               }
-                       }
                } else {
                        b = BATdescriptor(*getArgReference_bat(stk, pci, i));
                        if (b == NULL) {
@@ -751,7 +748,7 @@ static str RAPIeval(Client cntxt, MalBlk
                goto wrapup;
        }
        argnames[0] = '\0';
-       for (i = pci->retc + 2; i < pci->argc; i++) {
+       for (i = pci->retc + 2 + has_card_arg; i < pci->argc; i++) {
                pos += snprintf(argnames + pos, argnameslen - pos, "%s%s",
                                                args[i], i < pci->argc - 1 ? ", 
" : "");
        }
@@ -824,6 +821,7 @@ static str RAPIeval(Client cntxt, MalBlk
                                                                  "Failed to 
convert column %i", i);
                        goto wrapup;
                }
+
                // bat return
                if (isaBatType(getArgType(mb,pci,i))) {
                        *getArgReference_bat(stk, pci, i) = b->batCacheid;
@@ -940,6 +938,7 @@ static mel_func rapi_init_funcs[] = {
  pattern("rapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through 
R", args(1,4, 
varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
  command("rapi", "prelude", RAPIprelude, false, "", args(1,1, arg("",void))),
  pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script 
value", args(1,4, 
varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
+ pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script 
value", args(1,4, varargany("",0),arg("card", lng), 
arg("fptr",ptr),arg("expr",str))),
  pattern("batrapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates 
through R", args(1,4, 
varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
  pattern("batrapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates 
through R", args(1,4, 
varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
  { .imp=NULL }
diff --git a/sql/backends/monet5/UDF/pyapi3/pyapi3.c 
b/sql/backends/monet5/UDF/pyapi3/pyapi3.c
--- a/sql/backends/monet5/UDF/pyapi3/pyapi3.c
+++ b/sql/backends/monet5/UDF/pyapi3/pyapi3.c
@@ -200,16 +200,29 @@ static str PyAPIeval(Client cntxt, MalBl
                throw(MAL, "pyapi3.eval", SQLSTATE(PY000) "Embedded Python is 
enabled but an error was "
                                                                 "thrown during 
initialization.");
        }
+
+       // If the first input argument is of type lng, this is a 
cardinality-only bulk operation.
+       int has_card_arg = 0;
+       lng card; // cardinality of non-bat inputs
+       if (getArgType(mb, pci, pci->retc) == TYPE_lng) {
+               has_card_arg=1;
+               card = *getArgReference_lng(stk, pci, pci->retc);
+       }
+       else {
+               has_card_arg=0;
+               card = 1;
+       }
+
        if (!grouped) {
                sql_subfunc *sqlmorefun =
-                       (*(sql_subfunc **)getArgReference(stk, pci, pci->retc));
+                       (*(sql_subfunc **)getArgReference(stk, pci, pci->retc + 
has_card_arg));
                if (sqlmorefun) {
                        sqlfun = sqlmorefun->func;
                }
        } else {
-               sqlfun = *(sql_func **)getArgReference(stk, pci, pci->retc);
+               sqlfun = *(sql_func **)getArgReference(stk, pci, pci->retc + 
has_card_arg);
        }
-       exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
+       exprStr = *getArgReference_str(stk, pci, pci->retc + 1 + has_card_arg);
        varres = sqlfun ? sqlfun->varres : 0;
        retcols = !varres ? pci->retc : -1;
 
@@ -219,9 +232,9 @@ static str PyAPIeval(Client cntxt, MalBl
                throw(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL " 
arguments.");
        }
 
-       if ((pci->argc - (pci->retc + 2)) * sizeof(PyInput) > 0) {
+       if ((pci->argc - (pci->retc + 2 + has_card_arg)) * sizeof(PyInput) > 0) 
{
                pyinput_values =
-                       GDKzalloc((pci->argc - (pci->retc + 2)) * 
sizeof(PyInput));
+                       GDKzalloc((pci->argc - (pci->retc + 2 + has_card_arg)) 
* sizeof(PyInput));
 
                if (pyinput_values == NULL) {
                        GDKfree(args);
@@ -232,7 +245,7 @@ static str PyAPIeval(Client cntxt, MalBl
 
        // Analyse the SQL_Func structure to get the parameter names
        if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
-               unnamedArgs = pci->retc + 2;
+               unnamedArgs = pci->retc + 2 + has_card_arg;
                argnode = sqlfun->ops->h;
                while (argnode) {
                        char *argname = ((sql_arg *)argnode->data)->name;
@@ -250,14 +263,14 @@ static str PyAPIeval(Client cntxt, MalBl
 
        // We name all the unknown arguments, if grouping is enabled the first
        // unknown argument that is the group variable, we name this 
'aggr_group'
-       for (i = pci->retc + 2; i < argcount; i++) {
+       for (i = pci->retc + 2 + has_card_arg; i < argcount; i++) {
                if (args[i] == NULL) {
                        if (!seengrp && grouped) {
                                args[i] = GDKstrdup("aggr_group");
                                seengrp = TRUE;
                        } else {
                                char argbuf[64];
-                               snprintf(argbuf, sizeof(argbuf), "arg%i", i - 
pci->retc - 1);
+                               snprintf(argbuf, sizeof(argbuf), "arg%i", i - 
pci->retc - (1 + has_card_arg));
                                args[i] = GDKstrdup(argbuf);
                        }
                }
@@ -268,24 +281,35 @@ static str PyAPIeval(Client cntxt, MalBl
        // (a thread can fork while another process is in the lock, which means 
we
        // can get stuck permanently)
        argnode = sqlfun && sqlfun->ops->cnt > 0 ? sqlfun->ops->h : NULL;
-       for (i = pci->retc + 2; i < argcount; i++) {
-               PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
+       for (i = pci->retc + 2 + has_card_arg; i < argcount; i++) {
+               PyInput *inp = &pyinput_values[i - (pci->retc + 2 + 
has_card_arg)];
                if (!isaBatType(getArgType(mb, pci, i))) {
                        inp->scalar = true;
                        inp->bat_type = getArgType(mb, pci, i);
                        inp->count = 1;
-                       if (inp->bat_type == TYPE_str) {
-                               inp->dataptr = getArgReference_str(stk, pci, i);
-                       } else {
-                               inp->dataptr = getArgReference(stk, pci, i);
+
+                       if (!has_card_arg) {
+                               if (inp->bat_type == TYPE_str) {
+                                       inp->dataptr = getArgReference_str(stk, 
pci, i);
+                               } else {
+                                       inp->dataptr = getArgReference(stk, 
pci, i);
+                               }
+                       }
+                       else {
+                               const ValRecord *v = &stk->stk[getArg(pci, i)];
+                               b = BATconstant(0, v->vtype, VALptr(v), card, 
TRANSIENT);
+                               if (b == NULL) {
+                                       msg = createException(MAL, 
"pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
+                                       goto wrapup;
+                               }
+                               inp->count = BATcount(b);
+                               inp->bat_type = b->ttype;
+                               inp->bat = b;
                        }
                } else {
                        b = BATdescriptor(*getArgReference_bat(stk, pci, i));
                        if (b == NULL) {
-                               msg = createException(
-                                       MAL, "pyapi3.eval",
-                                       SQLSTATE(PY000) "The BAT passed to the 
function (argument #%d) is NULL.\n",
-                                       i - (pci->retc + 2) + 1);
+                               msg = createException(MAL, "pyapi3.eval", 
SQLSTATE(HY013) MAL_MALLOC_FAIL);
                                goto wrapup;
                        }
                        seqbase = b->hseqbase;
@@ -620,7 +644,7 @@ static str PyAPIeval(Client cntxt, MalBl
        // arrays)
        // We will put the python arrays in a PyTuple object, we will use this
        // PyTuple object as the set of arguments to call the Python function
-       pArgs = PyTuple_New(argcount - (pci->retc + 2) +
+       pArgs = PyTuple_New(argcount - (pci->retc + 2 + has_card_arg) +
                                                (code_object == NULL ? 
additional_columns : 0));
        pColumns = PyDict_New();
        pColumnTypes = PyDict_New();
@@ -631,23 +655,23 @@ static str PyAPIeval(Client cntxt, MalBl
 #endif
 
        // Now we will loop over the input BATs and convert them to python 
objects
-       for (i = pci->retc + 2; i < argcount; i++) {
+       for (i = pci->retc + 2 + has_card_arg; i < argcount; i++) {
                PyObject *result_array;
                // t_start and t_end hold the part of the BAT we will convert 
to a Numpy
                // array, by default these hold the entire BAT [0 - BATcount(b)]
-               size_t t_start = 0, t_end = pyinput_values[i - (pci->retc + 
2)].count;
+               size_t t_start = 0, t_end = pyinput_values[i - (pci->retc + 2 + 
has_card_arg)].count;
 
                // There are two possibilities, either the input is a BAT, or 
the input
                // is a scalar
                // If the input is a scalar we will convert it to a python 
scalar
                // If the input is a BAT, we will convert it to a numpy array
-               if (pyinput_values[i - (pci->retc + 2)].scalar) {
+               if (pyinput_values[i - (pci->retc + 2 + has_card_arg)].scalar) {
                        result_array = PyArrayObject_FromScalar(
-                               &pyinput_values[i - (pci->retc + 2)], &msg);
+                               &pyinput_values[i - (pci->retc + 2 + 
has_card_arg)], &msg);
                } else {
-                       int type = pyinput_values[i - (pci->retc + 2)].bat_type;
+                       int type = pyinput_values[i - (pci->retc + 2 + 
has_card_arg)].bat_type;
                        result_array = PyMaskedArray_FromBAT(
-                               &pyinput_values[i - (pci->retc + 2)], t_start, 
t_end, &msg,
+                               &pyinput_values[i - (pci->retc + 2 + 
has_card_arg)], t_start, t_end, &msg,
                                !enable_zerocopy_input && type != TYPE_void);
                }
                if (result_array == NULL) {
@@ -659,12 +683,12 @@ static str PyAPIeval(Client cntxt, MalBl
                }
                if (code_object == NULL) {
                        PyObject *arg_type = PyString_FromString(
-                               BatType_Format(pyinput_values[i - (pci->retc + 
2)].bat_type));
+                               BatType_Format(pyinput_values[i - (pci->retc + 
2 + has_card_arg)].bat_type));
                        PyDict_SetItemString(pColumns, args[i], result_array);
                        PyDict_SetItemString(pColumnTypes, args[i], arg_type);
                        Py_DECREF(arg_type);
                }
-               pyinput_values[i - (pci->retc + 2)].result = result_array;
+               pyinput_values[i - (pci->retc + 2 + has_card_arg)].result = 
result_array;
                PyTuple_SetItem(pArgs, ai++, result_array);
        }
        if (code_object == NULL) {
@@ -719,7 +743,7 @@ static str PyAPIeval(Client cntxt, MalBl
                        size_t *group_counts = NULL;
                        oid *aggr_group_arr = NULL;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to