vibhatha commented on code in PR #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r854114870


##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2339,208 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef class ScalarUdfContext:
+    """
+    Per-invocation context handed to a scalar user-defined function.
+
+    Exposes the length of the batch being processed (`batch_length`)
+    and the memory pool associated with the call (`memory_pool`).
+    Instances are created internally via ``__new__`` followed by
+    ``init()``; calling the constructor directly raises TypeError.
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CScalarUdfContext &c_context):
+        # Store the C++ context so the Python properties can read it.
+        self.c_context = c_context
+
+    @property
+    def batch_length(self):
+        """
+        The length of the batch associated with the user-defined
+        function call.
+
+        This is especially useful when scalars are passed as
+        arguments to the user-defined function, since the length
+        cannot then be taken from an input array.
+
+        Returns
+        -------
+        batch_length : int
+            The length of the batch the user-defined function is
+            being called on.
+        """
+        return self.c_context.batch_length
+
+    @property
+    def memory_pool(self):
+        """
+        The MemoryPool associated with the user-defined function call.
+
+        This already-initialized pool can be used for allocations
+        made inside the user-defined function.
+
+        Returns
+        -------
+        memory_pool : MemoryPool
+            The pool obtained from the KernelContext and passed to
+            the ScalarUdfContext.
+        """
+        return box_memory_pool(self.c_context.pool)
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Build a C++ FunctionDoc from a Python dictionary.
+
+    The dictionary must provide the keys "summary" (str),
+    "description" (str) and "arg_names" (an iterable of str).
+    A missing key raises KeyError.
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration is not supported yet, so no options
+    # class is advertised and options are never required.
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("")
+    f_doc.options_required = False
+    return f_doc
+
+
+cdef _scalar_udf_callback(user_function,
+                          const CScalarUdfContext& c_context,
+                          inputs):
+    """
+    Callback used to invoke a Python scalar user-defined function.
+
+    Wraps the C++ context in a Python ScalarUdfContext and returns
+    the result of ``user_function(context, *inputs)``.
+    """
+    cdef ScalarUdfContext context = ScalarUdfContext.__new__(ScalarUdfContext)
+    context.init(c_context)
+    return user_function(context, *inputs)
+
+
+def register_scalar_function(func, func_name, function_doc, in_types,
+                             out_type):
+    """
+    Register a user-defined scalar function. 
+
+    A scalar function is a function that executes elementwise
+    operations on arrays or scalars, and therefore whose results
+    generally do not depend on the order of the values in the
+    arguments. Accepts and returns arrays that are all of the
+    same size. These functions roughly correspond to the functions
+    used in SQL expressions.
+
+    Parameters
+    ----------
+    func : callable
+        A callable implementing the user-defined function.
+        It must take arguments equal to the number of
+        in_types defined. It must return an Array or Scalar
+        matching the out_type. It must return a Scalar if
+        all arguments are scalar, else it must return an Array.
+
+        To define a varargs function, pass a callable that takes
+        varargs. The last in_type will be the type of the all
+        varargs arguments.
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary mapping function argument names to
+        their respective InputType specifications.
+        The argument names will be used to generate
+        documentation for the function. The number of
+        arguments specified here determines the function
+        arity.
+    out_type : DataType
+        Output type of the function.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(ctx, array):
+    ...     return pc.add(array, 1)
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = {"array": pc.InputType.array(pa.int64())}
+    >>> out_type = pa.int64()
+    >>> pc.register_scalar_function(add_constant, func_name, func_doc,
+    ...                   in_types, out_type)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> answer = pc.call_function(func_name, [pa.array([20])])
+    >>> answer
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        shared_ptr[COutputType] c_out_type
+        CStatus st
+        shared_ptr[CScalarUdfOptions] c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(func):
+        c_function = <PyObject*>func
+    else:
+        raise TypeError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(func)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise TypeError("in_types must be of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        raise TypeError(
+            "in_types must be a dictionary of InputType")
+
+    if func_spec.varargs:
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        c_arity = CArity(num_args, False)
+
+    if not "summary" in function_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in function_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in function_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    c_func_doc = _make_function_doc(function_doc)

Review Comment:
   Ah yes. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to