westonpace commented on a change in pull request #12590:
URL: https://github.com/apache/arrow/pull/12590#discussion_r840882112



##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();

Review comment:
       ```suggestion
     ARROW_ASSIGN_OR_RAISE(auto unwrapped_result, unwrap_scalar(result));
     *out = unwrapped_result;
     return Status::OK();
   ```
   It may look a little odd that you can write `*out = unwrapped_result` when 
`*out` is a `Datum` and `unwrapped_result` is a `std::shared_ptr<Scalar>`, but 
`Datum` was intentionally designed with a lot of implicit constructors to allow 
for [`implicit 
conversion`](https://en.cppreference.com/w/cpp/language/implicit_conversion).
   
   If you really want to be concise...we have a pattern for "set output 
parameter from result and return status":
   ```
   return unwrap_scalar(result).Value(out);
   ```
   
   How it is written right now is actually a memory leak.  We use `shared_ptr` 
and `unique_ptr` almost exclusively for lifecycle management in Arrow.  Any 
time you are doing a `new` (except for `std::unique_ptr<Foo>(new Foo(...))`) it 
should be a red flag to double check that you really do intend this.
   
   When a function has an out parameter (`Datum * out`) you know that the 
caller has already allocated a spot for this (possibly on the stack, possibly 
on the heap) so there is no need to do any allocation here.  All you need to do 
is fill in the spot the caller has allocated.
   
   You could do...
   ```
   auto datum = new Datum(c_res_data);
   *out = *datum;
   delete datum;
   ```
   but that would just be both complex and inefficient.

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* 
options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), 
arity, &doc);
+
+  // lambda function
+  auto call_back = [&, arity](compute::KernelContext* ctx,

Review comment:
       Nit: Normally we write `callback` as one word, without the underscore.
   
   I'm not sure callback is a good name though.  I think of a callback as 
something that runs when a task is finished.  This is the kernel's exec 
function.  Can we just call it `exec`?

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();

Review comment:
       ```suggestion
     return unwrap_array(result).Value(out);
   ```

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;

Review comment:
       It's a bit odd to have this defined so far away from where it is used.

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");

Review comment:
       This error message seems incomplete: the `{}` placeholder is never 
filled in, so the expected arity is not reported.

##########
File path: cpp/src/arrow/python/udf.h
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type,
+             const compute::MemAllocation::type mem_allocation,
+             const compute::NullHandling::type null_handling)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type),
+        mem_allocation_(mem_allocation),
+        null_handling_(null_handling) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return 
in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+  compute::MemAllocation::type mem_allocation() { return mem_allocation_; }

Review comment:
       I'm not sure it makes sense to expose `mem_allocation` to the user.  If 
I understand correctly, using `PREALLOCATE` means that the `out` datum passed 
to the kernel function is already populated with allocated buffers, so the 
kernel function can just fill in the buffers instead of allocating new ones. 
 Right now there is no way for a Python function to fill in preallocated 
buffers, and I can't imagine that is something we are going to need anytime soon.
   
   So for this PR I would hide `mem_allocation` from the user entirely and just 
hard code it to `NO_PREALLOCATE`.

##########
File path: python/pyarrow/tests/test_udf.py
##########
@@ -0,0 +1,350 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import List
+
+import pytest
+
+import pyarrow as pa
+from pyarrow import compute as pc
+from pyarrow.compute import register_function
+from pyarrow.compute import InputType
+
+
+def get_function_doc(summary: str, desc: str, arg_names: List[str]):
+    func_doc = {}
+    func_doc["summary"] = summary
+    func_doc["description"] = desc
+    func_doc["arg_names"] = arg_names
+    return func_doc
+
+# scalar unary function data
+
+
+unary_doc = get_function_doc("add function",
+                             "test add function",
+                             ["scalar1"])
+
+
+def unary_function(scalar1):
+    return pc.call_function("add", [scalar1, 1])
+
+# scalar binary function data
+
+
+binary_doc = get_function_doc("y=mx",
+                              "find y from y = mx",
+                              ["m", "x"])
+
+
+def binary_function(m, x):
+    return pc.call_function("multiply", [m, x])
+
+# scalar ternary function data
+
+
+ternary_doc = get_function_doc("y=mx+c",
+                               "find y from y = mx + c",
+                               ["m", "x", "c"])
+
+
+def ternary_function(m, x, c):
+    mx = pc.call_function("multiply", [m, x])
+    return pc.call_function("add", [mx, c])
+
+# scalar varargs function data
+
+
+varargs_doc = get_function_doc("z=ax+by+c",
+                               "find z from z = ax + by + c",
+                               ["a", "x", "b", "y", "c"])
+
+
+def varargs_function(a, x, b, y, c):
+    ax = pc.call_function("multiply", [a, x])
+    by = pc.call_function("multiply", [b, y])
+    ax_by = pc.call_function("add", [ax, by])
+    return pc.call_function("add", [ax_by, c])
+
+
+@pytest.fixture
+def function_input_types():
+    return [
+        # scalar data input types
+        [
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        [
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64()),
+            InputType.scalar(pa.int64())
+        ],
+        # array data input types
+        [
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ],
+        [
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64()),
+            InputType.array(pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def function_output_types():
+    return [
+        pa.int64(),
+        pa.int64(),
+        pa.int64(),
+        pa.int64()
+    ]
+
+
+@pytest.fixture
+def function_names():
+    return [
+        # scalar data function names
+        "scalar_y=x+k",
+        "scalar_y=mx",
+        "scalar_y=mx+c",
+        "scalar_z=ax+by+c",
+        # array data function names
+        "array_y=x+k",
+        "array_y=mx",
+        "array_y=mx+c",
+        "array_z=ax+by+c"
+    ]
+
+
+@pytest.fixture
+def function_arities():
+    return [
+        1,
+        2,
+        3,
+        5,
+    ]
+
+
+@pytest.fixture
+def function_docs():
+    return [
+        unary_doc,
+        binary_doc,
+        ternary_doc,
+        varargs_doc
+    ]
+
+
+@pytest.fixture
+def functions():
+    return [
+        unary_function,
+        binary_function,
+        ternary_function,
+        varargs_function
+    ]
+
+
+@pytest.fixture
+def function_inputs():
+    return [
+        # scalar input data
+        [
+            pa.scalar(10, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64())
+        ],
+        [
+            pa.scalar(10, pa.int64()),
+            pa.scalar(2, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        [
+            pa.scalar(2, pa.int64()),
+            pa.scalar(10, pa.int64()),
+            pa.scalar(3, pa.int64()),
+            pa.scalar(20, pa.int64()),
+            pa.scalar(5, pa.int64())
+        ],
+        # array input data
+        [
+            pa.array([10, 20], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64())
+        ],
+        [
+            pa.array([10, 20], pa.int64()),
+            pa.array([2, 4], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ],
+        [
+            pa.array([2, 3], pa.int64()),
+            pa.array([10, 20], pa.int64()),
+            pa.array([3, 7], pa.int64()),
+            pa.array([20, 30], pa.int64()),
+            pa.array([5, 10], pa.int64())
+        ]
+    ]
+
+
+@pytest.fixture
+def expected_outputs():
+    return [
+        # scalar output data
+        pa.scalar(11, pa.int64()),  # 10 + 1
+        pa.scalar(20, pa.int64()),  # 10 * 2
+        pa.scalar(25, pa.int64()),  # 10 * 2 + 5
+        pa.scalar(85, pa.int64()),  # (2 * 10) + (3 * 20) + 5
+        # array output data
+        pa.array([11, 21], pa.int64()),  # [10 + 1, 20 + 1]
+        pa.array([20, 80], pa.int64()),  # [10 * 2, 20 * 4]
+        pa.array([25, 90], pa.int64()),  # [(10 * 2) + 5, (20 * 4) + 10]
+        # [(2 * 10) + (3 * 20) + 5, (3 * 20) + (7 * 30) + 10]
+        pa.array([85, 280], pa.int64())
+    ]
+
+
+def test_scalar_udf_function_with_scalar_data(function_names,

Review comment:
       I was trying to reproduce an error on this test and I noticed that I 
can't repeat this test because it modifies the global state (adds a function to 
the global function registry) and so the second run fails (function already 
exists).
   
   I don't know a great way to fix this.  We could expose the ability to create 
new function registries to Python, but that might be a bit of work, and we would 
need to update the register function to take a new registry argument that is 
rarely specified, so it adds complexity.
   
   We could also add the ability to unregister a function.  That might be the 
easiest.  Since function names have to be unique, it should be straightforward 
to unregister a function by name.  I don't know if either of those things has 
to be fixed as part of this PR, but I was curious to get your thoughts.

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* 
options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }

Review comment:
       Nit: These checks could probably be the first things we do (before the 
`Py_INCREF`)

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);

Review comment:
       Can we / should we confirm that `result` is a scalar here?

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* 
options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), 
arity, &doc);
+
+  // lambda function
+  auto call_back = [&, arity](compute::KernelContext* ctx,

Review comment:
       The default capture specifier of `&` is a little misleading.  The only 
thing you are capturing with this flag is `this`, and `this` is never captured 
by reference.  When I see a default capture of `&`, it usually tells me that the 
lambda is only going to be executed within the current scope (otherwise 
anything it references would be out of scope).  I know this lambda is going to 
be executed outside this scope, so it is strange to see.
   
   I often like to be specific when I'm not capturing too many things.  You 
could use `[this, arity]` to be more clear.
   

##########
File path: cpp/src/arrow/python/udf.cc
##########
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/python/udf.h"
+
+#include <cstddef>
+#include <memory>
+#include <sstream>
+
+#include "arrow/compute/function.h"
+#include "arrow/python/common.h"
+
+namespace arrow {
+
+namespace py {
+
+Status exec_function_scalar(const compute::ExecBatch& batch, PyObject* 
function,
+                            int num_args, Datum* out) {
+  std::shared_ptr<Scalar> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_scalar()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].scalar();
+    PyObject* data = wrap_scalar(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_scalar(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status exec_function_array(const compute::ExecBatch& batch, PyObject* function,
+                           int num_args, Datum* out) {
+  std::shared_ptr<Array> c_res_data;
+  PyObject* arg_tuple = PyTuple_New(num_args);
+  for (int arg_id = 0; arg_id < num_args; arg_id++) {
+    if (!batch[arg_id].is_array()) {
+      return Status::Invalid("Input type and data type doesn't match");
+    }
+    auto c_data = batch[arg_id].make_array();
+    PyObject* data = wrap_array(c_data);
+    PyTuple_SetItem(arg_tuple, arg_id, data);
+  }
+  PyObject* result = PyObject_CallObject(function, arg_tuple);
+  if (result == NULL) {
+    return Status::ExecutionError("Error occured in computation");
+  }
+  auto res = unwrap_array(result);
+  if (!res.status().ok()) {
+    return res.status();
+  }
+  c_res_data = res.ValueOrDie();
+  auto datum = new Datum(c_res_data);
+  *out = *datum;
+  return Status::OK();
+}
+
+Status VerifyArityAndInput(compute::Arity arity, const compute::ExecBatch& 
batch) {
+  bool match = static_cast<uint64_t>(arity.num_args) == batch.values.size();
+  if (!match) {
+    return Status::Invalid(
+        "Function Arity and Input data shape doesn't match, expected {}");
+  }
+  return Status::OK();
+}
+
+Status ScalarUdfBuilder::MakeFunction(PyObject* function, ScalarUdfOptions* 
options) {
+  // creating a copy of objects for the lambda function
+  Py_INCREF(function);
+  function_.reset(function);
+  if (function_.obj() == NULL) {
+    return Status::ExecutionError("python function cannot be null");
+  }
+  if (!PyCallable_Check(function_.obj())) {
+    return Status::TypeError("Expected a callable python object.");
+  }
+  auto doc = options->doc();
+  auto arity = options->arity();
+  scalar_func_ = std::make_shared<compute::ScalarFunction>(options->name(), 
arity, &doc);

Review comment:
       `doc` is a stack-allocated variable that will be destroyed when this 
function exits.  It is not safe to take its address in this way.  More on 
this in the top-level review comment.  This is most likely what is causing the 
python tests to fail.

##########
File path: cpp/src/arrow/python/udf.h
##########
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/python/platform.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/compute/api_scalar.h"
+#include "arrow/compute/cast.h"
+#include "arrow/compute/exec.h"
+#include "arrow/compute/function.h"
+#include "arrow/compute/registry.h"
+#include "arrow/datum.h"
+#include "arrow/util/cpu_info.h"
+#include "arrow/util/logging.h"
+
+#include "arrow/python/common.h"
+#include "arrow/python/pyarrow.h"
+#include "arrow/python/visibility.h"
+
+namespace arrow {
+
+namespace py {
+
+// Exposing the UDFOptions: https://issues.apache.org/jira/browse/ARROW-16041
+class ARROW_PYTHON_EXPORT UdfOptions {
+ public:
+  UdfOptions(const compute::Function::Kind kind, const compute::Arity arity,
+             const compute::FunctionDoc func_doc,
+             const std::vector<compute::InputType> in_types,
+             const compute::OutputType out_type,
+             const compute::MemAllocation::type mem_allocation,
+             const compute::NullHandling::type null_handling)
+      : kind_(kind),
+        arity_(arity),
+        func_doc_(func_doc),
+        in_types_(in_types),
+        out_type_(out_type),
+        mem_allocation_(mem_allocation),
+        null_handling_(null_handling) {}
+
+  compute::Function::Kind kind() { return kind_; }
+
+  const compute::Arity& arity() const { return arity_; }
+
+  const compute::FunctionDoc doc() const { return func_doc_; }
+
+  const std::vector<compute::InputType>& input_types() const { return 
in_types_; }
+
+  const compute::OutputType& output_type() const { return out_type_; }
+
+  compute::MemAllocation::type mem_allocation() { return mem_allocation_; }
+
+  compute::NullHandling::type null_handling() { return null_handling_; }

Review comment:
       Similarly, we should hardcode this to `COMPUTED_NO_PREALLOCATE` and 
hide this detail from the user.  I don't think the other options make sense if 
we are doing `NO_PREALLOCATE`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to