paleolimbot commented on code in PR #1299:
URL:
https://github.com/apache/datafusion-python/pull/1299#discussion_r2494902051
##########
src/udf.rs:
##########
@@ -15,71 +15,160 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
-use datafusion_ffi::udf::{FFI_ScalarUDF, ForeignScalarUDF};
-use pyo3::types::PyCapsule;
-use pyo3::{prelude::*, types::PyTuple};
-
-use datafusion::arrow::array::{make_array, Array, ArrayData, ArrayRef};
-use datafusion::arrow::datatypes::DataType;
-use datafusion::arrow::pyarrow::FromPyArrow;
-use datafusion::arrow::pyarrow::{PyArrowType, ToPyArrow};
-use datafusion::error::DataFusionError;
-use datafusion::logical_expr::function::ScalarFunctionImplementation;
-use datafusion::logical_expr::ScalarUDF;
-use datafusion::logical_expr::{create_udf, ColumnarValue};
-
use crate::errors::to_datafusion_err;
use crate::errors::{py_datafusion_err, PyDataFusionResult};
use crate::expr::PyExpr;
use crate::utils::{parse_volatility, validate_pycapsule};
+use arrow::datatypes::{Field, FieldRef};
+use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+use datafusion::arrow::array::{make_array, Array, ArrayData, ArrayRef};
+use datafusion::arrow::datatypes::DataType;
+use datafusion::arrow::pyarrow::FromPyArrow;
+use datafusion::arrow::pyarrow::PyArrowType;
+use datafusion::error::DataFusionError;
+use datafusion::logical_expr::{ColumnarValue, Volatility};
+use datafusion::logical_expr::{
+ ReturnFieldArgs, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
+};
+use datafusion_ffi::udf::{FFI_ScalarUDF, ForeignScalarUDF};
+use pyo3::ffi::Py_uintptr_t;
+use pyo3::types::PyCapsule;
+use pyo3::{prelude::*, types::PyTuple};
+use std::any::Any;
+use std::hash::{Hash, Hasher};
+use std::ptr::addr_of;
+use std::sync::Arc;
-/// Create a Rust callable function from a python function that expects
pyarrow arrays
-fn pyarrow_function_to_rust(
+/// This struct holds the Python written function that is a
+/// ScalarUDF.
+#[derive(Debug)]
+struct PythonFunctionScalarUDF {
+ name: String,
func: PyObject,
-) -> impl Fn(&[ArrayRef]) -> Result<ArrayRef, DataFusionError> {
- move |args: &[ArrayRef]| -> Result<ArrayRef, DataFusionError> {
+ signature: Signature,
+ return_field: FieldRef,
+}
+
+impl PythonFunctionScalarUDF {
+ fn new(
+ name: String,
+ func: PyObject,
+ input_fields: Vec<Field>,
+ return_field: Field,
+ volatility: Volatility,
+ ) -> Self {
+ let input_types = input_fields.iter().map(|f|
f.data_type().clone()).collect();
+ let signature = Signature::exact(input_types, volatility);
+ Self {
+ name,
+ func,
+ signature,
+ return_field: Arc::new(return_field),
+ }
+ }
+}
+
+impl Eq for PythonFunctionScalarUDF {}
+impl PartialEq for PythonFunctionScalarUDF {
+ fn eq(&self, other: &Self) -> bool {
+ self.name == other.name
+ && self.signature == other.signature
+ && self.return_field == other.return_field
+ && Python::with_gil(|py|
self.func.bind(py).eq(other.func.bind(py)).unwrap_or(false))
+ }
+}
+
+impl Hash for PythonFunctionScalarUDF {
+ fn hash<H: Hasher>(&self, state: &mut H) {
+ self.name.hash(state);
+ self.signature.hash(state);
+ self.return_field.hash(state);
+
+ Python::with_gil(|py| {
+ let py_hash = self.func.bind(py).hash().unwrap_or(0); // Handle
unhashable objects
+
+ state.write_isize(py_hash);
+ });
+ }
+}
+
+fn array_to_pyarrow_with_field(
+ py: Python,
+ array: ArrayRef,
+ field: &FieldRef,
+) -> PyResult<PyObject> {
+ let array = FFI_ArrowArray::new(&array.to_data());
+ let schema = FFI_ArrowSchema::try_from(field).map_err(py_datafusion_err)?;
+
+ let module = py.import("pyarrow")?;
+ let class = module.getattr("Array")?;
+ let array = class.call_method1(
+ "_import_from_c",
+ (
+ addr_of!(array) as Py_uintptr_t,
+ addr_of!(schema) as Py_uintptr_t,
+ ),
Review Comment:
A more modern way is to use the Arrow PyCapsule interface (`__arrow_c_schema__`),
although I think `_import_from_c` will be around for a while. It's only a few lines:
https://github.com/apache/sedona-db/blob/main/python/sedonadb/src/import_from.rs#L151-L157
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]