Changeset: ae06ee4938d6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=ae06ee4938d6
Modified Files:
monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:
Added support for varres in PyAPIeval (but not yet available on SQL level).
diffs (219 lines):
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -423,6 +423,8 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
#endif
PyGILState_STATE gstate = 0;
int j;
+ bit varres = sqlfun ? sqlfun->varres : 0;
+ int retcols = !varres ? pci->retc : -1;
size_t iu;
(void) cntxt;
@@ -900,23 +902,43 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
// For dictionary returns we need to map each of the (key,value)
pairs to the proper return value
// We first analyze the SQL Function structure for a list of
return value names
char **retnames = NULL;
- if (sqlfun != NULL) {
- retnames = GDKzalloc(sizeof(char*) * sqlfun->res->cnt);
- argnode = sqlfun->res->h;
- for(i = 0; i < sqlfun->res->cnt; i++) {
- retnames[i] = ((sql_arg*)argnode->data)->name;
- argnode = argnode->next;
+ if (!varres)
+ {
+ if (sqlfun != NULL) {
+ retnames = GDKzalloc(sizeof(char*) * sqlfun->res->cnt);
+ argnode = sqlfun->res->h;
+ for(i = 0; i < sqlfun->res->cnt; i++) {
+ retnames[i] = ((sql_arg*)argnode->data)->name;
+ argnode = argnode->next;
+ }
+ } else {
+ msg = createException(MAL, "pyapi.eval", "Return value is
a dictionary, but there is no sql function object, so we don't know the return
value names and mapping cannot be done.");
+ goto wrapup;
}
} else {
- msg = createException(MAL, "pyapi.eval", "Return value is a
dictionary, but there is no sql function object, so we don't know the return
value names and mapping cannot be done.");
- goto wrapup;
+ // If there are a variable number of return types, we take the
column names from the dictionary
+ PyObject *keys = PyDict_Keys(pResult);
+ retcols = PyList_Size(keys);
+ retnames = GDKzalloc(sizeof(char*) * retcols);
+ for(i = 0; i < retcols; i++) {
+ PyObject *colname = PyList_GetItem(keys, i);
+ if (!PyString_CheckExact(colname)) {
+ msg = createException(MAL, "pyapi.eval", "Expected a
string key in the dictionary, but received an object of type %s",
colname->ob_type->tp_name);
+ goto wrapup;
+ }
+ retnames[i] = ((PyStringObject*)colname)->ob_sval;
+ }
}
- pResult = PyDict_CheckForConversion(pResult, pci->retc, retnames,
&msg);
+ pResult = PyDict_CheckForConversion(pResult, retcols, retnames,
&msg);
if (retnames != NULL) GDKfree(retnames);
- } else {
+ } else if (varres) {
+ msg = createException(MAL, "pyapi.eval", "Expected a variable
number return values, but the return type was not a dictionary. We require the
return type to be a dictionary for column naming purposes.");
+ goto wrapup;
+ }
+ else {
// Now we need to do some error checking on the result object,
because the result object has to have the correct type/size
// We will also do some converting of result objects to a common
type (such as scalar -> [[scalar]])
- pResult = PyObject_CheckForConversion(pResult, pci->retc, NULL,
&msg);
+ pResult = PyObject_CheckForConversion(pResult, retcols, NULL,
&msg);
}
if (pResult == NULL) {
goto wrapup;
@@ -924,7 +946,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
}
VERBOSE_MESSAGE("Collecting return values.\n");
-
+ if (varres) {
+ GDKfree(pyreturn_values);
+ pyreturn_values = GDKzalloc(retcols * sizeof(PyReturn));
+ }
// Now we have executed the Python function, we have to collect the return
values and convert them to BATs
// We will first collect header information about the Python return
objects and extract the underlying C arrays
@@ -933,7 +958,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
// The reason we are doing this as a separate step is because this
preprocessing requires us to call the Python API
// Whereas the actual returning does not require us to call the Python API
// This means we can do the actual returning without holding the GIL
- if (!PyObject_PreprocessObject(pResult, pyreturn_values, pci->retc, &msg))
{
+ if (!PyObject_PreprocessObject(pResult, pyreturn_values, retcols, &msg)) {
goto wrapup;
}
@@ -963,10 +988,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
#endif
// Now we will write data about our result (memory size, type, number
of elements) to the header
ptr = (ReturnBatDescr*)shm_ptr;
- for (i = 0; i < pci->retc; i++)
+ for (i = 0; i < retcols; i++)
{
PyReturn *ret = &pyreturn_values[i];
- ReturnBatDescr *descr = &ptr[(process_id - 1) * pci->retc + i];
+ ReturnBatDescr *descr = &ptr[(process_id - 1) * retcols + i];
if (ret->result_type == NPY_OBJECT) {
// We can't deal with NPY_OBJECT arrays, because these are
'arrays of pointers', so we can't just copy the content of the array into
shared memory
@@ -1020,7 +1045,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
{
// So if we get here, we know all processes have finished and that
we are the last process to pass the first semaphore
// Since we are the last process, it is our job to create the
shared memory for each of the return values
- for (i = 0; i < pci->retc; i++)
+ for (i = 0; i < retcols; i++)
{
size_t return_size = 0;
size_t mask_size = 0;
@@ -1028,7 +1053,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
// Now we will count the size of the return values for each of
the processes
for(j = 0; j < process_count; j++)
{
- ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j *
pci->retc + i]);
+ ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j *
retcols + i]);
return_size += descr->bat_size;
mask_size += descr->bat_count * sizeof(bool);
has_mask = has_mask || descr->has_mask;
@@ -1044,7 +1069,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
if (has_mask)
{
assert(mask_size > 0);
- if (create_shared_memory(shm_id + pci->retc + (i + 1),
mask_size, NULL) != MAL_SUCCEED) //create a memory space for the mask
+ if (create_shared_memory(shm_id + retcols + (i + 1),
mask_size, NULL) != MAL_SUCCEED) //create a memory space for the mask
{
msg = createException(MAL, "pyapi.eval", "Failed to
allocate shared memory for returning mask.\n");
goto wrapup;
@@ -1067,11 +1092,11 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
// However, we do not know if any of the other childs have failed
// If they have, they have written descr->npy_type to -1 in one of
their headers
// So we check for that
- for (i = 0; i < pci->retc; i++)
+ for (i = 0; i < retcols; i++)
{
for(j = 0; j < process_count; j++)
{
- ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j *
pci->retc + i]);
+ ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j *
retcols + i]);
if (descr->npy_type < 0)
{
// If any of the child processes have failed, exit
without an error code because we did not fail
@@ -1083,7 +1108,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
}
// Now we can finally return the values
- for (i = 0; i < pci->retc; i++)
+ for (i = 0; i < retcols; i++)
{
char *mem_ptr;
PyReturn *ret = &pyreturn_values[i];
@@ -1095,7 +1120,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
bool has_mask = false;
for(j = 0; j < process_count; j++)
{
- ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * pci->retc
+ i]);
+ ReturnBatDescr *descr = &(((ReturnBatDescr*)ptr)[j * retcols +
i]);
if (j < (process_id - 1))
{
start_size += descr->bat_size;
@@ -1118,7 +1143,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
if (has_mask) {
bool *mask_ptr;
- msg = get_shared_memory(shm_id + pci->retc + (i + 1),
mask_size, (void**) &mask_ptr);
+ msg = get_shared_memory(shm_id + retcols + (i + 1), mask_size,
(void**) &mask_ptr);
// If any of the processes return a mask, we need to write our
mask values to the shared memory
if (mask_ptr == NULL) {
@@ -1151,17 +1176,23 @@ returnvalues:
/*[RETURN_VALUES]*/
VERBOSE_MESSAGE("Returning values.\n");
- for (i = 0; i < pci->retc; i++)
+ for (i = 0; i < retcols; i++)
{
PyReturn *ret = &pyreturn_values[i];
- int bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i)));
+ int bat_type = TYPE_any;
+ if (!varres) {
+ bat_type = ATOMstorage(getColumnType(getArgType(mb,pci,i)));
- if (bat_type == TYPE_any || bat_type == TYPE_void) {
- getArgType(mb,pci,i) = bat_type;
- msg = createException(MAL, "pyapi.eval", "Unknown return value,
possibly projecting with no parameters.");
- goto wrapup;
+ if (bat_type == TYPE_any || bat_type == TYPE_void) {
+ getArgType(mb,pci,i) = bat_type;
+ msg = createException(MAL, "pyapi.eval", "Unknown return
value, possibly projecting with no parameters.");
+ goto wrapup;
+ }
+ } else {
+ bat_type = PyType_ToBat(ret->result_type);
}
+
b = PyObject_ConvertToBAT(ret, bat_type, i, seqbase, &msg);
if (b == NULL) {
goto wrapup;
@@ -1200,8 +1231,8 @@ returnvalues:
// To indicate that we failed, we will write information to our header
ptr = (ReturnBatDescr*)shm_ptr;
- for (i = 0; i < pci->retc; i++) {
- ReturnBatDescr *descr = &ptr[(process_id - 1) * pci->retc + i];
+ for (i = 0; i < retcols; i++) {
+ ReturnBatDescr *descr = &ptr[(process_id - 1) * retcols + i];
// We will write descr->npy_type to -1, so other processes can see
that we failed
descr->npy_type = -1;
// We will write the memory size of our error message to the
bat_size, so the main process can access the shared memory
@@ -1245,7 +1276,7 @@ returnvalues:
//thus we need to free python objects, thus we need to obtain the GIL
gstate = PyGILState_Ensure();
}
- for (i = 0; i < pci->retc; i++) {
+ for (i = 0; i < retcols; i++) {
PyReturn *ret = &pyreturn_values[i];
// First clean up any return values
if (!ret->multidimensional) {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list