Changeset: f7c146863001 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f7c146863001
Added Files:
        sql/backends/monet5/Tests/pyapi29.sql
        sql/backends/monet5/Tests/pyapi29.stable.err
        sql/backends/monet5/Tests/pyapi29.stable.out
Modified Files:
        monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:

Added parallel computation of aggregates over the different groups of a GROUP BY
statement when PYTHON_MAP is used.

The PYTHON_MAP aggregate function is called once per group in parallel.

Note: this currently only works with INTEGER input columns, and the parallelization is non-forked
(i.e. multithreaded within a single process, and therefore constrained by the GIL).


diffs (truncated from 519 to 300 lines):

diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -99,6 +99,27 @@ int PyAPIEnabled(void) {
             || GDKgetenv_isyes(pyapi_enableflag));
 }
 
+struct _AggrParams{ // per-thread work description passed to ComputeParallelAggregation; PyAPIeval allocates one per worker thread
+    PyInput **pyinput_values; // address of the caller's pyinput_values array (shared, not copied)
+    void ****split_bats; // address of the caller's split_bats, indexed [group][column][element]
+    lng **group_counts; // address of the caller's per-group element-count array
+    str **args; // address of the caller's argument-name array
+    PyObject **connection; // address of the caller's pConnection
+    PyObject **function; // address of the compiled Python function object (pFunc)
+    str *pycall; // address of the caller's pycall string
+    str msg; // error reported by the worker; set to NULL before start and checked by the caller after pthread_join
+    size_t base; // index of the first input argument (set to pci->retc + 2)
+    size_t additional_columns; // extra argument-tuple slots beyond the input columns (3 in PyAPIeval)
+    size_t named_columns; // number of named (SQL-declared) input columns
+    size_t columns; // NOTE(review): not assigned in the visible setup code -- confirm how the worker uses it
+    size_t group_count; // total number of groups
+    size_t group_start; // first group handled by this thread
+    size_t group_end; // end of this thread's range; equals the next thread's group_start, so presumably exclusive -- confirm in ComputeParallelAggregation
+    pthread_t thread; // handle filled by pthread_create and joined by the caller
+};
+#define AggrParams struct _AggrParams // NOTE(review): names starting with '_' + uppercase are reserved in C; a typedef would be more idiomatic
+static PyObject* ComputeParallelAggregation(AggrParams *p); // worker entry point; its PyObject* result is retrieved via pthread_join
+
 static char* FunctionBasePath(void);
 static char* FunctionBasePath(void) {
     char *basepath = GDKgetenv("function_basepath");
@@ -402,6 +423,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
     sql_func * sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc);
     str exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
 
+    const int additional_columns = 3;
     int i = 1, ai = 0;
     char* pycall = NULL;
     str *args;
@@ -434,6 +456,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
     bit varres = sqlfun ? sqlfun->varres : 0;
     int retcols = !varres ? pci->retc : -1;
     PyGILState_STATE gstate = PyGILState_LOCKED;
+    int unnamedArgs;
+    bit parallel_aggregation = grouped && mapped;
+
+    mapped = 0;
 
 #ifndef HAVE_FORK
     (void) mapped;
@@ -488,12 +514,11 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
 
     // Analyse the SQL_Func structure to get the parameter names
     if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
-        int cargs = pci->retc + 2;
+        unnamedArgs = pci->retc + 2;
         argnode = sqlfun->ops->h;
         while (argnode) {
             char* argname = ((sql_arg*) argnode->data)->name;
-            args[cargs] = GDKstrdup(argname);
-            cargs++;
+            args[unnamedArgs++] = GDKstrdup(argname);
             argnode = argnode->next;
         }
     }
@@ -555,7 +580,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
             mapped = true;
             holds_gil = false;
         }
-        else python_call_active = true;
+        else {
+            python_call_active = true;
+            holds_gil = true;
+        }
         MT_lock_unset(&pyapiLock, "pyapi.evaluate");
     }
 #endif
@@ -952,7 +980,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
 
     // Now we will do the input handling (aka converting the input BATs to 
numpy arrays)
     // We will put the python arrays in a PyTuple object, we will use this 
PyTuple object as the set of arguments to call the Python function
-    pArgs = PyTuple_New(pci->argc - (pci->retc + 2) + (code_object == NULL ? 3 
: 0));
+    pArgs = PyTuple_New(pci->argc - (pci->retc + 2) + (code_object == NULL ? 
additional_columns : 0));
     pColumns = PyDict_New();
     pColumnTypes = PyDict_New();
 #ifdef HAVE_FORK
@@ -1047,8 +1075,217 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
         }
 
 
-        // The function has been successfully created/compiled, all that 
remains is to actually call the function
-        pResult = PyObject_CallObject(pFunc, pArgs);
+        if (parallel_aggregation && sqlfun != NULL && sqlfun->ops->cnt > 0) {
+            // parallel aggregation, we run the function once for every group 
in parallel
+            BAT *aggr_group = NULL, *group_first_occurrence = NULL;
+            size_t group_count, elements, element_it, group_it;
+            lng *group_counts = NULL;
+            lng *group_current_index = NULL;
+            lng **group_indices = NULL;
+            lng *aggr_group_arr = NULL;
+            void ***split_bats = NULL;
+            int named_columns = unnamedArgs - (pci->retc + 2);
+            PyObject *aggr_result;
+
+            // release the GIL
+            PyGILState_Release(gstate);
+            gstate = PyGILState_LOCKED;
+
+            // the first unnamed argument has the group numbers for every row
+            aggr_group = BATdescriptor(*getArgReference_bat(stk, pci, 
unnamedArgs));
+            // the second unnamed argument has the first occurrence of every 
group number, we just use this to get the total amount of groups quickly
+            group_first_occurrence = BATdescriptor(*getArgReference_bat(stk, 
pci, unnamedArgs + 1));
+            group_count = BATcount(group_first_occurrence);
+            BBPunfix(group_first_occurrence->batCacheid);
+            elements = BATcount(aggr_group); // get the amount of groups
+
+            // now we count, for every group, how many elements it has
+            group_counts = GDKzalloc(group_count * sizeof(size_t));
+            if (group_counts == NULL) {
+                msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" 
group count array.");
+                goto aggrwrapup;
+            }
+
+            aggr_group_arr = (lng*) aggr_group->T->heap.base;
+            for(element_it = 0; element_it < elements; element_it++) {
+                group_counts[aggr_group_arr[element_it]]++;
+            }
+
+            // now that we have the total number of elements for every group, 
we need to get the actual indices of the groups 
+            group_indices = GDKzalloc(group_count * sizeof(lng*));
+            group_current_index = GDKzalloc(group_count * sizeof(lng));
+            if (group_indices == NULL) {
+                msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" 
group indices array.");
+                goto aggrwrapup;
+            }
+
+            for(group_it = 0; group_it < group_count; group_it++) {
+                // for every group, allocate enough space for all the indices
+                group_indices[group_it] = GDKzalloc(group_counts[group_it] * 
sizeof(lng));
+                if (group_indices[group_it] == NULL) {
+                    msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" 
group indices array.");
+                    goto aggrwrapup;
+                }
+            }
+            // we perform one iteration over the aggr_group and fill in all 
the group indices
+            for(element_it = 0; element_it < elements; element_it++) {
+                lng group = aggr_group_arr[element_it]; //group of current 
element
+                group_indices[group][group_current_index[group]++] = (lng) 
element_it; //add current index to group structure
+            }
+
+            //now perform the actual splitting of the data, first construct 
room for splits for every group
+            // elements are structured as follows: 
+            // split_bats [groupnr] [columnnr] [elementnr]
+            split_bats = GDKzalloc(group_count * sizeof(void*));
+            for(group_it = 0; group_it < group_count; group_it++) {
+                split_bats[group_it] = GDKzalloc(sizeof(void*) * 
named_columns);
+            }
+
+            // now split the columns one by one
+            for(i = 0; i < named_columns; i++) {
+                PyInput input = pyinput_values[i];
+                int ***ptr = (int***)split_bats;
+                if (input.scalar) {
+                    // scalar value, we don't handle this yet 
+                    for(group_it = 0; group_it < group_count; group_it++) {
+                        ptr[group_it][i] = GDKzalloc(sizeof(int));
+                        ptr[group_it][i][0] = 33;
+                    }
+                } else {
+                    // split group, always integer for now
+                    int *temp_indices;
+                    int *batcontent = (int*)input.bat->T->heap.base;
+                    // allocate space for split BAT
+                    for(group_it = 0; group_it < group_count; group_it++) {
+                        ptr[group_it][i] = GDKzalloc(group_counts[group_it] * 
sizeof(int));
+                    }
+                    // iterate over the elements of the current BAT
+                    temp_indices = GDKzalloc(sizeof(int) * group_count);
+                    for(element_it = 0; element_it < elements; element_it++) {
+                        //group of current element
+                        lng group = aggr_group_arr[element_it]; 
+                        //append current element to proper 
+                        ptr[group][i][temp_indices[group]++] = 
batcontent[element_it];
+                    }
+                    GDKfree(temp_indices);
+                }
+            }
+
+            {
+                int res = 0;
+                size_t threads = 8; //GDKgetenv("gdk_nr_threads");
+                size_t thread_it;
+                int result_it;
+                AggrParams *parameters;
+                PyObject **results;
+                double current = 0.0;
+                double increment;
+
+                // if there are less groups than threads, limit threads to 
amount of groups
+                threads = group_count < threads ? group_count : threads; 
+
+                increment = (double) group_count / (double) threads;
+                // start running the threads
+                parameters = GDKzalloc(threads * sizeof(AggrParams));
+                for(thread_it = 0; thread_it < threads; thread_it++) {
+                    AggrParams *params = &parameters[thread_it];
+                    params->named_columns = named_columns;
+                    params->additional_columns = additional_columns;
+                    params->group_count = group_count;
+                    params->group_counts = &group_counts;
+                    params->pyinput_values = &pyinput_values;
+                    params->split_bats = &split_bats;
+                    params->base = pci->retc + 2;
+                    params->function = &pFunc;
+                    params->connection = &pConnection;
+                    params->pycall = &pycall;
+                    params->group_start = floor(current);
+                    params->group_end = floor(current += increment);
+                    params->args = &args;
+                    params->msg = NULL;
+
+                    res = pthread_create(&params->thread, NULL, (void * 
(*)(void *))&ComputeParallelAggregation, params);
+                    if (res != 0) {
+                        msg = createException(MAL, "pyapi.eval", "Failed to 
start thread.");
+                        goto aggrwrapup;
+                    }
+                }
+                results = GDKzalloc(threads * sizeof(PyObject*));
+                for(thread_it = 0; thread_it < threads; thread_it++) {
+                    AggrParams params = parameters[thread_it];
+                    PyObject *result;
+                    int res = pthread_join(params.thread, (void**)&result);
+                    if (res != 0) {
+                        msg = createException(MAL, "pyapi.eval", "Failed to 
join thread.");
+                        goto aggrwrapup;
+                    }
+                    results[thread_it] = result;
+                }
+
+                // we need the GIL again to group the parameters
+                gstate = PyGILState_Ensure();
+
+                ai = 0;
+                aggr_result = PyList_New(group_count);
+                for(thread_it = 0; thread_it < threads; thread_it++) {
+                    AggrParams params = parameters[thread_it];
+                    PyObject *result = results[thread_it];
+                    for(result_it = 0; result_it < PyList_Size(result); 
result_it++) {
+                        PyObject *res = PyList_GetItem(result, result_it);
+                        PyList_SetItem(aggr_result, ai++, res);
+                        Py_INCREF(res);
+                    }
+                    Py_DECREF(result);
+                    if (params.msg != MAL_SUCCEED) {
+                        msg = GDKstrdup(params.msg);
+                        goto aggrwrapup;
+                    }
+                }
+                GDKfree(parameters);
+                GDKfree(results);
+            }
+            pResult = PyList_New(1);
+            PyList_SetItem(pResult, 0, aggr_result);
+
+aggrwrapup:
+            if (group_indices != NULL) {
+                for(group_it = 0; group_it < group_count; group_it++) {
+                    if (group_indices[group_it] != NULL) {
+                        GDKfree(group_indices[group_it]);
+                    }
+                }
+                GDKfree(group_indices);
+            }
+            if (group_current_index != NULL) {
+                GDKfree(group_current_index);
+            }
+            if (group_counts != NULL) {
+                GDKfree(group_counts);
+            }
+            if (split_bats != NULL) {
+                for(group_it = 0; group_it < group_count; group_it++) {
+                    if (split_bats[group_it] != NULL) {
+                        for(i = 0; i < named_columns; i++) {
+                            if (split_bats[group_it][i] != NULL) {
+                                GDKfree(split_bats[group_it][i]);
+                            }
+                        }
+                        GDKfree(split_bats[group_it]);
+                    }
+                }
+                GDKfree(split_bats);
+            }
+            if (aggr_group != NULL) {
+                BBPunfix(aggr_group->batCacheid);
+            }
+            if (msg != MAL_SUCCEED) {
+                goto wrapup;
+            }
+        } else {
+            // The function has been successfully created/compiled, all that 
remains is to actually call the function
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to