lidavidm commented on code in PR #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r847247140


##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +204,81 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 

Review Comment:
   Let's try to keep the docstring within 80 characters per line
   
   ```suggestion
       An input type specification for a user-defined function. 
   ```



##########
cpp/src/arrow/python/udf.h:
##########
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041

Review Comment:
   nit, but outside of regression tests and TODOs I don't think we usually 
backlink to the original JIRA; a docstring would be preferable. And/or 
explicitly mark this `TODO(ARROW-16041): ...explanation...`



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the miniumu required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")

Review Comment:
   ```suggestion
                   "in_types must be a list of InputType")
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the miniumu required arguments). 

Review Comment:
   there is no such parameter anymore
   
   ```suggestion
           The number of arguments specified here determines the
           function arity. 
   ```



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -2934,3 +2964,16 @@ cdef extern from "arrow/util/byte_size.h" namespace 
"arrow::util" nogil:
     int64_t TotalBufferSize(const CChunkedArray& array)
     int64_t TotalBufferSize(const CRecordBatch& record_batch)
     int64_t TotalBufferSize(const CTable& table)
+
+cdef extern from "arrow/python/udf.h" namespace "arrow::py" nogil:
+    cdef cppclass CScalarUdfOptions" arrow::py::ScalarUdfOptions":
+
+        CScalarUdfOptions(c_string func_name, CArity arity, CFunctionDoc 
func_doc,
+                          vector[CInputType] in_types, COutputType out_type)
+
+    cdef cppclass CUdfBuilder" arrow::py::UdfBuilder":
+        CUdfBuilder()
+
+    cdef cppclass CScalarUdfBuilder" arrow::py::ScalarUdfBuilder"(CUdfBuilder):
+        CScalarUdfBuilder()
+        CStatus MakeFunction(PyObject* function, CScalarUdfOptions* options)

Review Comment:
   In C++ this should probably take the options by const-reference



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,439 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*args):
+    a, x, b, y, c = args
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    expected_outputs = [
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+    ]

Review Comment:
   Why not just compare the result with the result of manually calling the 
Python function?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,439 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*args):
+    a, x, b, y, c = args
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+def test_scalar_udf_function_with_scalar_valued_functions():
+    function_names = [
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+    ]
+
+    function_input_types = [
+        {
+            "scalar": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+        },
+        {
+            "scalar1": InputType.scalar(pa.int64()),
+            "scalar2": InputType.scalar(pa.int64()),
+            "scalar3": InputType.scalar(pa.int64()),
+            "scalar4": InputType.scalar(pa.int64()),
+            "scalar5": InputType.scalar(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+    ]
+
+    expected_outputs = [
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input, \
+        expected_output in zip(function_names,
+                               function_input_types,
+                               function_output_types,
+                               function_docs,
+                               functions,
+                               function_inputs,
+                               expected_outputs):
+
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_scalar_udf_with_array_data_functions():
+    function_names = [
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+    function_input_types = [
+        {
+            "array": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+        },
+        {
+            "array1": InputType.array(pa.int64()),
+            "array2": InputType.array(pa.int64()),
+            "array3": InputType.array(pa.int64()),
+            "array4": InputType.array(pa.int64()),
+            "array5": InputType.array(pa.int64()),
+        },
+    ]
+
+    function_output_types = [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+    function_docs = [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+    functions = [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+    function_inputs = [
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+    expected_outputs = [
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+    for name, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input, \
+        expected_output in zip(function_names,
+                               function_input_types,
+                               function_output_types,
+                               function_docs,
+                               functions,
+                               function_inputs,
+                               expected_outputs):
+
+        register_scalar_function(
+            name, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate function name
+    doc = {
+        "summary": "test udf input",
+        "description": "parameters are validated"
+    }
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+    with pytest.raises(TypeError):
+        register_scalar_function(None, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function
+    with pytest.raises(ValueError, match="Object must be a callable"):
+        register_scalar_function("none_function", doc, in_types,
+                                 out_type, None)
+
+    # validate output type
+    with pytest.raises(ValueError, match="Output value type must be defined"):
+        register_scalar_function("output_function", doc, in_types,
+                                 None, unary_scalar_function)
+
+    # validate input type
+    expected_expr = r'Input types must be an'
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("input_function", doc, None,
+                                 out_type, unary_scalar_function)
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    in_types = {"array1": InputType.array(pa.int64()),
+                "array2": InputType.array(pa.int64())
+                }
+    doc = {"summary": "n add function",
+           "description": "add N number of arrays"
+           }
+    register_scalar_function("n_add", doc,
+                             in_types, pa.int64(), n_add)
+
+    func = pc.get_function("n_add")
+
+    assert func.name == "n_add"
+    error_msg = "VarArgs function 'n_add' needs at least 2 arguments"
+    with pytest.raises(pa.lib.ArrowInvalid, match=error_msg):
+        pc.call_function("n_add", [pa.array([1, 10]),
+                                   ])
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    in_types = {"scalar": InputType.scalar(pa.int64())}
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {
+        "description": "desc"
+    }
+
+    expected_expr = "Function doc must contain a summary"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_summary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with no decription
+    func_doc = {
+        "summary": "test summary"
+    }
+
+    expected_expr = "Function doc must contain a description"
+
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("no_desc", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # doc with empty dictionary
+    func_doc = {}
+    expected_expr = r"Function doc must contain a summary,"
+    with pytest.raises(ValueError, match=expected_expr):
+        register_scalar_function("empty_dictionary", func_doc, in_types,
+                                 out_type, unary_scalar_function)
+
+
+def test_non_uniform_input_udfs():
+
+    def unary_scalar_function(scalar1, array1, scalar2):
+        coeff = pc.call_function("add", [scalar1, scalar2])
+        return pc.call_function("multiply", [coeff, array1])
+
+    in_types = {"scalar1": InputType.scalar(pa.int64()),
+                "scalar2": InputType.array(pa.int64()),
+                "scalar3": InputType.scalar(pa.int64()),
+                }
+    func_doc = {
+        "summary": "multi type udf",
+        "description": "desc"
+    }
+    register_scalar_function("multi_type_udf", func_doc,
+                             in_types,
+                             pa.int64(), unary_scalar_function)
+
+    res = pc.call_function("multi_type_udf",
+                           [pa.scalar(10), pa.array([1, 2, 3]), pa.scalar(20)])
+
+    assert pc.sum(pc.equal(res, pa.array([30, 60, 90]))).as_py() == 3

Review Comment:
   Just `assert res == pa.array(...)`



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +204,81 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+        Input data to a function is defined with a InputType
+        of Scalar. Here the accepted argument to the function
+        must be of type defined as `data_type` and it must be
+        a scalar value.  
+
+        Parameter
+        ---------
+        data_type : DataType
+
+        Examples
+        --------
+
+        >>> import pyarrow as pa
+        >>> from pyarrow.compute import InputType
+        >>> in_type = InputType.scalar(pa.int32())
+        <pyarrow._compute.InputType object at 0x1029fdcb0>

Review Comment:
   Can we add a suitable `__repr__`? Like `Scalar[int32]` or `Array[int64]`



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.

Review Comment:
   ```suggestion
           objects which defines the arguments to the function.
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -23,14 +23,19 @@ from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, 
Py_NE, Py_GE
 from cython.operator cimport dereference as deref
 
 from collections import namedtuple
+import inspect
 
 from pyarrow.lib import frombytes, tobytes, ordered_dict
 from pyarrow.lib cimport *
 from pyarrow.includes.libarrow cimport *
 import pyarrow.lib as lib
 
+from libcpp cimport bool as c_bool
+
 import numpy as np
 
+from typing import Dict

Review Comment:
   nit: sort imports consistently; keep the stdlib modules in one block



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.

Review Comment:
   ```suggestion
       Register a user-defined scalar function.
   ```
   
   And explain what a scalar function is.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status ExecuteFunction(const compute::ExecBatch& batch, PyObject* function,
+                       const compute::OutputType& exp_out_type, Datum* out) {
+  int num_args = static_cast<int64_t>(batch.values.size());
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  // wrap exec_batch objects into Python objects based on the datum type
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    switch (batch[arg_id].kind()) {
+      case Datum::SCALAR: {
+        auto c_data = batch[arg_id].scalar();
+        PyObject* data = wrap_scalar(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      case Datum::ARRAY: {
+        auto c_data = batch[arg_id].make_array();
+        PyObject* data = wrap_array(c_data);
+        PyTuple_SetItem(arg_tuple, arg_id, data);
+        break;
+      }
+      default:
+        return Status::NotImplemented(
+            "User-defined-functions are not supported to the datum kind ",

Review Comment:
   ```suggestion
               "User-defined functions are not supported for the datum kind ",
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the minimum required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")
+
+    if func_spec.varargs:
+        if num_args < 0:

Review Comment:
   We can't get to this branch in the first place.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the minimum required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")

Review Comment:
   ```suggestion
                   raise TypeError(f"in_types must be a list of InputType, 
found {type(in_type)}")
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +204,81 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+        Input data to a function is defined with a InputType
+        of Scalar. Here the accepted argument to the function
+        must be of type defined as `data_type` and it must be
+        a scalar value.  

Review Comment:
   ```suggestion
           Create a scalar input type of the given data type.
           
           Arguments to a UDF have both a data type and a shape,
           either array or scalar. A scalar InputType means that
           this argument must be passed a Scalar.
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the minimum required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")
+
+    if func_spec.varargs:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity_ptr = new CArity(num_args, False)
+            c_arity = deref(c_arity_ptr)
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise ValueError("Output value type must be defined")

Review Comment:
   ```suggestion
           raise TypeError(f"out_type must be a DataType, not {out_type!r}")
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the minimum required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")
+
+    if func_spec.varargs:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity_ptr = new CArity(num_args, False)

Review Comment:
   Frankly, there's no reason to call Nullary/Unary/etc above, they're just 
conveniences for this constructor; we only need this branch.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the minimum required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")
+
+    if func_spec.varargs:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args < 0:

Review Comment:
   We can't get to this branch.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the miniumu required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.

Review Comment:
   ```suggestion
           A callable implementing the user-defined function.
           It must take arguments equal to the number of
           in_types defined. It must return an Array or Scalar
           matching the out_type. It must return a Scalar if
           all arguments are scalar, else it must return an array.
           
           To define a varargs function, pass a callable that takes
        varargs. The last in_type will be the type of all the
        varargs arguments.
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,439 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+unary_doc = {"summary": "add function",
+             "description": "test add function"}
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+
+binary_doc = {"summary": "y=mx",
+              "description": "find y from y = mx"}
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+
+ternary_doc = {"summary": "y=mx+c",
+               "description": "find y from y = mx + c"}
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+
+varargs_doc = {"summary": "z=ax+by+c",
+               "description": "find z from z = ax + by + c"
+               }
+
+
+def varargs_function(*args):
+    a, x, b, y, c = args
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])

Review Comment:
   nit, but this isn't really a varargs function, it's a 5-adic function.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the miniumu required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")
+
+    if func_spec.varargs:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity_ptr = new CArity(num_args, False)
+            c_arity = deref(c_arity_ptr)
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if out_type:
+        c_type = pyarrow_unwrap_data_type(out_type)
+    else:
+        raise ValueError("Output value type must be defined")
+
+    c_out_type = new COutputType(c_type)
+    # Note: The VectorUDF, TableUDF and AggregatorUDFs will be defined
+    # when they are implemented. Only ScalarUDFBuilder is supported at the
+    # moment.
+    c_options = new CScalarUdfOptions(c_func_name, c_arity, c_func_doc,
+                                      c_in_types, deref(c_out_type))
+    c_sc_builder = new CScalarUdfBuilder()

Review Comment:
   `new` will cause a memory leak; just stack-allocate.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : 
arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);

Review Comment:
   We aren't calling CheckPyError; we need to check whether the Python code
raised an exception before we do anything else.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the miniumu required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(

Review Comment:
   ```suggestion
               raise TypeError(
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +204,81 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+        Input data to a function is defined with a InputType
+        of Scalar. Here the accepted argument to the function
+        must be of type defined as `data_type` and it must be
+        a scalar value.  

Review Comment:
   And ditto below.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : 
arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : 
arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* 
options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), 
arity, doc);
+  auto func = function_.obj();
+  auto exec = [func, arity](compute::KernelContext* ctx, const 
compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (VerifyArrayInput(batch).ok()) {  // checke 0-th element to select 
array callable
+      RETURN_NOT_OK(ExecFunctionArray(batch, func, arity, out));
+    } else if (VerifyScalarInput(batch)
+                   .ok()) {  // check 0-th element to select scalar callable
+      RETURN_NOT_OK(ExecFunctionScalar(batch, func, arity, out));
+    } else {
+      return Status::Invalid("Unexpected input type, scalar or array type 
expected.");
+    }
+    return Status::OK();
+  };
+
+  compute::ScalarKernel kernel(
+      compute::KernelSignature::Make(options->input_types(), 
options->output_type(),
+                                     arity.is_varargs),
+      exec);

Review Comment:
   As discussed in the PR, we can use something like 
TrivialScalarUnaryAsArraysExec, but I don't see that here. That only works for 
unary functions, so we might need another helper, but I don't see that here 
either. Of course, we might want to punt that to a separate Jira, but please 
file one if so.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    This function accepts a dictionary and expect the 
+    summary(str), description(str) and arg_names(List[str]) keys. 
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    if len(func_doc) <= 1:
+        raise ValueError(
+            "Function doc must contain a summary, a description and arg_names")
+
+    if not "summary" in func_doc.keys():
+        raise ValueError("Function doc must contain a summary")
+
+    if not "description" in func_doc.keys():
+        raise ValueError("Function doc must contain a description")
+
+    if not "arg_names" in func_doc.keys():
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        Dictionary containing items with input type name, InputType
+        objects which defines the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the miniumu required arguments). 
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Example
+    -------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")

Review Comment:
   ```suggestion
           raise TypeError("Object must be a callable")
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2331,169 @@ cdef CExpression _bind(Expression filter, Schema 
schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc.
+
+    This function accepts a dictionary and expects the
+    "summary" (str), "description" (str) and "arg_names" (List[str]) keys.
+
+    Raises
+    ------
+    ValueError
+        If any of the required keys is missing from ``func_doc``.
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+
+    # Validate each required key explicitly so the error names the
+    # first missing piece of documentation (covers the empty-dict case too).
+    if "summary" not in func_doc:
+        raise ValueError("Function doc must contain a summary")
+
+    if "description" not in func_doc:
+        raise ValueError("Function doc must contain a description")
+
+    if "arg_names" not in func_doc:
+        raise ValueError("Function doc must contain arg_names")
+
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    f_doc.options_required = False
+    return f_doc
+
+
+def register_scalar_function(func_name, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        and "description" (str).
+    in_types : Dict[str, InputType]
+        A dictionary mapping function argument names to InputType
+        objects that define the input types for the function.
+        When defining a list of InputType for a varargs function,
+        the list only needs to contain the number of elements equal
+        to the num_args (which is the minimum required arguments).
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array or a Scalar.
+
+    Examples
+    --------
+
+    >>> import pyarrow.compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_scalar_function(func_name, func_doc,
+    ...                          in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity* c_arity_ptr
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    num_args = -1
+    if isinstance(in_types, dict):
+        for in_type in in_types.values():
+            if isinstance(in_type, InputType):
+                in_tmp = (<InputType> in_type).input_type
+                c_in_types.push_back(in_tmp)
+            else:
+                raise ValueError("Expected an object of type InputType")
+        function_doc["arg_names"] = in_types.keys()
+        num_args = len(in_types)
+    else:
+        if num_args == -1:
+            raise ValueError(
+                "Input types must be an empty list or a List[InputType]")
+
+    if func_spec.varargs:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args < 0:
+            raise ValueError("Number of arguments must be >= 0")
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+        elif num_args > 3:
+            c_arity_ptr = new CArity(num_args, False)

Review Comment:
   We can then get rid of `new` and `c_arity_ptr`. Also this would have been a 
memory leak here anyways.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to