[ 
https://issues.apache.org/jira/browse/TAJO-1562?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14513465#comment-14513465
 ] 

ASF GitHub Bot commented on TAJO-1562:
--------------------------------------

Github user jihoonson commented on the pull request:

    https://github.com/apache/tajo/pull/551#issuecomment-96484878
  
    This patch is ready for review. Users can define their own Python UDAFs by 
writing a class that implements five mandatory methods: reset(), eval(), 
merge(), get_partial_result(), and get_final_result(). Here is a typical 
example.
    
    ```
      class AvgPy:
        sum = 0
        cnt = 0
    
        def __init__(self):
            self.reset()
    
        def reset(self):
            self.sum = 0
            self.cnt = 0
    
        # eval at the first stage
        def eval(self, item):
            self.sum += item
            self.cnt += 1
    
        # get intermediate result
        def get_partial_result(self):
            return [self.sum, self.cnt]
    
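        # merge an intermediate result produced by get_partial_result()
        # (parameter named 'partial' to avoid shadowing the built-in list)
        def merge(self, partial):
            self.sum += partial[0]
            self.cnt += partial[1]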
    
        # get final result
        @output_type('float8')
        def get_final_result(self):
            return self.sum / float(self.cnt)
    ```
    
    The one additional restriction is that the return type of 
```get_partial_result()``` and the input type of ```merge()``` must be the same.
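
    To make that restriction concrete, here is a minimal sketch of the 
two-stage flow in plain Python. The driver function run_two_stage() is 
hypothetical and is not part of Tajo; the @output_type decorator is left out 
because it is only available inside Tajo's Python runtime.

```python
class AvgPy:
    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0
        self.cnt = 0

    def eval(self, item):
        self.sum += item
        self.cnt += 1

    def get_partial_result(self):
        # Returns a two-element list: [sum, cnt]
        return [self.sum, self.cnt]

    def merge(self, partial):
        # Expects exactly what get_partial_result() returns
        self.sum += partial[0]
        self.cnt += partial[1]

    def get_final_result(self):
        return self.sum / float(self.cnt)


def run_two_stage(partitions):
    """Hypothetical driver (not Tajo code): first stage per partition, then merge."""
    partials = []
    for rows in partitions:
        agg = AvgPy()
        for value in rows:
            agg.eval(value)
        partials.append(agg.get_partial_result())

    final = AvgPy()
    for partial in partials:
        final.merge(partial)
    return final.get_final_result()


result = run_two_stage([[10, 20], [30]])
```

    Because merge() consumes the list that get_partial_result() produces, 
changing the shape of one without the other would break the second stage.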
    
    I'd like to give a brief explanation of how Python UDAFs are executed. 
During aggregation, Tajo currently keeps the intermediate aggregation state per 
key in each implementation of the aggregation operators. The same approach 
should be applied to Python functions, both to control resource usage and to 
avoid a huge change. To do so, the intermediate aggregation state of a Python 
UDAF has to be sent to the Tajo worker whenever aggregation moves on to a 
different key. However, I don't want users to have to implement 
serialization/deserialization of the intermediate aggregation state. 
    
    So, I simply used JSON for that. A snapshot of the Python UDAF instance 
is sent to Tajo workers when necessary. 
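
    As a rough illustration of the idea (this is not the code in the patch), 
an instance's attributes can be turned into a JSON string and loaded back 
without the UDAF author writing any serialization logic. The helper names 
snapshot() and restore() are assumptions made for this sketch, and it only 
works while the state consists of JSON-friendly values such as numbers, 
strings, and lists.

```python
import json


class AvgPy:
    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0
        self.cnt = 0

    def eval(self, item):
        self.sum += item
        self.cnt += 1


def snapshot(instance):
    """Hypothetical helper: dump the instance attributes as a JSON string."""
    return json.dumps(vars(instance))


def restore(instance, payload):
    """Hypothetical helper: load a JSON snapshot back into an instance."""
    instance.__dict__.update(json.loads(payload))
    return instance


agg = AvgPy()
agg.eval(10)
payload = snapshot(agg)            # JSON text that could be shipped to a worker

copy = restore(AvgPy(), payload)   # a fresh instance carrying the same state
```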
    
    Let me consider an example. When a tuple (1, 10) is read and the 
aggregation key is 1, Tajo creates a PythonAggFunctionContext for the 
aggregation key and calls AvgPy.eval() with the input 10. Here, the intermediate 
aggregation state, i.e. AvgPy.sum = 10 and AvgPy.cnt = 1, is kept in the 
instance of AvgPy instead of in PythonAggFunctionContext. However, when a tuple 
(2, 5) is read as the next input, Tajo gets the intermediate aggregation state 
from the Python controller and stores it in PythonAggFunctionContext. After 
that, the instance of AvgPy is reset to compute the aggregation for the new key, 2.
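
    The key switch described above can be simulated in a few lines of Python. 
This is only a sketch under assumptions: KeySwitchingDriver is a made-up name, 
its dict of JSON strings merely plays the role of the per-key 
PythonAggFunctionContext, and reloading a saved snapshot when an earlier key 
comes back is an assumption of this sketch rather than something stated here.

```python
import json


class AvgPy:
    def __init__(self):
        self.reset()

    def reset(self):
        self.sum = 0
        self.cnt = 0

    def eval(self, item):
        self.sum += item
        self.cnt += 1

    def get_final_result(self):
        return self.sum / float(self.cnt)


class KeySwitchingDriver:
    """Hypothetical stand-in for the worker-side logic; not Tajo's API."""

    def __init__(self, udaf):
        self.udaf = udaf          # the single UDAF instance on the Python side
        self.current_key = None
        self.contexts = {}        # key -> JSON snapshot of the UDAF state

    def _save_current(self):
        if self.current_key is not None:
            self.contexts[self.current_key] = json.dumps(vars(self.udaf))

    def _load(self, key):
        self.udaf.reset()
        if key in self.contexts:
            self.udaf.__dict__.update(json.loads(self.contexts[key]))

    def feed(self, key, value):
        if key != self.current_key:
            # Key changed: pull the state out, then reset for the new key.
            self._save_current()
            self._load(key)
            self.current_key = key
        self.udaf.eval(value)

    def results(self):
        self._save_current()
        out = {}
        for key in self.contexts:
            self._load(key)
            out[key] = self.udaf.get_final_result()
        # The instance now holds whichever key was loaded last, so force a reload.
        self.current_key = None
        return out


driver = KeySwitchingDriver(AvgPy())
for key, value in [(1, 10), (2, 5), (1, 20)]:
    driver.feed(key, value)
averages = driver.results()
```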
    
    The following are some highlights of the rest of the implementation.
    * I've improved the Python controller to support UDAFs as well as UDFs.
     * For UDAF execution, the Python controller executes the pre-defined 
functions (reset(), eval(), merge(), get_partial_result(), and 
get_final_result()). 
    * I've improved AggregationFunctionCallEval to support script functions via 
AggFunctionInvoke.
     * AggFunctionInvoke is an abstract class that represents how the function 
is executed. It has two subclasses, ClassBasedAggFunctionInvoke and 
PythonAggFunctionInvoke.
    
    Finally, I've added several test functions, but some of them are disabled. 
This is due to our rigid function syntax. Currently, function names are 
explicitly defined in our parser, and their types are identified while parsing 
SQL statements. This architecture should be improved to support user-defined 
functions. 


> Python UDAF support
> -------------------
>
>                 Key: TAJO-1562
>                 URL: https://issues.apache.org/jira/browse/TAJO-1562
>             Project: Tajo
>          Issue Type: New Feature
>          Components: function/udf
>            Reporter: Jihoon Son
>            Assignee: Jihoon Son
>             Fix For: 0.11.0
>
>
> We need to support Python UDAF as well as UDF (TAJO-1344). 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
