lidavidm commented on code in PR #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r843876162


##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");

Review Comment:
   ```suggestion
       return Status::Invalid("Python function cannot be null");
   ```



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {

Review Comment:
   nit: use nullptr in C++ code



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");

Review Comment:
   ```suggestion
       return Status::TypeError("Expected a callable Python object.");
   ```



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+  auto func = function_.obj();
+  auto exec = [func, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (VerifyArrayInput(batch).ok()) {  // checke 0-th element to select array callable
+      RETURN_NOT_OK(ExecFunctionArray(batch, func, arity, out));

Review Comment:
   I think I stated this before, but ExecFunctionArray and ExecFunctionScalar 
are effectively the same. We're already validating everything here multiple 
times (which I also believe isn't necessary), so why not merge the two 
functions? That will also let us eventually register kernels that can handle 
mixed array and scalar inputs.
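
   A minimal sketch of what such a merged kernel could look like (illustrative
   only, not code from this PR; it assumes the `OwnedRef` and
   `RETURN_IF_PYERROR` helpers from `arrow/python/common.h` plus the
   `wrap_*`/`unwrap_*` helpers already used in this file):

   ```cpp
   // Sketch: one exec path that wraps each argument according to its Datum
   // kind, so all-scalar, all-array, and mixed batches share the same code.
   Status ExecFunction(const compute::ExecBatch& batch, PyObject* function,
                       Datum* out) {
     const int64_t num_args = static_cast<int64_t>(batch.values.size());
     OwnedRef arg_tuple(PyTuple_New(num_args));
     RETURN_IF_PYERROR();
     for (int i = 0; i < num_args; i++) {
       switch (batch[i].kind()) {
         case Datum::SCALAR:
           PyTuple_SetItem(arg_tuple.obj(), i, wrap_scalar(batch[i].scalar()));
           break;
         case Datum::ARRAY:
           PyTuple_SetItem(arg_tuple.obj(), i, wrap_array(batch[i].make_array()));
           break;
         default:
           return Status::TypeError("Unsupported input kind: ", batch[i].ToString());
       }
     }
     OwnedRef result(PyObject_CallObject(function, arg_tuple.obj()));
     RETURN_IF_PYERROR();
     if (is_scalar(result.obj())) return unwrap_scalar(result.obj()).Value(out);
     if (is_array(result.obj())) return unwrap_array(result.obj()).Value(out);
     return Status::TypeError("UDF must return a pyarrow Array or Scalar");
   }
   ```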



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;

Review Comment:
   The number of arguments should always be equal to the number of values in 
the batch. Again, the framework already validates all of this for you; we 
shouldn't be adding lots of redundant validation.
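
   In other words, the arity bookkeeping could collapse to just reading the
   batch width (a sketch of the simplification, under the assumption that the
   registry has already matched the call against the declared arity):

   ```cpp
   // Sketch: the dispatcher has already validated arity, so the number of
   // arguments is simply the number of values in the batch.
   const int64_t num_args = static_cast<int64_t>(batch.values.size());
   ```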



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+  auto func = function_.obj();
+  auto exec = [func, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (VerifyArrayInput(batch).ok()) {  // checke 0-th element to select array callable

Review Comment:
   Comment is wrong. Also, why not just return bool?
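
   For example, a bool-returning variant might look like this (sketch only,
   not a change in this PR):

   ```cpp
   // Sketch: answer "is every input an array?" directly with a bool instead
   // of round-tripping the answer through a Status.
   bool AllArrayInputs(const compute::ExecBatch& batch) {
     for (const auto& value : batch.values) {
       if (!value.is_array()) return false;
     }
     return true;
   }
   ```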



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);

Review Comment:
   Shouldn't we check for an exception here? In fact, don't we have a wrapper 
utility specifically for Python callbacks?
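
   Presumably that would be `RETURN_IF_PYERROR()`/`SafeCallIntoPython` from
   `arrow/python/common.h`; a hedged sketch of the call site:

   ```cpp
   // Sketch: propagate the pending Python exception (with its message) as a
   // Status instead of returning a generic "output is null" error.
   OwnedRef result(PyObject_CallObject(function, arg_tuple));
   RETURN_IF_PYERROR();
   ```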



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")

Review Comment:
   ```suggestion
               "Function doc must contain summary, arg_names, and description")
   ```



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }

Review Comment:
   We can switch on `Datum::kind` and convert arguments to scalar or array as 
appropriate. That way one function can handle both cases and it will be able to 
handle mixed input shape in the future as well.
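
   A per-argument helper along these lines could serve both cases (sketch,
   assuming a `Result<PyObject*>` return is acceptable here):

   ```cpp
   // Sketch: convert a single Datum to the corresponding pyarrow object
   // based on its kind, instead of assuming an all-scalar batch.
   Result<PyObject*> WrapDatum(const Datum& value) {
     switch (value.kind()) {
       case Datum::SCALAR:
         return wrap_scalar(value.scalar());
       case Datum::ARRAY:
         return wrap_array(value.make_array());
       default:
         return Status::NotImplemented("Unsupported Datum kind: ", value.ToString());
     }
   }
   ```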



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", 
arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", 
arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args = arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);

Review Comment:
   nit, but either `options->doc()` should return `const FunctionDoc&`, or we 
should `std::move` doc here
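
   i.e. something like this for the second option (sketch):

   ```cpp
   // Sketch: doc is a local copy here, so it can be moved rather than copied again.
   scalar_func_ =
       std::make_shared<compute::ScalarFunction>(options->name(), arity, std::move(doc));
   ```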



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,414 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data

Review Comment:
   nit, but IMO these comments are not very useful; I would rather we just cut 
down on the blank lines to make it more readable



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+       Number of arguments in the function.
+       When defining a function with variable arguments, 
+       the num_args represents the minimum number of arguments
+       required. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        "description" (str), and "arg_names" (list of str).
+    in_types : List[InputType]
+        List of InputType objects which defines the input 
+        types for the function. When defining a list of InputType
+        for a varargs function, the list only needs to contain the
+        number of elements equal to the num_args (which is the minimum
+        required arguments).
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array, Scalar, ChunkedArray,
+        Table, or RecordBatch based on the out_type.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_function(func_name, arity, func_doc,
+    ...                   in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+    21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    if func_spec.varargs:
+        if num_args <= 0:
+            raise ValueError("number of arguments must be >= 0")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args <= 0:
+            raise ValueError("number of arguments must be >= 0")
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()
+
+    c_func_doc = _make_function_doc(function_doc)
+
+    if in_types and isinstance(in_types, list):
+        for in_type in in_types:
+            in_tmp = (<InputType> in_type).input_type
+            c_in_types.push_back(in_tmp)
+    else:
+        raise ValueError("input types must be of type InputType")

Review Comment:
   1) this message is misleading, and 2) idiomatically you would just iterate 
over the types without checking the argument type. What if someone wants to 
pass a tuple, or a generator?



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,414 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(*args):
+    a, x, b, y, c = args
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,
+                                              function_arities,
+                                              function_input_types,
+                                              function_output_types,
+                                              function_docs,
+                                              functions,
+                                              function_inputs,
+                                              expected_outputs):
+
+    # Note: 2 * -> used to duplicate the list
+    # Because the values are same irrespective of the type i.e scalar or array
+    for name, \
+        arity, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input, \
+        expected_output in zip(function_names,
+                               2 * function_arities,
+                               function_input_types,
+                               2 * function_output_types,
+                               2 * function_docs,
+                               2 * functions,
+                               function_inputs,
+                               expected_outputs):
+
+        register_scalar_function(
+            name, arity, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = -1
+    func_name = "py_scalar_add_func"
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("scalar add function", "scalar add function",
+                           ["scalar_value"])
+    with pytest.raises(ValueError):
+        register_scalar_function(func_name, arity, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function name
+    with pytest.raises(TypeError):
+        register_scalar_function(None, 1, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function not matching defined arity config
+    def invalid_function(array1, array2):
+        return pc.call_function("add", [array1, array2])
+
+    with pytest.raises(pa.lib.ArrowInvalid):
+        register_scalar_function("invalid_function", 1, doc, in_types,
+                                 out_type, invalid_function)
+        pc.call_function("invalid_function", [pa.array([10]), pa.array([20])],
+                         options=None, memory_pool=None)
+
+    # validate function
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("none_function", 1, doc, in_types,
+                                 out_type, None)
+        assert "callback must be a callable" == execinfo.value
+
+    # validate output type
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function(func_name, 1, doc, in_types,
+                                 None, unary_scalar_function)
+        assert "Output value type must be defined" == execinfo.value
+
+    # validate input type
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function(func_name, 1, doc, None,
+                                 out_type, unary_scalar_function)
+        assert "input types must be of type InputType" == execinfo.value
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    func_name = "n_add"
+    arity = 2
+    in_types = [InputType.array(pa.int64()), InputType.array(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("n add function", "add N number of arrays",
+                           ["value1", "value2"])
+    register_scalar_function(func_name, arity, doc,
+                             in_types, out_type, n_add)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    with pytest.raises(pa.lib.ArrowInvalid) as execinfo:
+        pc.call_function(func_name, [pa.array([1, 10]),
+                                     ])
+        error_msg = "VarArgs function 'n_add' needs at least 2 arguments"
+        +" but attempted to look up kernel(s) with only 1"
+        assert error_msg == execinfo.value
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = 1
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {}
+    func_doc["description"] = "desc"
+    func_doc["arg_names"] = ["scalar1"]
+    
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("no_summary", arity, func_doc, in_types,
+                                    out_type, unary_scalar_function)
+        expected_expr = "must contain summary, arg_names and a description"
+        assert expected_expr in execinfo.value
+        
+    # doc with no decription
+    func_doc = {}
+    func_doc["summary"] = "test summary"
+    func_doc["arg_names"] = ["scalar1"]
+    
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("no_desc", arity, func_doc, in_types,
+                                    out_type, unary_scalar_function)
+        expected_expr = "must contain summary, arg_names and a description"
+        assert expected_expr in execinfo.value
+        
+    # doc with no arg_names
+    func_doc = {}
+    func_doc["summary"] = "test summary"
+    func_doc["description"] = "some test func"

Review Comment:
   General note, but just use `{"summary": ...}`. This reads like C++ code translated into Python rather than idiomatic Python.
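
   For instance, the doc fixtures in this test file could be built as literals; a sketch using the same values as `unary_doc` above:

   ```python
   # Same dict that get_function_doc("add function", "test add function",
   # ["scalar1"]) builds above, written as a single literal.
   unary_doc = {
       "summary": "add function",
       "description": "test add function",
       "arg_names": ["scalar1"],
   }
   ```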



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+       Number of arguments in the function.
+       When defining a function with variable arguments, 

Review Comment:
   Do we even expose varargs? Isn't this redundant with `len(in_types)`?
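
   For the fixed-arity case the count does look derivable, e.g. (sketch):

   ```python
   # Hypothetical: take the fixed arity from the input-type list itself
   # instead of threading a separate num_args parameter through.
   arity = len(in_types)
   ```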



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+  auto func = function_.obj();
+  auto exec = [func, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (VerifyArrayInput(batch).ok()) {  // checke 0-th element to select array callable
+      RETURN_NOT_OK(ExecFunctionArray(batch, func, arity, out));
+    } else if (VerifyScalarInput(batch)
+                   .ok()) {  // check 0-th element to select scalar callable

Review Comment:
   Comment is wrong, and again, why not just return bool?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+       Number of arguments in the function.
+       When defining a function with variable arguments, 

Review Comment:
   Ah. If we do want varargs, we can still infer this from `len(in_types)`.
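
   Roughly (a hypothetical signature, names invented for illustration; not the shape currently in this PR):

   ```python
   def register_scalar_function(func_name, function_doc, in_types,
                                out_type, function, varargs=False):
       # num_args disappears from the signature; for a varargs function,
       # len(in_types) would be the minimum number of required arguments.
       num_args = len(in_types)
       ...
   ```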



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+  auto func = function_.obj();
+  auto exec = [func, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));

Review Comment:
   Hmm, do we need to check this? The kernels framework does this for you.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+       Number of arguments in the function.
+       When defining a function with variable arguments, 
+       the num_args represents the minimum number of arguments
+       required. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        "description" (str), and "arg_names" (list of str).
+    in_types : List[InputType]
+        List of InputType objects which defines the input 
+        types for the function. When defining a list of InputType
+        for a varargs function, the list only needs to contain the
+        number of elements equal to the num_args (which is the miniumu
+        required arguments).
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array, Scalar, ChunkedArray,
+        Table, or RecordBatch based on the out_type.

Review Comment:
   I think this was noted already, but do we expose ChunkedArray, Table, or 
RecordBatch output types?



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """

Review Comment:
   nit, but this docstring doesn't tell me anything that isn't already in the 
signature



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +202,77 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+

Review Comment:
   We should elaborate on what this means here
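
   Something like: "a scalar input type matches arguments passed as pyarrow Scalars, so the UDF receives one Scalar per argument; `InputType.array` instead matches pyarrow Arrays" (wording is only a sketch of how the kernels in this PR dispatch).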



##########
python/pyarrow/includes/libarrow.pxd:
##########
@@ -19,6 +19,8 @@
 
 from pyarrow.includes.common cimport *
 
+from cpython.ref cimport PyObject

Review Comment:
   It seems `object` will manage the reference count for you, so `PyObject` is fine here after all.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:

Review Comment:
   ```suggestion
        if not all(key in func_doc for key in ("summary", "description", "arg_names")):
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -199,6 +202,77 @@ FunctionDoc = namedtuple(
      "options_required"))
 
 
+cdef wrap_input_type(const CInputType c_input_type):
+    """
+    Wrap a C++ InputType in an InputType object.
+    """
+    cdef InputType input_type = InputType.__new__(InputType)
+    input_type.init(c_input_type)
+    return input_type
+
+
+cdef class InputType(_Weakrefable):
+    """
+    An interface for defining input-types for streaming execution engine
+    applications. 
+    """
+
+    def __init__(self):
+        raise TypeError("Do not call {}'s constructor directly"
+                        .format(self.__class__.__name__))
+
+    cdef void init(self, const CInputType &input_type):
+        self.input_type = input_type
+
+    @staticmethod
+    def scalar(data_type):
+        """
+        Create a scalar input type of the given data type.
+
+        Parameter
+        ---------
+        data_type : DataType
+
+        Examples
+        --------
+
+        >>> import pyarrow as pa
+        >>> from pyarrow.compute import InputType
+        >>> in_type = InputType.scalar(pa.int32())
+        <pyarrow._compute.InputType object at 0x1029fdcb0>
+        """
+        cdef:
+            shared_ptr[CDataType] c_data_type
+            CInputType c_input_type
+        c_data_type = pyarrow_unwrap_data_type(data_type)
+        c_input_type = CInputType.Scalar(c_data_type)
+        return wrap_input_type(c_input_type)
+
+    @staticmethod
+    def array(data_type):
+        """
+        Create an array input type of the given data type.
+
+        Parameter
+        ---------
+        data_type : DataType
+
+        Examples
+        --------
+
+        >>> import pyarrow as pa
+        >>> from pyarrow.compute import InputType
+        >>> in_type = InputType.array(pa.int32())
+        <pyarrow._compute.InputType object at 0x102ba4850>
+        """
+        cdef:
+            shared_ptr[CDataType] c_data_type
+            CInputType c_input_type
+        c_data_type = pyarrow_unwrap_data_type(data_type)
+        c_input_type = CInputType.Array(c_data_type)
+        return wrap_input_type(c_input_type)
+

Review Comment:
   We will eventually want the 'any' shape as well but probably not for now



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,414 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(*args):
+    a, x, b, y, c = args
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,
+                                              function_arities,
+                                              function_input_types,
+                                              function_output_types,
+                                              function_docs,
+                                              functions,
+                                              function_inputs,
+                                              expected_outputs):
+
+    # Note: 2 * -> used to duplicate the list
+    # Because the values are same irrespective of the type i.e scalar or array
+    for name, \
+        arity, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input, \
+        expected_output in zip(function_names,
+                               2 * function_arities,
+                               function_input_types,
+                               2 * function_output_types,
+                               2 * function_docs,
+                               2 * functions,
+                               function_inputs,
+                               expected_outputs):
+
+        register_scalar_function(
+            name, arity, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = -1
+    func_name = "py_scalar_add_func"
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("scalar add function", "scalar add function",
+                           ["scalar_value"])
+    with pytest.raises(ValueError):
+        register_scalar_function(func_name, arity, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function name
+    with pytest.raises(TypeError):
+        register_scalar_function(None, 1, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function not matching defined arity config
+    def invalid_function(array1, array2):
+        return pc.call_function("add", [array1, array2])
+
+    with pytest.raises(pa.lib.ArrowInvalid):
+        register_scalar_function("invalid_function", 1, doc, in_types,
+                                 out_type, invalid_function)
+        pc.call_function("invalid_function", [pa.array([10]), pa.array([20])],
+                         options=None, memory_pool=None)
+
+    # validate function
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("none_function", 1, doc, in_types,
+                                 out_type, None)
+        assert "callback must be a callable" == execinfo.value
+
+    # validate output type
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function(func_name, 1, doc, in_types,
+                                 None, unary_scalar_function)
+        assert "Output value type must be defined" == execinfo.value
+
+    # validate input type
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function(func_name, 1, doc, None,
+                                 out_type, unary_scalar_function)
+        assert "input types must be of type InputType" == execinfo.value
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    func_name = "n_add"
+    arity = 2
+    in_types = [InputType.array(pa.int64()), InputType.array(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("n add function", "add N number of arrays",
+                           ["value1", "value2"])
+    register_scalar_function(func_name, arity, doc,
+                             in_types, out_type, n_add)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    with pytest.raises(pa.lib.ArrowInvalid) as execinfo:
+        pc.call_function(func_name, [pa.array([1, 10]),
+                                     ])
+        error_msg = "VarArgs function 'n_add' needs at least 2 arguments"
+        +" but attempted to look up kernel(s) with only 1"
+        assert error_msg == execinfo.value
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = 1
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {}
+    func_doc["description"] = "desc"
+    func_doc["arg_names"] = ["scalar1"]
+    
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("no_summary", arity, func_doc, in_types,
+                                    out_type, unary_scalar_function)
+        expected_expr = "must contain summary, arg_names and a description"
+        assert expected_expr in execinfo.value
+        
+    # doc with no decription
+    func_doc = {}
+    func_doc["summary"] = "test summary"
+    func_doc["arg_names"] = ["scalar1"]
+    
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("no_desc", arity, func_doc, in_types,
+                                    out_type, unary_scalar_function)
+        expected_expr = "must contain summary, arg_names and a description"
+        assert expected_expr in execinfo.value

Review Comment:
   If you want to validate the exception message, use the `match` argument to 
`pytest.raises`.
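
   e.g. for the no-summary case above (a sketch; `match` is a regex searched against `str(excinfo.value)`):

   ```python
   # The assert inside the with-block above never runs, because the call
   # raises first; match performs the check instead.
   with pytest.raises(ValueError, match="summary, arg_names"):
       register_scalar_function("no_summary", arity, func_doc, in_types,
                                out_type, unary_scalar_function)
   ```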



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();

Review Comment:
   Note that when we want to handle functions that take only scalars, we will 
want to pass through `batch.length` eventually or else information will be 
lost. We may want to account for that now to avoid breaking changes.



##########
cpp/src/arrow/python/udf.cc:
##########
@@ -0,0 +1,162 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status VerifyArrayInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_array()) {
+      return Status::Invalid("Expected array input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyScalarInput(const compute::ExecBatch& batch) {
+  for (auto value : batch.values) {
+    if (!value.is_scalar()) {
+      return Status::Invalid("Expected scalar input, but got ", value.type());
+    }
+  }
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& batch) {
+  if (!arity.is_varargs) {
+    bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+    if (!match) {
+      return Status::Invalid(
+          "Function Arity and Input data shape doesn't match, expected ", arity.num_args,
+          ", got ", batch.values.size());
+    }
+  } else {
+    bool match = static_cast<uint64_t>(arity.num_args) <= batch.values.size();
+    if (!match) {
+      return Status::Invalid("Required minimum number of arguments", arity.num_args,
+                             " in VarArgs function is not met.", ", Received ",
+                             batch.values.size());
+    }
+  }
+  return Status::OK();
+}
+
+Status ExecFunctionScalar(const compute::ExecBatch& batch, PyObject* function,
+                          const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int64_t num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected a scalar");
+  }
+  if (!is_scalar(result)) {
+    return Status::Invalid("Output from function is not a scalar");
+  }
+  ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
+  *out = unwrapped_result;
+  return Status::OK();
+}
+
+Status ExecFunctionArray(const compute::ExecBatch& batch, PyObject* function,
+                         const compute::Arity& arity, Datum* out) {
+  // num_args for arity varargs is arity.num_args, and for other arities,
+  // it is equal to the number of values in the batch
+  int num_args =
+      arity.is_varargs ? static_cast<int64_t>(batch.values.size()) : arity.num_args;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Output is null, but expected an array");
+  }
+  if (!is_array(result)) {
+    return Status::Invalid("Output from function is not an array");
+  }
+  return unwrap_array(result).Value(out);
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* options) {
+  if (function == NULL) {
+    return Status::Invalid("python function cannot be null");
+  }
+  Py_INCREF(function);
+  function_.reset(function);
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), arity, doc);
+  auto func = function_.obj();
+  auto exec = [func, arity](compute::KernelContext* ctx, const compute::ExecBatch& batch,
+                            Datum* out) -> Status {
+    PyAcquireGIL lock;
+    RETURN_NOT_OK(VerifyArityAndInput(arity, batch));
+    if (VerifyArrayInput(batch).ok()) {  // checke 0-th element to select array callable
+      RETURN_NOT_OK(ExecFunctionArray(batch, func, arity, out));
+    } else if (VerifyScalarInput(batch)
+                   .ok()) {  // check 0-th element to select scalar callable
+      RETURN_NOT_OK(ExecFunctionScalar(batch, func, arity, out));
+    } else {
+      return Status::Invalid("Unexpected input type, scalar or array type 
expected.");
+    }
+    return Status::OK();
+  };
+
+  compute::ScalarKernel kernel(
+      compute::KernelSignature::Make(options->input_types(), options->output_type(),
+                                     arity.is_varargs),
+      exec);

Review Comment:
   We could wrap `exec` here to allow a kernel implemented for arrays to handle 
scalars as well.



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required

Review Comment:
   I don't think Cython makes you bounce through a temp variable here? Just do 
`f_doc.options_required = False`
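
   i.e.:

   ```suggestion
       f_doc.options_required = False
   ```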



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+

Review Comment:
   ```suggestion
   ```



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+       Number of arguments in the function.
+       When defining a function with variable arguments, 
+       the num_args represents the minimum number of arguments
+       required. 
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        "description" (str), and "arg_names" (list of str).
+    in_types : List[InputType]
+        List of InputType objects which defines the input 
+        types for the function. When defining a list of InputType
+        for a varargs function, the list only needs to contain the
+        number of elements equal to the num_args (which is the miniumu
+        required arguments).
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        User-defined-function
+        function includes arguments equal to the number
+        of input_types defined. The return type of the 
+        function is of the type defined as output_type. 
+        The output should be an Array, Scalar, ChunkedArray,
+        Table, or RecordBatch based on the out_type.
+
+    Example
+    -------
+
+    >>> from pyarrow import compute as pc

Review Comment:
   'import pyarrow.compute as pc'
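
   ```suggestion
       >>> import pyarrow.compute as pc
   ```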



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+       Number of arguments in the function.
+       When defining a function with variable arguments, 

Review Comment:
   "with a variable number of arguments" or "when defining a variadic function"



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+        Number of arguments in the function.
+        When defining a function with variable arguments,
+        the num_args represents the minimum number of arguments
+        required.
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        "description" (str), and "arg_names" (list of str).

Review Comment:
   IMO, it would be slightly more idiomatic to have "in_types" be
   Dict[str, InputType] (relying on Python dictionaries being ordered),
   and possibly have separate "summary" and "description" arguments
   (though these two could still be wrapped in a dictionary).



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:
+        raise ValueError(
+            "function doc dictionary must contain, summary, arg_names and a 
description")
+    f_doc.summary = tobytes(func_doc["summary"])
+    f_doc.description = tobytes(func_doc["description"])
+    for arg_name in func_doc["arg_names"]:
+        c_arg_names.push_back(tobytes(arg_name))
+    f_doc.arg_names = c_arg_names
+    # UDFOptions integration:
+    # TODO: https://issues.apache.org/jira/browse/ARROW-16041
+    f_doc.options_class = tobytes("None")
+    c_options_required = False
+    f_doc.options_required = c_options_required
+    return f_doc
+
+
+def register_scalar_function(func_name, num_args, function_doc, in_types,
+                             out_type, function):
+    """
+    Register a user-defined-function.
+
+    Parameters
+    ----------
+
+    func_name : str
+        Name of the function. This name must be globally unique. 
+    num_args : int
+        Number of arguments in the function.
+        When defining a function with variable arguments,
+        the num_args represents the minimum number of arguments
+        required.
+    function_doc : dict
+        A dictionary object with keys "summary" (str),
+        "description" (str), and "arg_names" (list of str).
+    in_types : List[InputType]
+        List of InputType objects that define the input
+        types for the function. When defining a list of InputType
+        for a varargs function, the list only needs to contain
+        a number of elements equal to num_args (the minimum
+        number of required arguments).
+    out_type : DataType
+        Output type of the function.
+    function : callable
+        The user-defined function. It must accept a number of
+        arguments equal to the number of input_types defined.
+        The return type of the function must match the type
+        defined as out_type. The output should be an Array,
+        Scalar, ChunkedArray, Table, or RecordBatch, depending
+        on the out_type.
+
+    Examples
+    --------
+
+    >>> from pyarrow import compute as pc
+    >>> from pyarrow.compute import register_scalar_function
+    >>> from pyarrow.compute import InputType
+    >>> import pyarrow as pa
+    >>> 
+    >>> func_doc = {}
+    >>> func_doc["summary"] = "simple udf"
+    >>> func_doc["description"] = "add a constant to a scalar"
+    >>> func_doc["arg_names"] = ["x"]
+    >>> 
+    >>> def add_constant(array):
+    ...     return pc.call_function("add", [array, 1])
+    ... 
+    >>> 
+    >>> func_name = "py_add_func"
+    >>> arity = 1
+    >>> in_types = [InputType.array(pa.int64())]
+    >>> out_type = pa.int64()
+    >>> register_scalar_function(func_name, arity, func_doc,
+    ...                          in_types, out_type, add_constant)
+    >>> 
+    >>> func = pc.get_function(func_name)
+    >>> func.name
+    'py_add_func'
+    >>> ans = pc.call_function(func_name, [pa.array([20])])
+    >>> ans
+    <pyarrow.lib.Int64Array object at 0x10c22e700>
+    [
+      21
+    ]
+    """
+    cdef:
+        c_string c_func_name
+        CArity c_arity
+        CFunctionDoc c_func_doc
+        CInputType in_tmp
+        vector[CInputType] c_in_types
+        PyObject* c_function
+        shared_ptr[CDataType] c_type
+        COutputType* c_out_type
+        CScalarUdfBuilder* c_sc_builder
+        CStatus st
+        CScalarUdfOptions* c_options
+
+    c_func_name = tobytes(func_name)
+
+    if callable(function):
+        c_function = <PyObject*>function
+    else:
+        raise ValueError("Object must be a callable")
+
+    func_spec = inspect.getfullargspec(function)
+    if func_spec.varargs:
+        if num_args <= 0:
+            raise ValueError("number of arguments must be > 0 for a varargs function")
+        c_arity = CArity.VarArgs(num_args)
+    else:
+        if num_args < 0:
+            raise ValueError("number of arguments must be >= 0")
+        if num_args == 0:
+            c_arity = CArity.Nullary()
+        elif num_args == 1:
+            c_arity = CArity.Unary()
+        elif num_args == 2:
+            c_arity = CArity.Binary()
+        elif num_args == 3:
+            c_arity = CArity.Ternary()

Review Comment:
   what if we have more arguments?
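   
   For example (a sketch of the failure mode, assuming the code above stays
   as-is), a function of four fixed arguments falls through the if/elif
   chain and `c_arity` is never assigned:
   
   ```
   def add4(a, b, c, d):
       ab = pc.call_function("add", [a, b])
       cd = pc.call_function("add", [c, d])
       return pc.call_function("add", [ab, cd])
   
   # num_args == 4 and the function is not varargs, so none of the
   # Nullary/Unary/Binary/Ternary branches match
   register_scalar_function("add4", 4, doc, in_types, out_type, add4)
   ```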



##########
python/pyarrow/_compute.pyx:
##########
@@ -2251,3 +2325,158 @@ cdef CExpression _bind(Expression filter, Schema schema) except *:
 
     return GetResultValue(filter.unwrap().Bind(
         deref(pyarrow_unwrap_schema(schema).get())))
+
+
+cdef CFunctionDoc _make_function_doc(dict func_doc) except *:
+    """
+    Helper function to generate the FunctionDoc
+    """
+    cdef:
+        CFunctionDoc f_doc
+        vector[c_string] c_arg_names
+        c_bool c_options_required
+
+    validate_expr = "summary" in func_doc.keys(
+    ) and "description" in func_doc.keys() and "arg_names" in func_doc.keys()
+    if not validate_expr:

Review Comment:
   Though, IMO, it's a little more natural to do
   
   ```
   if "summary" not in func_doc:
       raise ValueError("Function doc must contain summary")
   # repeat for other keys
   ```



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,414 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_scalar_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(*args):
+    a, x, b, y, c = args
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+        ],
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,
+                                              function_arities,
+                                              function_input_types,
+                                              function_output_types,
+                                              function_docs,
+                                              functions,
+                                              function_inputs,
+                                              expected_outputs):
+
+    # Note: `2 *` duplicates each list, because the expected values are
+    # the same regardless of the input kind, i.e. scalar or array
+    for name, \
+        arity, \
+        in_types, \
+        out_type, \
+        doc, \
+        function, \
+        input, \
+        expected_output in zip(function_names,
+                               2 * function_arities,
+                               function_input_types,
+                               2 * function_output_types,
+                               2 * function_docs,
+                               2 * functions,
+                               function_inputs,
+                               expected_outputs):
+
+        register_scalar_function(
+            name, arity, doc, in_types, out_type, function)
+
+        func = pc.get_function(name)
+        assert func.name == name
+
+        result = pc.call_function(name, input)
+        assert result == expected_output
+
+
+def test_udf_input():
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = -1
+    func_name = "py_scalar_add_func"
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("scalar add function", "scalar add function",
+                           ["scalar_value"])
+    with pytest.raises(ValueError):
+        register_scalar_function(func_name, arity, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function name
+    with pytest.raises(TypeError):
+        register_scalar_function(None, 1, doc, in_types,
+                                 out_type, unary_scalar_function)
+
+    # validate function not matching defined arity config
+    def invalid_function(array1, array2):
+        return pc.call_function("add", [array1, array2])
+
+    with pytest.raises(pa.lib.ArrowInvalid):
+        register_scalar_function("invalid_function", 1, doc, in_types,
+                                 out_type, invalid_function)
+        pc.call_function("invalid_function", [pa.array([10]), pa.array([20])],
+                         options=None, memory_pool=None)
+
+    # validate function
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("none_function", 1, doc, in_types,
+                                 out_type, None)
+        assert "callback must be a callable" == execinfo.value
+
+    # validate output type
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function(func_name, 1, doc, in_types,
+                                 None, unary_scalar_function)
+        assert "Output value type must be defined" == execinfo.value
+
+    # validate input type
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function(func_name, 1, doc, None,
+                                 out_type, unary_scalar_function)
+        assert "input types must be of type InputType" == execinfo.value
+
+
+def test_varargs_function_validation():
+    def n_add(*values):
+        base_val = values[:2]
+        res = pc.call_function("add", base_val)
+        for other_val in values[2:]:
+            res = pc.call_function("add", [res, other_val])
+        return res
+
+    func_name = "n_add"
+    arity = 2
+    in_types = [InputType.array(pa.int64()), InputType.array(pa.int64())]
+    out_type = pa.int64()
+    doc = get_function_doc("n add function", "add N number of arrays",
+                           ["value1", "value2"])
+    register_scalar_function(func_name, arity, doc,
+                             in_types, out_type, n_add)
+
+    func = pc.get_function(func_name)
+
+    assert func.name == func_name
+
+    with pytest.raises(pa.lib.ArrowInvalid) as execinfo:
+        pc.call_function(func_name, [pa.array([1, 10])])
+        error_msg = ("VarArgs function 'n_add' needs at least 2 arguments"
+                     " but attempted to look up kernel(s) with only 1")
+        assert error_msg == execinfo.value
+
+
+def test_function_doc_validation():
+
+    def unary_scalar_function(scalar):
+        return pc.call_function("add", [scalar, 1])
+
+    # validate arity
+    arity = 1
+    in_types = [InputType.scalar(pa.int64())]
+    out_type = pa.int64()
+
+    # doc with no summary
+    func_doc = {}
+    func_doc["description"] = "desc"
+    func_doc["arg_names"] = ["scalar1"]
+    
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("no_summary", arity, func_doc, in_types,
+                                    out_type, unary_scalar_function)
+        expected_expr = "must contain summary, arg_names and a description"
+        assert expected_expr in execinfo.value
+        
+    # doc with no description
+    func_doc = {}
+    func_doc["summary"] = "test summary"
+    func_doc["arg_names"] = ["scalar1"]
+    
+    with pytest.raises(ValueError) as execinfo:
+        register_scalar_function("no_desc", arity, func_doc, in_types,
+                                    out_type, unary_scalar_function)
+        expected_expr = "must contain summary, arg_names and a description"
+        assert expected_expr in execinfo.value

Review Comment:
   It's odd to see an assert inside a `pytest.raises`. Presumably, you wouldn't 
get to the assertion, right?
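   
   A sketch of the usual idiom, using `pytest.raises(..., match=...)` so
   the message check actually runs even though control never reaches an
   assert inside the block:
   
   ```
   with pytest.raises(ValueError, match="must contain summary"):
       register_scalar_function("no_summary", arity, func_doc, in_types,
                                out_type, unary_scalar_function)
   ```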



##########
python/pyarrow/tests/test_udf.py:
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])

Review Comment:
   So again, defining the function adds nothing over just putting the 
dictionary inline; it will be clearer
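   
   i.e. something like:
   
   ```
   unary_doc = {"summary": "add function",
                "description": "test add function",
                "arg_names": ["scalar1"]}
   ```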


