[
https://issues.apache.org/jira/browse/TAJO-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513465#comment-14513465
]
ASF GitHub Bot commented on TAJO-1562:
--------------------------------------
Github user jihoonson commented on the pull request:
https://github.com/apache/tajo/pull/551#issuecomment-96484878
This patch is ready for review. Users can define their own Python UDAFs by
defining a class containing some mandatory functions, i.e., reset(), eval(),
merge(), get_partial_result(), and get_final_result(). Here is a typical
example.
```
class AvgPy:
    sum = 0
    cnt = 0

    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0
        self.cnt = 0

    # eval at the first stage
    def eval(self, item):
        self.sum += item
        self.cnt += 1

    # get intermediate result
    def get_partial_result(self):
        return [self.sum, self.cnt]

    # merge intermediate results
    def merge(self, partial):
        self.sum += partial[0]
        self.cnt += partial[1]

    # get final result
    @output_type('float8')
    def get_final_result(self):
        return self.sum / float(self.cnt)
```
One additional restriction is that the return type of
```get_partial_result()``` and the input type of ```merge()``` must be the same.
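As a quick sanity check of that contract, the class above can be exercised outside Tajo by evaluating two instances separately and merging their partial results (the class is repeated here, slightly trimmed, so the snippet is self-contained):
```
class AvgPy:
    def __init__(self):
        self.reset()
    def reset(self):
        self.sum = 0
        self.cnt = 0
    def eval(self, item):
        self.sum += item
        self.cnt += 1
    def get_partial_result(self):
        return [self.sum, self.cnt]
    def merge(self, partial):
        # 'partial' has exactly the shape returned by get_partial_result()
        self.sum += partial[0]
        self.cnt += partial[1]
    def get_final_result(self):
        return self.sum / float(self.cnt)

# Two instances stand in for two workers aggregating disjoint inputs.
first, second = AvgPy(), AvgPy()
for v in (1, 2, 3):
    first.eval(v)
for v in (4, 5):
    second.eval(v)

# Merge the second worker's partial result into the first, then finish.
first.merge(second.get_partial_result())
print(first.get_final_result())  # 3.0
```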
I'd like to give a brief explanation of how Python UDAFs are executed.
During aggregation, Tajo currently keeps intermediate aggregation state per key
in each implementation of the aggregation operators. This approach should also
be applied to Python functions to control resource usage and avoid a huge
change. To do so, the intermediate aggregation state of a Python UDAF must be
sent to Tajo workers whenever aggregation is performed for a different key.
However, I don't want users to have to implement serialization/deserialization
of the intermediate aggregation state themselves, so I simply used JSON for
that: a snapshot of the Python UDAF instance is sent to Tajo workers when
necessary.
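A minimal sketch of that idea, assuming JSON snapshots of the instance's attribute dict (the ```snapshot()```/```restore()``` helper names are illustrative, not Tajo's actual API):
```
import json

class AvgPy:
    def __init__(self):
        self.reset()
    def reset(self):
        self.sum = 0
        self.cnt = 0
    def eval(self, item):
        self.sum += item
        self.cnt += 1

# Hypothetical helpers: the controller dumps the instance's state to JSON
# and loads it back later, so UDAF authors never write their own
# serialization code.
def snapshot(udaf):
    return json.dumps(vars(udaf))

def restore(udaf, blob):
    udaf.__dict__.update(json.loads(blob))

f = AvgPy()
f.eval(10)
saved = snapshot(f)   # e.g. '{"sum": 10, "cnt": 1}'
f.reset()             # reuse the instance for another key
restore(f, saved)     # later, bring the old state back
print(f.sum, f.cnt)   # 10 1
```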
Consider an example. When a tuple (1, 10) is read and the
aggregation key is 1, Tajo creates a PythonAggFunctionContext for that key and
calls AvgPy.eval() with the input 10. Here, the intermediate aggregation state,
i.e., AvgPy.sum = 10 and AvgPy.cnt = 1, is kept in the AvgPy instance instead
of the PythonAggFunctionContext. However, when a tuple (2, 5) is read as the
next input, Tajo retrieves the intermediate aggregation state from the Python
controller and stores it in the PythonAggFunctionContext. After that, the
AvgPy instance is reset to compute the aggregation for the other key, 2.
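That per-key flow can be sketched as follows, with the PythonAggFunctionContext reduced to a plain dict of JSON snapshots (an illustration of the control flow only, not Tajo's actual operator code):
```
import json

class AvgPy:
    def __init__(self):
        self.reset()
    def reset(self):
        self.sum = 0
        self.cnt = 0
    def eval(self, item):
        self.sum += item
        self.cnt += 1

udaf = AvgPy()
contexts = {}        # key -> JSON snapshot (stand-in for PythonAggFunctionContext)
current_key = None

def accumulate(key, value):
    global current_key
    if key != current_key:
        if current_key is not None:
            # Save the state of the previous key before switching.
            contexts[current_key] = json.dumps(vars(udaf))
        if key in contexts:
            # Resume aggregation for a key seen before.
            udaf.__dict__.update(json.loads(contexts[key]))
        else:
            udaf.reset()
        current_key = key
    udaf.eval(value)

accumulate(1, 10)    # key 1: sum = 10, cnt = 1, kept in the AvgPy instance
accumulate(2, 5)     # key switch: key 1's state is snapshotted, instance reset
```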
The following are some highlights of the remaining implementation.
* I've improved Python controller to support UDAF as well as UDF.
* For UDAF execution, the Python controller executes some pre-defined
functions (reset(), eval(), merge(), get_partial_result(), and
get_final_result()).
* I've improved AggregationFunctionCallEval to support script functions via
AggFunctionInvoke.
* AggFunctionInvoke is an abstract class that represents how a function is
executed. There are two subclasses: ClassBasedAggFunctionInvoke and
PythonAggFunctionInvoke.
Finally, I've added several test functions, but some of them are disabled
due to our rigid function syntax: currently, function names are explicitly
defined in our parser, and their types are identified during SQL parsing. This
architecture should be improved to support user-defined functions.
> Python UDAF support
> -------------------
>
> Key: TAJO-1562
> URL: https://issues.apache.org/jira/browse/TAJO-1562
> Project: Tajo
> Issue Type: New Feature
> Components: function/udf
> Reporter: Jihoon Son
> Assignee: Jihoon Son
> Fix For: 0.11.0
>
>
> We need to support Python UDAF as well as UDF (TAJO-1344).
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)