This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 42229e5f66bb0cf032348799b358991c77e3e63a
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Jan 13 23:23:05 2026 +0100

    add sedonadb sedona udf worker example
---
 python/sedona/spark/sql/functions.py               |  29 ++-
 python/sedona/spark/utils/udf.py                   |  22 ++-
 python/sedona/spark/worker/__init__.py             |  16 ++
 python/sedona/spark/worker/daemon.py               |  48 +++--
 python/sedona/spark/worker/serde.py                |  25 ++-
 python/sedona/spark/worker/udf_info.py             |  25 ++-
 python/sedona/spark/worker/worker.py               |  56 ++++--
 python/setup.py                                    |  17 ++
 python/src/geomserde_speedup_module.c              | 138 ++++++-------
 python/tests/test_base.py                          |  14 +-
 .../tests/utils/test_sedona_db_vectorized_udf.py   | 218 +++++----------------
 .../org/apache/spark/sql/udf/StrategySuite.scala   |  18 +-
 12 files changed, 311 insertions(+), 315 deletions(-)
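
For orientation before the diffs: a minimal usage sketch of the new decorator, distilled from the test changes in this commit. The ByteType input marker and the from_sedona helper are taken from the hunks below; treat the exact signature as provisional while this branch evolves.

    import numpy as np
    import pyarrow as pa
    import shapely

    from pyspark.sql.types import ByteType, DoubleType
    from sedona.spark.sql.functions import sedona_db_vectorized_udf
    from sedona.spark.utils.udf import from_sedona

    @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[ByteType()])
    def centroid_x(geom):
        # Geometries arrive as Sedona-serialized bytes; decode them into
        # shapely geometries before doing any geometry work.
        geom_wkb = pa.array(geom.storage.to_array())
        geometry_array = np.asarray(geom_wkb, dtype=object)
        geoms = from_sedona(geometry_array)
        return pa.array(shapely.get_x(shapely.centroid(geoms)))

    # centroid_x(df.wkt) can then be selected like any other column expression.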

diff --git a/python/sedona/spark/sql/functions.py b/python/sedona/spark/sql/functions.py
index 232ccb50a3..d8bf73c152 100644
--- a/python/sedona/spark/sql/functions.py
+++ b/python/sedona/spark/sql/functions.py
@@ -28,7 +28,14 @@ import pyarrow as pa
 import geoarrow.pyarrow as ga
 from sedonadb import udf as sedona_udf_module
 from sedona.spark.sql.types import GeometryType
-from pyspark.sql.types import DataType, FloatType, DoubleType, IntegerType, StringType, ByteType
+from pyspark.sql.types import (
+    DataType,
+    FloatType,
+    DoubleType,
+    IntegerType,
+    StringType,
+    ByteType,
+)
 
 from sedona.spark.utils.udf import has_sedona_serializer_speedup
 
@@ -52,7 +59,7 @@ sedona_udf_to_eval_type = {
 
 
 def sedona_vectorized_udf(
-        return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR
+    return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR
 ):
     import geopandas as gpd
 
@@ -93,7 +100,7 @@ def sedona_vectorized_udf(
 
 
 def _apply_shapely_series_udf(
-        fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
+    fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
 ):
     def apply(series: pd.Series) -> pd.Series:
         applied = series.apply(
@@ -114,7 +121,7 @@ def _apply_shapely_series_udf(
 
 
 def _apply_geo_series_udf(
-        fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
+    fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
 ):
     import geopandas as gpd
 
@@ -166,7 +173,11 @@ def infer_pa_type(spark_type: DataType):
 def infer_input_type(spark_type: DataType):
     if isinstance(spark_type, GeometryType):
         return sedona_udf_module.GEOMETRY
-    elif isinstance(spark_type, FloatType) or isinstance(spark_type, DoubleType) or isinstance(spark_type, IntegerType):
+    elif (
+        isinstance(spark_type, FloatType)
+        or isinstance(spark_type, DoubleType)
+        or isinstance(spark_type, IntegerType)
+    ):
         return sedona_udf_module.NUMERIC
     elif isinstance(spark_type, StringType):
         return sedona_udf_module.STRING
@@ -186,12 +197,12 @@ def infer_input_types(spark_types: list[DataType]):
 
 
 def sedona_db_vectorized_udf(
-        return_type: DataType,
-        input_types: list[DataType],
+    return_type: DataType,
+    input_types: list[DataType],
 ):
-    eval_type = 6201
+    eval_type = 6200
     if has_sedona_serializer_speedup():
-        eval_type = 6200
+        eval_type = 6201
 
     def apply_fn(fn):
         out_type = infer_pa_type(return_type)
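
A note on the hunk above: the eval type defaults are swapped so that 6200 is the baseline and 6201 is used only when the compiled serializer imports cleanly. A sketch of the resulting selection (constants copied verbatim; the meaning attached to each code is an inference from this change):

    eval_type = 6200                     # default SedonaDB worker path
    if has_sedona_serializer_speedup():  # compiled geomserde speedup importable
        eval_type = 6201                 # path that uses the native serializer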
diff --git a/python/sedona/spark/utils/udf.py b/python/sedona/spark/utils/udf.py
index 01a38a675a..0f88ef07f2 100644
--- a/python/sedona/spark/utils/udf.py
+++ b/python/sedona/spark/utils/udf.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 import shapely
 
 
@@ -8,7 +25,8 @@ def has_sedona_serializer_speedup():
         return False
     return True
 
-def to_sedona_func(arr):
+
+def to_sedona(arr):
     try:
         from . import geomserde_speedup
     except ImportError:
@@ -17,7 +35,7 @@ def to_sedona_func(arr):
     return geomserde_speedup.to_sedona_func(arr)
 
 
-def from_sedona_func(arr):
+def from_sedona(arr):
     try:
         from . import geomserde_speedup
     except ImportError:
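
The renamed helpers lazily import the compiled extension; a round-trip sketch, assuming geomserde_speedup built successfully (the point coordinates are invented for the example):

    import numpy as np
    import shapely

    from sedona.spark.utils.udf import from_sedona, to_sedona

    # Object dtype matters: it stops numpy from inferring a fixed-width string
    # dtype, which would strip trailing null bytes from the serialized values.
    geoms = np.asarray(shapely.points([[1.0, 1.0], [2.0, 2.0]]), dtype=object)
    serialized = to_sedona(geoms)                            # ndarray of bytes
    restored = from_sedona(np.asarray(serialized, dtype=object))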
diff --git a/python/sedona/spark/worker/__init__.py b/python/sedona/spark/worker/__init__.py
index e69de29bb2..13a83393a9 100644
--- a/python/sedona/spark/worker/__init__.py
+++ b/python/sedona/spark/worker/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/sedona/spark/worker/daemon.py b/python/sedona/spark/worker/daemon.py
index 0d64a543c5..ce75e376ea 100644
--- a/python/sedona/spark/worker/daemon.py
+++ b/python/sedona/spark/worker/daemon.py
@@ -1,19 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
+#   http://www.apache.org/licenses/LICENSE-2.0
 #
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 import logging
 import numbers
 import os
@@ -39,16 +40,23 @@ def compute_real_exit_code(exit_code):
     else:
         return 1
 
+
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 
-file_handler = logging.FileHandler("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log", delay=False)
+file_handler = logging.FileHandler(
+    "/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log",
+    delay=False,
+)
 file_handler.flush = file_handler.stream.flush
 
 logger.addHandler(file_handler)
 
+
 def worker(sock, authenticated):
-    logger.info("Starting worker process with pid =" + str(os.getpid()) + " socket " + str(sock))
+    logger.info(
+        "Starting worker process with pid =" + str(os.getpid()) + " socket " + 
str(sock)
+    )
     """
     Called by a worker process after the fork().
     """
@@ -69,10 +77,10 @@ def worker(sock, authenticated):
     if not authenticated:
         client_secret = UTF8Deserializer().loads(infile)
         if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret:
-            write_with_length("ok".encode("utf-8"), outfile)
+            write_with_length(b"ok", outfile)
             outfile.flush()
         else:
-            write_with_length("err".encode("utf-8"), outfile)
+            write_with_length(b"err", outfile)
             outfile.flush()
             sock.close()
             return 1
@@ -132,7 +140,7 @@ def manager():
         while True:
             try:
                 ready_fds = select.select([0, listen_sock], [], [], 1)[0]
-            except select.error as ex:
+            except OSError as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
@@ -186,7 +194,7 @@ def manager():
                    # Therefore, here we redirects it to '/dev/null' by duplicating
                    # another file descriptor for '/dev/null' to the standard input (0).
                     # See SPARK-26175.
-                    devnull = open(os.devnull, "r")
+                    devnull = open(os.devnull)
                     os.dup2(devnull.fileno(), 0)
                     devnull.close()
 
diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py
index 5a33a26610..52e7b663a5 100644
--- a/python/sedona/spark/worker/serde.py
+++ b/python/sedona/spark/worker/serde.py
@@ -1,11 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from pyspark.serializers import write_int, SpecialLengths
 from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
 
 from sedona.spark.worker.udf_info import UDFInfo
 
+
 class SedonaDBSerializer(ArrowStreamPandasSerializer):
     def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False):
-        super(SedonaDBSerializer, self).__init__(timezone, safecheck)
+        super().__init__(timezone, safecheck)
         self.db = db
         self.udf_info = udf_info
         self.cast_to_wkb = cast_to_wkb
@@ -18,12 +36,15 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
         for batch in batches:
             table = pa.Table.from_batches(batches=[batch])
             import pyarrow as pa
+
             df = self.db.create_data_frame(table)
             table_name = f"my_table_{index}"
 
             df.to_view(table_name)
 
-            sql_expression = self.udf_info.sedona_db_transformation_expr(table_name, self.cast_to_wkb)
+            sql_expression = self.udf_info.sedona_db_transformation_expr(
+                table_name, self.cast_to_wkb
+            )
 
             index += 1
 
diff --git a/python/sedona/spark/worker/udf_info.py b/python/sedona/spark/worker/udf_info.py
index 7853133e77..eb278a1511 100644
--- a/python/sedona/spark/worker/udf_info.py
+++ b/python/sedona/spark/worker/udf_info.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from dataclasses import dataclass
 
 from sedona.spark import GeometryType
@@ -15,11 +32,15 @@ class UDFInfo:
         arg_offset_str = ", ".join([f"_{el}" for el in self.arg_offsets])
         function_expr = f"{self.name}({arg_offset_str})"
         if isinstance(self.return_type, GeometryType) and cast_to_wkb:
-            return f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM 
{table_name}"
+            return (
+                f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM 
{table_name}"
+            )
 
         return f"SELECT {function_expr} AS _0 FROM {table_name}"
 
-    def sedona_db_transformation_expr(self, table_name: str, cast_to_wkb: bool = False) -> str:
+    def sedona_db_transformation_expr(
+        self, table_name: str, cast_to_wkb: bool = False
+    ) -> str:
         fields = []
         for arg in self.arg_offsets:
             if arg in self.geom_offsets and cast_to_wkb:
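
To make the generated SQL concrete: a hypothetical UDFInfo (field values invented for illustration, field names from read_udf in worker.py below) produces the following:

    from sedona.spark import GeometryType
    from sedona.spark.worker.udf_info import UDFInfo

    info = UDFInfo(
        arg_offsets=[0, 1],
        function=None,  # placeholder; the worker stores the unpickled UDF here
        return_type=GeometryType(),
        name="my_udf",
        geom_offsets=[0],
    )
    info.get_function_call_sql("my_table_0", cast_to_wkb=True)
    # -> "SELECT ST_GeomToSedonaSpark(my_udf(_0, _1)) AS _0 FROM my_table_0"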
diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py
index 17dae02e63..02fedf0058 100644
--- a/python/sedona/spark/worker/worker.py
+++ b/python/sedona/spark/worker/worker.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 import importlib
 import os
 import sys
@@ -8,8 +25,16 @@ from pyspark import TaskContext, shuffle, SparkFiles
 from pyspark.errors import PySparkRuntimeError
 from pyspark.java_gateway import local_connect_and_auth
 from pyspark.resource import ResourceInformation
-from pyspark.serializers import read_int, UTF8Deserializer, read_bool, read_long, CPickleSerializer, write_int, \
-    write_long, SpecialLengths
+from pyspark.serializers import (
+    read_int,
+    UTF8Deserializer,
+    read_bool,
+    read_long,
+    CPickleSerializer,
+    write_int,
+    write_long,
+    SpecialLengths,
+)
 
 from sedona.spark.worker.serde import SedonaDBSerializer
 from sedona.spark.worker.udf_info import UDFInfo
@@ -18,11 +43,13 @@ from sedona.spark.worker.udf_info import UDFInfo
 def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
     i = 0
     for df in iterator:
-        i+=1
+        i += 1
         table_name = f"output_table_{i}"
         df.to_view(table_name)
 
-        function_call_sql = udf_info.get_function_call_sql(table_name, cast_to_wkb=cast_to_wkb)
+        function_call_sql = udf_info.get_function_call_sql(
+            table_name, cast_to_wkb=cast_to_wkb
+        )
 
         df_out = db.sql(function_call_sql)
 
@@ -30,8 +57,7 @@ def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
         at = df_out.to_arrow_table()
         batches = at.combine_chunks().to_batches()
 
-        for batch in batches:
-            yield batch
+        yield from batches
 
 
 def check_python_version(utf_serde: UTF8Deserializer, infile) -> str:
@@ -50,6 +76,7 @@ def check_python_version(utf_serde: UTF8Deserializer, infile) -> str:
 
     return version
 
+
 def check_barrier_flag(infile):
     is_barrier = read_bool(infile)
     bound_port = read_int(infile)
@@ -66,6 +93,7 @@ def check_barrier_flag(infile):
 
     return is_barrier
 
+
 def assign_task_context(utf_serde: UTF8Deserializer, infile):
     stage_id = read_int(infile)
     partition_id = read_int(infile)
@@ -97,6 +125,7 @@ def assign_task_context(utf_serde: UTF8Deserializer, infile):
 
     return task_context
 
+
 def resolve_python_path(utf_serde: UTF8Deserializer, infile):
     def add_path(path: str):
         # worker can be used, so do not add path multiple times
@@ -131,6 +160,7 @@ def check_broadcast_variables(infile):
             },
         )
 
+
 def get_runner_conf(utf_serde: UTF8Deserializer, infile):
     runner_conf = {}
     num_conf = read_int(infile)
@@ -145,6 +175,7 @@ def read_command(serializer, infile):
     command = serializer._read_with_length(infile)
     return command
 
+
 def read_udf(infile, pickle_ser) -> UDFInfo:
     num_arg = read_int(infile)
     arg_offsets = [read_int(infile) for i in range(num_arg)]
@@ -162,9 +193,10 @@ def read_udf(infile, pickle_ser) -> UDFInfo:
         function=sedona_db_udf_expression,
         return_type=return_type,
         name=sedona_db_udf_expression._name,
-        geom_offsets=[0]
+        geom_offsets=[0],
     )
 
+
 def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
     num_udfs = read_int(infile)
 
@@ -237,7 +269,7 @@ def main(infile, outfile):
         safecheck=False,
         db=sedona_db,
         udf_info=udf,
-        cast_to_wkb=cast_to_wkb
+        cast_to_wkb=cast_to_wkb,
     )
 
     number_of_geometries = read_int(infile)
@@ -251,13 +283,13 @@ def main(infile, outfile):
     udf.geom_offsets = geom_offsets
 
     iterator = serde.load_stream(infile)
-    out_iterator = apply_iterator(db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb)
+    out_iterator = apply_iterator(
+        db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb
+    )
 
     serde.dump_stream(out_iterator, outfile)
 
-    write_statistics(
-        infile, outfile, boot_time=boot_time, init_time=init_time
-    )
+    write_statistics(infile, outfile, boot_time=boot_time, init_time=init_time)
 
 
 if __name__ == "__main__":
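
Taken together, main() mirrors the standard PySpark worker handshake before handing off to SedonaDB. Roughly, with step names taken from the helpers above and the ordering inferred from the visible hunks:

    # Sketch of the worker control flow, not a runnable transcript:
    # 1. check_python_version, check_barrier_flag, assign_task_context
    # 2. resolve_python_path, check_broadcast_variables, get_runner_conf
    # 3. register_sedona_db_udf -> UDFInfo (arg offsets, return type, name)
    # 4. SedonaDBSerializer.load_stream(infile) yields SedonaDB data frames
    # 5. apply_iterator evaluates the UDF SQL per batch; dump_stream replies
    # 6. write_statistics reports boot/init timings back to the JVM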
diff --git a/python/setup.py b/python/setup.py
index 66ab74701b..ae5e7bf174 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -1,3 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
 from setuptools import setup
 import numpy
 
diff --git a/python/src/geomserde_speedup_module.c b/python/src/geomserde_speedup_module.c
index 621f956cd0..1d7aefcd77 100644
--- a/python/src/geomserde_speedup_module.c
+++ b/python/src/geomserde_speedup_module.c
@@ -19,16 +19,14 @@
 
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
+#include <numpy/ndarraytypes.h>
+#include <numpy/npy_3kcompat.h>
+#include <numpy/ufuncobject.h>
 #include <stdio.h>
-//
-//#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
 
 #include "geomserde.h"
 #include "geos_c_dyn.h"
 #include "pygeos/c_api.h"
-#include <numpy/ndarraytypes.h>
-#include <numpy/npy_3kcompat.h>
-#include <numpy/ufuncobject.h>
 
 PyDoc_STRVAR(module_doc, "Geometry serialization/deserialization module.");
 
@@ -230,7 +228,7 @@ static PyObject *serialize(PyObject *self, PyObject *args) {
   return do_serialize(geos_geom);
 }
 
-static PyObject *deserialize_2(PyObject *self, PyObject *args) {
+static PyObject *deserialize(PyObject *self, PyObject *args) {
   GEOSContextHandle_t handle = NULL;
   int length = 0;
   GEOSGeometry *geom = do_deserialize(args, &handle, &length);
@@ -268,86 +266,86 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) {
 }
 
 static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
-    import_array();
-    PyObject *input_obj = NULL;
-    if (!PyArg_ParseTuple(args, "O", &input_obj)){
-        return NULL;
-    };
-
-    PyArrayObject *array = (PyArrayObject *)input_obj;
-    PyObject **objs = (PyObject **)PyArray_DATA(array);
-
-    GEOSContextHandle_t handle = get_geos_context_handle();
-      if (handle == NULL) {
-        return NULL;
-      }
-
-    npy_intp n = PyArray_SIZE(input_obj);
-    npy_intp dims[1] = {n};
-    PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
-    for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
-          PyObject *obj = objs[i];
-          GEOSGeometry *geos_geom = NULL;
-          char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
-
-          PyObject *serialized = do_serialize(geos_geom);
-          PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
-    }
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
+
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
 
-    return out;
+  GEOSContextHandle_t handle = get_geos_context_handle();
+  if (handle == NULL) {
+    return NULL;
+  }
+
+  npy_intp n = PyArray_SIZE(input_obj);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+    PyObject *obj = objs[i];
+    GEOSGeometry *geos_geom = NULL;
+    char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+
+    PyObject *serialized = do_serialize(geos_geom);
+    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
+  }
+
+  return out;
 }
 /* Module definition for Shapely 2.x */
 static PyObject *from_sedona_func(PyObject *self, PyObject *args) {
-    import_array();
-    PyObject *input_obj = NULL;
-    if (!PyArg_ParseTuple(args, "O", &input_obj)){
-        return NULL;
-    };
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
 
-    GEOSContextHandle_t handle = get_geos_context_handle();
+  GEOSContextHandle_t handle = get_geos_context_handle();
 
-    PyArrayObject *array = (PyArrayObject *)input_obj;
-    PyObject **objs = (PyObject **)PyArray_DATA(array);
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
 
-    int p_bytes_read = 0;
+  int p_bytes_read = 0;
 
-    npy_intp n = PyArray_SIZE(input_obj);
+  npy_intp n = PyArray_SIZE(input_obj);
 
-    npy_intp dims[1] = {n};
-    PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
 
-    for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
-        PyObject *obj = objs[i];
-        if (!PyBytes_Check(obj)) {
-            PyErr_SetString(PyExc_TypeError, "Expected bytes");
-            return NULL;
-        }
+  for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+    PyObject *obj = objs[i];
+    if (!PyBytes_Check(obj)) {
+      PyErr_SetString(PyExc_TypeError, "Expected bytes");
+      return NULL;
+    }
 
-        char *buf = PyBytes_AS_STRING(obj);
+    char *buf = PyBytes_AS_STRING(obj);
 
-        Py_ssize_t len = PyBytes_GET_SIZE(obj);
+    Py_ssize_t len = PyBytes_GET_SIZE(obj);
 
-        GEOSGeometry *geom = NULL;
+    GEOSGeometry *geom = NULL;
 
-        SedonaErrorCode err = sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
-        if (err != SEDONA_SUCCESS) {
-          handle_geomserde_error(err);
-          return NULL;
-        }
-          PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+    SedonaErrorCode err =
+        sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
+    if (err != SEDONA_SUCCESS) {
+      handle_geomserde_error(err);
+      return NULL;
+    }
+    PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
 
-          PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
-        }
+    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
+  }
 
-        return out;
+  return out;
 }
 
-
 static PyMethodDef geomserde_methods_shapely_2[] = {
     {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."},
     {"serialize", serialize, METH_VARARGS,
      "Serialize geometry object as bytearray."},
-    {"deserialize_2", deserialize_2, METH_VARARGS,
+    {"deserialize", deserialize, METH_VARARGS,
      "Deserialize bytes-like object to geometry object."},
     {"from_sedona_func", from_sedona_func, METH_VARARGS,
      "Deserialize bytes-like object to geometry object."},
@@ -355,18 +353,6 @@ static PyMethodDef geomserde_methods_shapely_2[] = {
      "Deserialize bytes-like object to geometry object."},
     {NULL, NULL, 0, NULL}, /* Sentinel */
 };
-//
-//static int add_from_sedona_func_to_module(PyObject *m) {
-//  PyObject *capsule = PyCapsule_New((void *)from_sedona_func, "from_sedona_func", NULL);
-//  if (capsule == NULL) {
-//    return -1;
-//  }
-//  if (PyModule_AddObject(m, "from_sedona_func", capsule) < 0) {
-//    Py_DECREF(capsule);
-//    return -1;
-//  }
-//  return 0;
-//}
 
 static struct PyModuleDef geomserde_module_shapely_2 = {
     PyModuleDef_HEAD_INIT, "geomserde_speedup", module_doc, 0,
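
On the Python side, the entry points registered in the method table above are reached through the speedup module. A usage sketch, assuming the extension compiled from this file is importable; payload is a placeholder for Sedona-serialized geometry bytes:

    import numpy as np

    from sedona.spark.utils import geomserde_speedup

    # from_sedona_func requires an object-dtype ndarray of bytes objects and
    # raises TypeError otherwise (see the PyBytes_Check in the loop above).
    arr = np.asarray([payload, payload], dtype=object)
    geoms = geomserde_speedup.from_sedona_func(arr)  # ndarray of shapely geometries
    back = geomserde_speedup.to_sedona_func(geoms)   # ndarray of serialized bytes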
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index e240a09758..300d937d27 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -70,11 +70,15 @@ class TestBase:
                     "spark.sedona.stac.load.itemsLimitMax",
                     "20",
                 )
-                .config("spark.executor.memory", "10G") \
-                .config("spark.driver.memory", "10G") \
-                .config("sedona.python.worker.udf.daemon.module", 
"sedona.spark.worker.daemon") \
-                .config("sedona.python.worker.daemon.enabled", "false") \
-                # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
+                .config("spark.executor.memory", "10G")
+                .config("spark.driver.memory", "10G")
+                .config(
+                    "sedona.python.worker.udf.daemon.module",
+                    "sedona.spark.worker.daemon",
+                )
+                .config(
+                    "sedona.python.worker.daemon.enabled", "false"
+                )  # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
                 # in Spark 4
                 .config("spark.sql.ansi.enabled", "false")
             )
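
For reference, the two session options exercised here are the knobs for the new daemon mode; a minimal builder sketch using only the keys visible in this hunk (the value semantics are inferred from the names):

    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon")
        .config("sedona.python.worker.daemon.enabled", "true")  # this test keeps it "false"
        .getOrCreate()
    )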
diff --git a/python/tests/utils/test_sedona_db_vectorized_udf.py b/python/tests/utils/test_sedona_db_vectorized_udf.py
index 4b266384fa..eea84eec91 100644
--- a/python/tests/utils/test_sedona_db_vectorized_udf.py
+++ b/python/tests/utils/test_sedona_db_vectorized_udf.py
@@ -1,9 +1,24 @@
-import time
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
 
 import numpy as np
 
 from sedona.spark.sql.functions import sedona_db_vectorized_udf
-from sedona.spark.utils.udf import to_sedona_func, from_sedona_func
+from sedona.spark.utils.udf import to_sedona, from_sedona
 from tests.test_base import TestBase
 import pyarrow as pa
 import shapely
@@ -11,107 +26,21 @@ from sedona.sql import GeometryType
 from pyspark.sql.functions import expr, lit
 from pyspark.sql.types import DoubleType, IntegerType, ByteType
 from sedona.spark.sql import ST_X
-from shapely._enum import ParamEnum
-
-def test_m():
-    on_invalid="raise"
-    wkb = b'\x12\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?'
-    geometry = np.asarray([wkb, wkb], dtype=object)
-
-    DecodingErrorOptions = ParamEnum(
-        "DecodingErrorOptions", {"ignore": 0, "warn": 1, "raise": 2, "fix": 3}
-    )
-
-    # print("sss")
-
-
-    # <class 'numpy.ndarray'>
-    # object
-    #   C_CONTIGUOUS : True
-    #   F_CONTIGUOUS : True
-    #   OWNDATA : False
-    #   WRITEABLE : True
-    #   ALIGNED : True
-    #   WRITEBACKIFCOPY : False
-    # print(type(geometry))
-    # print(geometry.dtype)
-    # print(geometry.flags)
-
-    result = from_sedona_func(geometry)
-
-    result2 = to_sedona_func(result)
-
-# ensure the input has object dtype, to avoid numpy inferring it as a
-# fixed-length string dtype (which removes trailing null bytes upon access
-# of array elements)
-    #
-    # def from_sedona_func(arr):
-    #     try:
-    #         from . import sedonaserde_vectorized_udf_module
-    #         print(sedonaserde_vectorized_udf_module.from_sedona_func_3(arr))
-    #     except Exception as e:
-    #         print("Cannot import sedonaserde_vectorized_udf_module:")
-    #         print(e)
-    # # print()
-    # return None
-#
-# def from_wkb(geometry, on_invalid="raise", **kwargs):
-#     r"""Create geometries from the Well-Known Binary (WKB) representation.
-#
-#     The Well-Known Binary format is defined in the `OGC Simple Features
-#     Specification for SQL <https://www.opengeospatial.org/standards/sfs>`__.
-#
-#     Parameters
-#     ----------
-#     geometry : str or array_like
-#         The WKB byte object(s) to convert.
-#     on_invalid : {"raise", "warn", "ignore", "fix"}, default "raise"
-#         Indicates what to do when an invalid WKB is encountered. Note that the
-#         validations involved are very basic, e.g. the minimum number of points
-#         for the geometry type. For a thorough check, use :func:`is_valid` after
-#         conversion to geometries. Valid options are:
-#
-#         - raise: an exception will be raised if any input geometry is invalid.
-#         - warn: a warning will be raised and invalid WKT geometries will be
-#           returned as ``None``.
-#         - ignore: invalid geometries will be returned as ``None`` without a
-#           warning.
-#         - fix: an effort is made to fix invalid input geometries (currently just
-#           unclosed rings). If this is not possible, they are returned as
-#           ``None`` without a warning. Requires GEOS >= 3.11.
-#
-#           .. versionadded:: 2.1.0
-#     **kwargs
-#         See :ref:`NumPy ufunc docs <ufuncs.kwargs>` for other keyword arguments.
-#
-#     Examples
-#     --------
-#     >>> import shapely
-#     >>> 
shapely.from_wkb(b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')
-#     <POINT (1 1)>
-#
-#     """  # noqa: E501
-#     if not np.isscalar(on_invalid):
-#         raise TypeError("on_invalid only accepts scalar values")
-#
-#     invalid_handler = np.uint8(DecodingErrorOptions.get_value(on_invalid))
-#
-#     # ensure the input has object dtype, to avoid numpy inferring it as a
-#     # fixed-length string dtype (which removes trailing null bytes upon access
-#     # of array elements)
-#     geometry = np.asarray(geometry, dtype=object)
-#     return lib.from_wkb(geometry, invalid_handler, **kwargs)
+
 
 class TestSedonaDBArrowFunction(TestBase):
     def test_vectorized_udf(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType(), IntegerType()])
+        @sedona_db_vectorized_udf(
+            return_type=GeometryType(), input_types=[ByteType(), IntegerType()]
+        )
         def my_own_function(geom, distance):
             geom_wkb = pa.array(geom.storage.to_array())
+            geometry_array = np.asarray(geom_wkb, dtype=object)
             distance = pa.array(distance.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geom = from_sedona(geometry_array)
             result_shapely = shapely.centroid(geom)
 
-            return pa.array(shapely.to_wkb(result_shapely))
+            return pa.array(to_sedona(result_shapely))
 
         df = self.spark.createDataFrame(
             [
@@ -125,56 +54,61 @@ class TestSedonaDBArrowFunction(TestBase):
         df.select(ST_X(my_own_function(df.wkt, lit(100)).alias("geom"))).show()
 
     def test_geometry_to_double(self):
-        @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=DoubleType(), input_types=[ByteType()])
         def geometry_to_non_geometry_udf(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+            geom = from_sedona(geometry_array)
 
             result_shapely = shapely.get_x(shapely.centroid(geom))
 
-            return pa.array(result_shapely, pa.float64())
+            return pa.array(result_shapely)
 
         df = self.spark.createDataFrame(
             [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
             ["id", "wkt"],
         ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
 
-        values = df.select(geometry_to_non_geometry_udf(df.wkt).alias("x_coord")) \
-            .collect()
+        values = df.select(
+            geometry_to_non_geometry_udf(df.wkt).alias("x_coord")
+        ).collect()
 
         values_list = [row["x_coord"] for row in values]
 
         assert values_list == [1.0, 2.0, 3.0]
 
     def test_geometry_to_int(self):
-        @sedona_db_vectorized_udf(return_type=IntegerType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=IntegerType(), input_types=[ByteType()])
         def geometry_to_int(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+
+            geom = from_sedona(geometry_array)
 
             result_shapely = shapely.get_num_points(geom)
 
-            return pa.array(result_shapely, pa.int32())
+            return pa.array(result_shapely)
 
         df = self.spark.createDataFrame(
             [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
             ["id", "wkt"],
         ).withColumn("wkt", expr("ST_GeomFromWKT(wkt)"))
 
-        values = df.select(geometry_to_int(df.wkt)) \
-            .collect()
+        values = df.select(geometry_to_int(df.wkt)).collect()
 
         values_list = [row[0] for row in values]
 
         assert values_list == [0, 0, 0]
 
     def test_geometry_crs_preservation(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
         def return_same_geometry(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+
+            geom = from_sedona(geometry_array)
 
-            return pa.array(shapely.to_wkb(geom))
+            return pa.array(to_sedona(geom))
 
         df = self.spark.createDataFrame(
             [(1, "POINT (1 1)"), (2, "POINT (2 2)"), (3, "POINT (3 3)")],
@@ -183,70 +117,8 @@ class TestSedonaDBArrowFunction(TestBase):
 
         result_df = df.select(return_same_geometry(df.wkt).alias("geom"))
 
-        crs_list = result_df.selectExpr("ST_SRID(geom)").rdd.flatMap(lambda x: x).collect()
+        crs_list = (
+            result_df.selectExpr("ST_SRID(geom)").rdd.flatMap(lambda x: x).collect()
+        )
 
         assert crs_list == [3857, 3857, 3857]
-
-    def test_geometry_to_geometry(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
-        def buffer_geometry(geom):
-            geom_wkb = pa.array(geom.storage.to_array())
-            geometry_array = np.asarray(geom_wkb, dtype=object)
-            geom = from_sedona_func(geometry_array)
-
-            result_shapely = shapely.buffer(geom, 10)
-
-            return pa.array(to_sedona_func(result_shapely))
-
-        df = self.spark.read.\
-            format("geoparquet").\
-            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l1")
-
-        # 1 045 770
-        # print(df.count())
-
-        # df.unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\
-        #     unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\
-        #     write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2")
-        # 18 24
-        # df.union(df).union(df).union(df).union(df).union(df).union(df).\
-        #     write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3")
-
-        values = df.select(buffer_geometry(df.geometry).alias("geometry")).\
-            selectExpr("ST_Area(geometry) as area").\
-            selectExpr("Sum(area) as total_area")
-
-        values.show()
-
-        # for _ in range(4):
-        #     start_time = time.time()
-        #     values.show()
-        #     end_time = time.time()
-        #     print(f"Execution time: {end_time - start_time} seconds")
-
-    def test_geometry_to_geometry_normal_udf(self):
-        from pyspark.sql.functions import udf
-
-        def create_buffer(geom):
-            return geom.buffer(10)
-
-        create_buffer_udf = udf(create_buffer, GeometryType())
-
-        df = self.spark.read. \
-            format("geoparquet"). \
-            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2")
-
-        # print(df.count())
-        # df.limit(10).collect()
-        values = df.select(create_buffer_udf(df.geometry).alias("geometry")). \
-            selectExpr("ST_Area(geometry) as area"). \
-            selectExpr("Sum(area) as total_area")
-
-        values.show()
-
-        # for _ in range(4):
-        #     start_time = time.time()
-        #     values.show()
-        #     end_time = time.time()
-        #     print(f"Execution time: {end_time - start_time} seconds")
-# 1 045 770
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 4fe4acfb12..94ce194c65 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -73,21 +73,11 @@ class StrategySuite extends TestBaseScala with Matchers {
       .withColumn("geometry", expr("ST_SetSRID(geom, '4326')"))
      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom"))
 
-    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x")
+    dfVectorized
+      .selectExpr("ST_X(ST_Centroid(geom)) AS x")
       .selectExpr("sum(x)")
       .as[Double]
-      .collect().head shouldEqual 101
-//
-//    val dfCopied = sparkSession.read
-//      .format("geoparquet")
-//      .load(
-//        "/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona/barcelona.geoparquet")
-//
-//    val values = dfCopied
-//      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(10)).alias("geom"))
-//      .selectExpr("ST_Area(geom) as area")
-//      .selectExpr("Sum(area) as total_area")
-//
-//    values.show()
+      .collect()
+      .head shouldEqual 101
   }
 }

