Changeset: f7c146863001 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f7c146863001
Added Files:
sql/backends/monet5/Tests/pyapi29.sql
sql/backends/monet5/Tests/pyapi29.stable.err
sql/backends/monet5/Tests/pyapi29.stable.out
Modified Files:
monetdb5/extras/pyapi/pyapi.c
Branch: pyapi
Log Message:
Added parallel computation of aggregates over the different groups of a
GROUP BY statement when PYTHON_MAP is used.
The PYTHON_MAP aggregate function is called once per group, with the calls
running in parallel.
Note: this currently works only with INTEGER input columns, and the
parallelization is non-forked (i.e. a single process, constrained by the GIL).
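For illustration, such an aggregate could look as follows (a hypothetical
sketch; the table and function names are made up, the actual test queries
live in pyapi29.sql):

    CREATE AGGREGATE minmaxdiff(val INTEGER) RETURNS INTEGER
    LANGUAGE PYTHON_MAP {
        # the body now runs once per group, so 'val' holds
        # the (INTEGER) values of a single group only
        return numpy.max(val) - numpy.min(val)
    };

    -- one call per group, with the groups processed in parallel
    SELECT groupcol, minmaxdiff(val) FROM vals GROUP BY groupcol;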
diffs (truncated from 519 to 300 lines):
diff --git a/monetdb5/extras/pyapi/pyapi.c b/monetdb5/extras/pyapi/pyapi.c
--- a/monetdb5/extras/pyapi/pyapi.c
+++ b/monetdb5/extras/pyapi/pyapi.c
@@ -99,6 +99,27 @@ int PyAPIEnabled(void) {
|| GDKgetenv_isyes(pyapi_enableflag));
}
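+// Argument bundle for one aggregation worker thread: mostly pointers to
+// state shared between all threads, plus the range of groups (group_start
+// up to group_end) that this thread evaluates.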
+struct _AggrParams{
+ PyInput **pyinput_values;
+ void ****split_bats;
+ lng **group_counts;
+ str **args;
+ PyObject **connection;
+ PyObject **function;
+ str *pycall;
+ str msg;
+ size_t base;
+ size_t additional_columns;
+ size_t named_columns;
+ size_t columns;
+ size_t group_count;
+ size_t group_start;
+ size_t group_end;
+ pthread_t thread;
+};
+#define AggrParams struct _AggrParams
+static PyObject* ComputeParallelAggregation(AggrParams *p);
+
static char* FunctionBasePath(void);
static char* FunctionBasePath(void) {
char *basepath = GDKgetenv("function_basepath");
@@ -402,6 +423,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
sql_func * sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc);
str exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
+ const int additional_columns = 3;
int i = 1, ai = 0;
char* pycall = NULL;
str *args;
@@ -434,6 +456,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
bit varres = sqlfun ? sqlfun->varres : 0;
int retcols = !varres ? pci->retc : -1;
PyGILState_STATE gstate = PyGILState_LOCKED;
+ int unnamedArgs;
+ bit parallel_aggregation = grouped && mapped;
+
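+ // forked (multi-process) evaluation is switched off here; the parallel
+ // aggregation below uses threads inside this process instead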
+ mapped = 0;
#ifndef HAVE_FORK
(void) mapped;
@@ -488,12 +514,11 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
// Analyse the SQL_Func structure to get the parameter names
if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
- int cargs = pci->retc + 2;
+ unnamedArgs = pci->retc + 2;
argnode = sqlfun->ops->h;
while (argnode) {
char* argname = ((sql_arg*) argnode->data)->name;
- args[cargs] = GDKstrdup(argname);
- cargs++;
+ args[unnamedArgs++] = GDKstrdup(argname);
argnode = argnode->next;
}
}
@@ -555,7 +580,10 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
mapped = true;
holds_gil = false;
}
- else python_call_active = true;
+ else {
+ python_call_active = true;
+ holds_gil = true;
+ }
MT_lock_unset(&pyapiLock, "pyapi.evaluate");
}
#endif
@@ -952,7 +980,7 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
// Now we will do the input handling (aka converting the input BATs to numpy arrays)
// We will put the python arrays in a PyTuple object, we will use this PyTuple object as the set of arguments to call the Python function
- pArgs = PyTuple_New(pci->argc - (pci->retc + 2) + (code_object == NULL ? 3 : 0));
+ pArgs = PyTuple_New(pci->argc - (pci->retc + 2) + (code_object == NULL ? additional_columns : 0));
pColumns = PyDict_New();
pColumnTypes = PyDict_New();
#ifdef HAVE_FORK
@@ -1047,8 +1075,217 @@ str PyAPIeval(Client cntxt, MalBlkPtr mb
}
- // The function has been successfully created/compiled, all that remains is to actually call the function
- pResult = PyObject_CallObject(pFunc, pArgs);
+ if (parallel_aggregation && sqlfun != NULL && sqlfun->ops->cnt > 0) {
+ // parallel aggregation, we run the function once for every group in parallel
+ BAT *aggr_group = NULL, *group_first_occurrence = NULL;
+ size_t group_count, elements, element_it, group_it;
+ lng *group_counts = NULL;
+ lng *group_current_index = NULL;
+ lng **group_indices = NULL;
+ lng *aggr_group_arr = NULL;
+ void ***split_bats = NULL;
+ int named_columns = unnamedArgs - (pci->retc + 2);
+ PyObject *aggr_result;
+
+ // release the GIL
+ PyGILState_Release(gstate);
+ gstate = PyGILState_LOCKED;
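+ // the GIL cannot stay held here: the worker threads spawned below
+ // need to be able to acquire it themselves when they call into Python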
+
+ // the first unnamed argument has the group numbers for every row
+ aggr_group = BATdescriptor(*getArgReference_bat(stk, pci, unnamedArgs));
+ // the second unnamed argument has the first occurrence of every group number, we just use this to get the total amount of groups quickly
+ group_first_occurrence = BATdescriptor(*getArgReference_bat(stk, pci, unnamedArgs + 1));
+ group_count = BATcount(group_first_occurrence);
+ BBPunfix(group_first_occurrence->batCacheid);
+ elements = BATcount(aggr_group); // get the number of elements
+
+ // now we count, for every group, how many elements it has
+ group_counts = GDKzalloc(group_count * sizeof(lng));
+ if (group_counts == NULL) {
+ msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" group count array.");
+ goto aggrwrapup;
+ }
+
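+ // group ids are assumed to be dense (0 .. group_count-1), so they can
+ // be used to index group_counts directly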
+ aggr_group_arr = (lng*) aggr_group->T->heap.base;
+ for(element_it = 0; element_it < elements; element_it++) {
+ group_counts[aggr_group_arr[element_it]]++;
+ }
+
+ // now that we have the total number of elements for every group, we need to get the actual indices of the groups
+ group_indices = GDKzalloc(group_count * sizeof(lng*));
+ group_current_index = GDKzalloc(group_count * sizeof(lng));
+ if (group_indices == NULL || group_current_index == NULL) {
+ msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" group indices array.");
+ goto aggrwrapup;
+ }
+
+ for(group_it = 0; group_it < group_count; group_it++) {
+ // for every group, allocate enough space for all the indices
+ group_indices[group_it] = GDKzalloc(group_counts[group_it] * sizeof(lng));
+ if (group_indices[group_it] == NULL) {
+ msg = createException(MAL, "pyapi.eval", MAL_MALLOC_FAIL" group indices array.");
+ goto aggrwrapup;
+ }
+ }
+ // we perform one iteration over the aggr_group and fill in all the group indices
+ for(element_it = 0; element_it < elements; element_it++) {
+ lng group = aggr_group_arr[element_it]; // group of current element
+ group_indices[group][group_current_index[group]++] = (lng) element_it; // add current index to group structure
+ }
+
+ // now perform the actual splitting of the data, first construct room for splits for every group
+ // elements are structured as follows:
+ // split_bats [groupnr] [columnnr] [elementnr]
+ split_bats = GDKzalloc(group_count * sizeof(void*));
+ for(group_it = 0; group_it < group_count; group_it++) {
+ split_bats[group_it] = GDKzalloc(sizeof(void*) * named_columns);
+ }
+
+ // now split the columns one by one
+ for(i = 0; i < named_columns; i++) {
+ PyInput input = pyinput_values[i];
+ int ***ptr = (int***)split_bats;
+ if (input.scalar) {
+ // scalar value, we don't handle this yet
+ for(group_it = 0; group_it < group_count; group_it++) {
+ ptr[group_it][i] = GDKzalloc(sizeof(int));
+ ptr[group_it][i][0] = 33; // dummy placeholder value
+ }
+ } else {
+ // split group, always integer for now
+ int *temp_indices;
+ int *batcontent = (int*)input.bat->T->heap.base;
+ // allocate space for split BAT
+ for(group_it = 0; group_it < group_count; group_it++) {
+ ptr[group_it][i] = GDKzalloc(group_counts[group_it] * sizeof(int));
+ }
+ // iterate over the elements of the current BAT
+ temp_indices = GDKzalloc(sizeof(int) * group_count);
+ for(element_it = 0; element_it < elements; element_it++) {
+ //group of current element
+ lng group = aggr_group_arr[element_it];
+ // append current element to the proper group
+ ptr[group][i][temp_indices[group]++] = batcontent[element_it];
+ }
+ GDKfree(temp_indices);
+ }
+ }
+
+ {
+ int res = 0;
+ size_t threads = 8; //GDKgetenv("gdk_nr_threads");
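+ // note: the thread count is hard-coded for now; the commented-out call
+ // suggests it should eventually come from the gdk_nr_threads setting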
+ size_t thread_it;
+ int result_it;
+ AggrParams *parameters;
+ PyObject **results;
+ double current = 0.0;
+ double increment;
+
+ // if there are fewer groups than threads, limit the number of threads to the number of groups
+ threads = group_count < threads ? group_count : threads;
+
+ increment = (double) group_count / (double) threads;
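+ // each thread is assigned a contiguous slice of the groups: thread t
+ // handles groups floor(t * increment) up to floor((t + 1) * increment)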
+ // start running the threads
+ parameters = GDKzalloc(threads * sizeof(AggrParams));
+ for(thread_it = 0; thread_it < threads; thread_it++) {
+ AggrParams *params = &parameters[thread_it];
+ params->named_columns = named_columns;
+ params->additional_columns = additional_columns;
+ params->group_count = group_count;
+ params->group_counts = &group_counts;
+ params->pyinput_values = &pyinput_values;
+ params->split_bats = &split_bats;
+ params->base = pci->retc + 2;
+ params->function = &pFunc;
+ params->connection = &pConnection;
+ params->pycall = &pycall;
+ params->group_start = floor(current);
+ params->group_end = floor(current += increment);
+ params->args = &args;
+ params->msg = NULL;
+
+ res = pthread_create(&params->thread, NULL, (void * (*)(void *)) &ComputeParallelAggregation, params);
+ if (res != 0) {
+ msg = createException(MAL, "pyapi.eval", "Failed to start thread.");
+ goto aggrwrapup;
+ }
+ }
+ results = GDKzalloc(threads * sizeof(PyObject*));
+ for(thread_it = 0; thread_it < threads; thread_it++) {
+ AggrParams params = parameters[thread_it];
+ PyObject *result;
+ int res = pthread_join(params.thread, (void**)&result);
+ if (res != 0) {
+ msg = createException(MAL, "pyapi.eval", "Failed to join thread.");
+ goto aggrwrapup;
+ }
+ results[thread_it] = result;
+ }
+
+ // we need the GIL again to gather the per-thread results into a single list
+ gstate = PyGILState_Ensure();
+
+ ai = 0;
+ aggr_result = PyList_New(group_count);
+ for(thread_it = 0; thread_it < threads; thread_it++) {
+ AggrParams params = parameters[thread_it];
+ PyObject *result = results[thread_it];
+ for(result_it = 0; result_it < PyList_Size(result); result_it++) {
+ PyObject *res = PyList_GetItem(result, result_it);
+ // PyList_GetItem borrows and PyList_SetItem steals a reference, so take our own reference first
+ Py_INCREF(res);
+ PyList_SetItem(aggr_result, ai++, res);
+ }
+ Py_DECREF(result);
+ if (params.msg != MAL_SUCCEED) {
+ msg = GDKstrdup(params.msg);
+ goto aggrwrapup;
+ }
+ }
+ GDKfree(parameters);
+ GDKfree(results);
+ }
+ pResult = PyList_New(1);
+ PyList_SetItem(pResult, 0, aggr_result);
+
+aggrwrapup:
+ if (group_indices != NULL) {
+ for(group_it = 0; group_it < group_count; group_it++) {
+ if (group_indices[group_it] != NULL) {
+ GDKfree(group_indices[group_it]);
+ }
+ }
+ GDKfree(group_indices);
+ }
+ if (group_current_index != NULL) {
+ GDKfree(group_current_index);
+ }
+ if (group_counts != NULL) {
+ GDKfree(group_counts);
+ }
+ if (split_bats != NULL) {
+ for(group_it = 0; group_it < group_count; group_it++) {
+ if (split_bats[group_it] != NULL) {
+ for(i = 0; i < named_columns; i++) {
+ if (split_bats[group_it][i] != NULL) {
+ GDKfree(split_bats[group_it][i]);
+ }
+ }
+ GDKfree(split_bats[group_it]);
+ }
+ }
+ GDKfree(split_bats);
+ }
+ if (aggr_group != NULL) {
+ BBPunfix(aggr_group->batCacheid);
+ }
+ if (msg != MAL_SUCCEED) {
+ goto wrapup;
+ }
+ } else {
- // The function has been successfully created/compiled, all that remains is to actually call the function