This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit e00915751a90249f3395dc5d71957e0521f4a640
Author: pawelkocinski <[email protected]>
AuthorDate: Tue Jan 13 00:25:30 2026 +0100

    add sedonadb sedona udf worker example
---
 python/pyproject.toml                              |  14 +-
 python/sedona/spark/sql/functions.py               |  22 +-
 python/sedona/spark/utils/geometry_serde.py        |   6 +-
 python/sedona/spark/utils/udf.py                   |  26 +++
 python/sedona/spark/worker/daemon.py               | 227 +++++++++++++++++++++
 python/sedona/spark/worker/serde.py                |   6 +-
 python/sedona/spark/worker/udf_info.py             |   9 +-
 python/sedona/spark/worker/worker.py               |  18 +-
 python/setup.py                                    |   6 +
 python/src/geom_buf.c                              |   2 +
 python/src/geomserde.c                             |   1 +
 python/src/geomserde_speedup_module.c              |  99 ++++++++-
 python/tests/test_base.py                          |   2 +
 .../tests/utils/test_sedona_db_vectorized_udf.py   | 132 +++++++++++-
 .../org/apache/sedona/sql/UDF/PythonEvalType.scala |   4 +-
 .../execution/python/SedonaArrowPythonRunner.scala |   6 +-
 .../sql/execution/python/SedonaArrowStrategy.scala |  10 +-
 .../execution/python/SedonaBasePythonRunner.scala  |  12 +-
 .../execution/python/SedonaDBWorkerFactory.scala   |  14 +-
 .../execution/python/SedonaPythonArrowInput.scala  |   3 +
 .../execution/python/SedonaPythonArrowOutput.scala |   3 +-
 .../spark/sql/execution/python/WorkerContext.scala |  16 +-
 .../spark/sql/udf/ExtractSedonaUDFRule.scala       |   3 +-
 .../org/apache/sedona/sql/TestBaseScala.scala      |   4 +-
 .../org/apache/spark/sql/udf/StrategySuite.scala   |  59 +++---
 25 files changed, 602 insertions(+), 102 deletions(-)

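For orientation, the following usage sketch is distilled from the test changes in this commit (python/tests/utils/test_sedona_db_vectorized_udf.py); the input DataFrame and its geometry column are illustrative placeholders, not part of the commit:

    import numpy as np
    import pyarrow as pa
    import shapely
    from pyspark.sql.types import ByteType

    from sedona.spark.sql.functions import sedona_db_vectorized_udf
    from sedona.spark.utils.udf import from_sedona_func, to_sedona_func
    from sedona.sql import GeometryType

    # Vectorized SedonaDB UDF: geometries arrive as a geoarrow array, and the
    # from_sedona_func/to_sedona_func helpers use the geomserde_speedup C
    # extension when it is importable, falling back to shapely.from_wkb /
    # shapely.to_wkb otherwise (see python/sedona/spark/utils/udf.py below).
    @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
    def buffer_geometry(geom):
        geom_wkb = pa.array(geom.storage.to_array())
        geoms = from_sedona_func(np.asarray(geom_wkb, dtype=object))
        buffered = shapely.buffer(geoms, 10)
        return pa.array(to_sedona_func(buffered))

    # Illustrative use on a DataFrame df with a geometry column:
    # df.select(buffer_geometry(df.geometry).alias("geometry"))
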
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 76169261c3..37159cf83b 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -16,7 +16,7 @@
 # under the License.
 
 [build-system]
-requires = ["setuptools>=80.9.0", "wheel"]
+requires = ["setuptools>=80.9.0", "wheel", "numpy"]
 build-backend = "setuptools.build_meta"
 
 [project]
@@ -36,6 +36,7 @@ dependencies = [
   "geoarrow-c>=0.3.1",
   "geoarrow-pyarrow>=0.2.0",
   "geopandas>=1.1.2",
+  "numpy>=2.1.3",
   "pyarrow>=16.1.0",
   "pyspark==3.5.4",
   "sedonadb",
@@ -79,14 +80,19 @@ exclude = ["*.tests", "*.tests.*", "tests", "tests.*"]
 name = "sedona.spark.utils.geomserde_speedup"
 sources = [
   "src/geomserde_speedup_module.c",
+  "src/sedonaserde_vectorized_udf_module.c",
   "src/geomserde.c",
   "src/geom_buf.c",
   "src/geos_c_dyn.c",
 ]
 
-[tool.uv]
-dev-dependencies = [
-    "pytest>=9.0.2",
+[[tool.setuptools.ext-modules]]
+name = "sedona.spark.utils.sedonaserde_vectorized_udf_module"
+sources = [
+    "src/sedonaserde_vectorized_udf_module.c",
+    "src/geomserde.c",
+    "src/geom_buf.c",
+    "src/geos_c_dyn.c",
 ]
 
 [tool.uv.sources]
diff --git a/python/sedona/spark/sql/functions.py b/python/sedona/spark/sql/functions.py
index 7c480e1700..232ccb50a3 100644
--- a/python/sedona/spark/sql/functions.py
+++ b/python/sedona/spark/sql/functions.py
@@ -28,8 +28,9 @@ import pyarrow as pa
 import geoarrow.pyarrow as ga
 from sedonadb import udf as sedona_udf_module
 from sedona.spark.sql.types import GeometryType
-from pyspark.sql.types import DataType, FloatType, DoubleType, IntegerType, StringType
+from pyspark.sql.types import DataType, FloatType, DoubleType, IntegerType, StringType, ByteType
 
+from sedona.spark.utils.udf import has_sedona_serializer_speedup
 
 SEDONA_SCALAR_EVAL_TYPE = 5200
 SEDONA_PANDAS_ARROW_NAME = "SedonaPandasArrowUDF"
@@ -51,7 +52,7 @@ sedona_udf_to_eval_type = {
 
 
 def sedona_vectorized_udf(
-    return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR
+        return_type: DataType, udf_type: SedonaUDFType = SedonaUDFType.SHAPELY_SCALAR
 ):
     import geopandas as gpd
 
@@ -92,7 +93,7 @@ def sedona_vectorized_udf(
 
 
 def _apply_shapely_series_udf(
-    fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
+        fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
 ):
     def apply(series: pd.Series) -> pd.Series:
         applied = series.apply(
@@ -113,7 +114,7 @@ def _apply_shapely_series_udf(
 
 
 def _apply_geo_series_udf(
-    fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
+        fn, return_type: DataType, serialize_geom: bool, deserialize_geom: bool
 ):
     import geopandas as gpd
 
@@ -161,6 +162,7 @@ def infer_pa_type(spark_type: DataType):
     else:
         raise NotImplementedError(f"Type {spark_type} is not supported yet.")
 
+
 def infer_input_type(spark_type: DataType):
     if isinstance(spark_type, GeometryType):
         return sedona_udf_module.GEOMETRY
@@ -168,9 +170,12 @@ def infer_input_type(spark_type: DataType):
         return sedona_udf_module.NUMERIC
     elif isinstance(spark_type, StringType):
         return sedona_udf_module.STRING
+    elif isinstance(spark_type, ByteType):
+        return sedona_udf_module.BINARY
     else:
         raise NotImplementedError(f"Type {spark_type} is not supported yet.")
 
+
 def infer_input_types(spark_types: list[DataType]):
     pa_types = []
     for spark_type in spark_types:
@@ -182,8 +187,12 @@ def infer_input_types(spark_types: list[DataType]):
 
 def sedona_db_vectorized_udf(
         return_type: DataType,
-        input_types: list[DataType]
+        input_types: list[DataType],
 ):
+    eval_type = 6201
+    if has_sedona_serializer_speedup():
+        eval_type = 6200
+
     def apply_fn(fn):
         out_type = infer_pa_type(return_type)
         input_types_sedona_db = infer_input_types(input_types)
@@ -193,10 +202,9 @@ def sedona_db_vectorized_udf(
             return fn(*args, **kwargs)
 
         udf = UserDefinedFunction(
-            lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=6200
+            lambda: shapely_udf, return_type, "SedonaPandasArrowUDF", evalType=eval_type
         )
 
         return udf
 
-
     return apply_fn
diff --git a/python/sedona/spark/utils/geometry_serde.py b/python/sedona/spark/utils/geometry_serde.py
index 103eb49817..0ef3d4ed5c 100644
--- a/python/sedona/spark/utils/geometry_serde.py
+++ b/python/sedona/spark/utils/geometry_serde.py
@@ -25,9 +25,6 @@ from shapely.geometry.base import BaseGeometry
 
 speedup_enabled = False
 
-
-# Use geomserde_speedup when available, otherwise fallback to general pure
-# python implementation.
 try:
     from . import geomserde_speedup
 
@@ -60,8 +57,9 @@ try:
         def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
             if buf is None:
                 return None
-            return geomserde_speedup.deserialize(buf)
+            return geomserde_speedup.deserialize_2(buf)
 
+        # Export the from_sedona_func for use with numpy ufuncs
         speedup_enabled = True
 
     elif shapely.__version__.startswith("1."):
diff --git a/python/sedona/spark/utils/udf.py b/python/sedona/spark/utils/udf.py
new file mode 100644
index 0000000000..01a38a675a
--- /dev/null
+++ b/python/sedona/spark/utils/udf.py
@@ -0,0 +1,26 @@
+import shapely
+
+
+def has_sedona_serializer_speedup():
+    try:
+        from . import geomserde_speedup
+    except ImportError:
+        return False
+    return True
+
+def to_sedona_func(arr):
+    try:
+        from . import geomserde_speedup
+    except ImportError:
+        return shapely.to_wkb(arr)
+
+    return geomserde_speedup.to_sedona_func(arr)
+
+
+def from_sedona_func(arr):
+    try:
+        from . import geomserde_speedup
+    except ImportError:
+        return shapely.from_wkb(arr)
+
+    return geomserde_speedup.from_sedona_func(arr)
diff --git a/python/sedona/spark/worker/daemon.py b/python/sedona/spark/worker/daemon.py
new file mode 100644
index 0000000000..0d64a543c5
--- /dev/null
+++ b/python/sedona/spark/worker/daemon.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import logging
+import numbers
+import os
+import signal
+import select
+import socket
+import sys
+import traceback
+import time
+import gc
+from errno import EINTR, EAGAIN
+from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN
+from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT
+
+from sedona.spark.worker.worker import main as worker_main
+from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
+
+
+def compute_real_exit_code(exit_code):
+    # SystemExit's code can be integer or string, but os._exit only accepts integers
+    if isinstance(exit_code, numbers.Integral):
+        return exit_code
+    else:
+        return 1
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+
+file_handler = logging.FileHandler("/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log", delay=False)
+file_handler.flush = file_handler.stream.flush
+
+logger.addHandler(file_handler)
+
+def worker(sock, authenticated):
+    logger.info("Starting worker process with pid =" + str(os.getpid()) + " socket " + str(sock))
+    """
+    Called by a worker process after the fork().
+    """
+    signal.signal(SIGHUP, SIG_DFL)
+    signal.signal(SIGCHLD, SIG_DFL)
+    signal.signal(SIGTERM, SIG_DFL)
+    # restore the handler for SIGINT,
+    # it's useful for debugging (show the stacktrace before exit)
+    signal.signal(SIGINT, signal.default_int_handler)
+
+    # Read the socket using fdopen instead of socket.makefile() because the latter
+    # seems to be very slow; note that we need to dup() the file descriptor because
+    # otherwise writes also cause a seek that makes us miss data on the read side.
+    buffer_size = int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
+    infile = os.fdopen(os.dup(sock.fileno()), "rb", buffer_size)
+    outfile = os.fdopen(os.dup(sock.fileno()), "wb", buffer_size)
+
+    if not authenticated:
+        client_secret = UTF8Deserializer().loads(infile)
+        if os.environ["PYTHON_WORKER_FACTORY_SECRET"] == client_secret:
+            write_with_length("ok".encode("utf-8"), outfile)
+            outfile.flush()
+        else:
+            write_with_length("err".encode("utf-8"), outfile)
+            outfile.flush()
+            sock.close()
+            return 1
+
+    exit_code = 0
+    try:
+        worker_main(infile, outfile)
+    except SystemExit as exc:
+        exit_code = compute_real_exit_code(exc.code)
+    finally:
+        try:
+            outfile.flush()
+        except Exception:
+            pass
+    return exit_code
+
+
+def manager():
+    # Create a new process group to corral our children
+    os.setpgid(0, 0)
+
+    # Create a listening socket on the loopback interface
+    if os.environ.get("SPARK_PREFER_IPV6", "false").lower() == "true":
+        listen_sock = socket.socket(AF_INET6, SOCK_STREAM)
+        listen_sock.bind(("::1", 0, 0, 0))
+        listen_sock.listen(max(1024, SOMAXCONN))
+        listen_host, listen_port, _, _ = listen_sock.getsockname()
+    else:
+        listen_sock = socket.socket(AF_INET, SOCK_STREAM)
+        listen_sock.bind(("127.0.0.1", 0))
+        listen_sock.listen(max(1024, SOMAXCONN))
+        listen_host, listen_port = listen_sock.getsockname()
+
+    # re-open stdin/stdout in 'wb' mode
+    stdin_bin = os.fdopen(sys.stdin.fileno(), "rb", 4)
+    stdout_bin = os.fdopen(sys.stdout.fileno(), "wb", 4)
+    write_int(listen_port, stdout_bin)
+    stdout_bin.flush()
+
+    def shutdown(code):
+        signal.signal(SIGTERM, SIG_DFL)
+        # Send SIGHUP to notify workers of shutdown
+        os.kill(0, SIGHUP)
+        sys.exit(code)
+
+    def handle_sigterm(*args):
+        shutdown(1)
+
+    signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
+    signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
+    signal.signal(SIGCHLD, SIG_IGN)
+
+    reuse = os.environ.get("SPARK_REUSE_WORKER")
+
+    # Initialization complete
+    try:
+        while True:
+            try:
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
+            except select.error as ex:
+                if ex[0] == EINTR:
+                    continue
+                else:
+                    raise
+
+            if 0 in ready_fds:
+                try:
+                    worker_pid = read_int(stdin_bin)
+                except EOFError:
+                    # Spark told us to exit by closing stdin
+                    shutdown(0)
+                try:
+                    os.kill(worker_pid, signal.SIGKILL)
+                except OSError:
+                    pass  # process already died
+
+            if listen_sock in ready_fds:
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
+                # Launch a worker process
+                try:
+                    pid = os.fork()
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
+                    else:
+                        outfile = sock.makefile(mode="wb")
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
+                        sock.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+
+                    # It should close the standard input in the child process so that
+                    # Python native function executions stay intact.
+                    #
+                    # Note that if we just close the standard input (file descriptor 0),
+                    # the lowest file descriptor (file descriptor 0) will be allocated,
+                    # later when other file descriptors should happen to open.
+                    #
+                    # Therefore, here we redirects it to '/dev/null' by duplicating
+                    # another file descriptor for '/dev/null' to the standard input (0).
+                    # See SPARK-26175.
+                    devnull = open(os.devnull, "r")
+                    os.dup2(devnull.fileno(), 0)
+                    devnull.close()
+
+                    try:
+                        # Acknowledge that the fork was successful
+                        outfile = sock.makefile(mode="wb")
+                        write_int(os.getpid(), outfile)
+                        outfile.flush()
+                        outfile.close()
+                        authenticated = False
+                        while True:
+                            code = worker(sock, authenticated)
+                            logger.info("Worker exited with code %d", code)
+                            if code == 0:
+                                authenticated = True
+                            if not reuse or code:
+                                # wait for closing
+                                try:
+                                    while sock.recv(1024):
+                                        pass
+                                except Exception:
+                                    pass
+                                break
+                            gc.collect()
+                    except BaseException:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
+                    sock.close()
+
+    finally:
+        shutdown(1)
+
+
+if __name__ == "__main__":
+    manager()
diff --git a/python/sedona/spark/worker/serde.py b/python/sedona/spark/worker/serde.py
index 3954d075b7..5a33a26610 100644
--- a/python/sedona/spark/worker/serde.py
+++ b/python/sedona/spark/worker/serde.py
@@ -4,10 +4,11 @@ from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
 from sedona.spark.worker.udf_info import UDFInfo
 
 class SedonaDBSerializer(ArrowStreamPandasSerializer):
-    def __init__(self, timezone, safecheck, db, udf_info: UDFInfo):
+    def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False):
         super(SedonaDBSerializer, self).__init__(timezone, safecheck)
         self.db = db
         self.udf_info = udf_info
+        self.cast_to_wkb = cast_to_wkb
 
     def load_stream(self, stream):
         import pyarrow as pa
@@ -22,7 +23,7 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
 
             df.to_view(table_name)
 
-            sql_expression = self.udf_info.sedona_db_transformation_expr(table_name)
+            sql_expression = self.udf_info.sedona_db_transformation_expr(table_name, self.cast_to_wkb)
 
             index += 1
 
@@ -37,7 +38,6 @@ class SedonaDBSerializer(ArrowStreamPandasSerializer):
                 if writer is None:
                     writer = pa.RecordBatchStreamWriter(stream, batch.schema)
                 writer.write_batch(batch)
-                # stream.flush()
         finally:
             if writer is not None:
                 writer.close()
diff --git a/python/sedona/spark/worker/udf_info.py b/python/sedona/spark/worker/udf_info.py
index d354bcea7e..7853133e77 100644
--- a/python/sedona/spark/worker/udf_info.py
+++ b/python/sedona/spark/worker/udf_info.py
@@ -11,24 +11,23 @@ class UDFInfo:
     return_type: object
     name: str
 
-    def get_function_call_sql(self, table_name: str) -> str:
+    def get_function_call_sql(self, table_name: str, cast_to_wkb: bool = False) -> str:
         arg_offset_str = ", ".join([f"_{el}" for el in self.arg_offsets])
         function_expr = f"{self.name}({arg_offset_str})"
-        if isinstance(self.return_type, GeometryType):
+        if isinstance(self.return_type, GeometryType) and cast_to_wkb:
             return f"SELECT ST_GeomToSedonaSpark({function_expr}) AS _0 FROM {table_name}"
 
         return f"SELECT {function_expr} AS _0 FROM {table_name}"
 
-    def sedona_db_transformation_expr(self, table_name: str) -> str:
+    def sedona_db_transformation_expr(self, table_name: str, cast_to_wkb: bool = False) -> str:
         fields = []
         for arg in self.arg_offsets:
-            if arg in self.geom_offsets:
+            if arg in self.geom_offsets and cast_to_wkb:
                 crs = self.geom_offsets[arg]
                 fields.append(f"ST_GeomFromSedonaSpark(_{arg}, 'EPSG:{crs}') AS _{arg}")
                 continue
 
             fields.append(f"_{arg}")
 
-
         fields_expr = ", ".join(fields)
         return f"SELECT {fields_expr} FROM {table_name}"
diff --git a/python/sedona/spark/worker/worker.py b/python/sedona/spark/worker/worker.py
index 6b2a18c8f2..17dae02e63 100644
--- a/python/sedona/spark/worker/worker.py
+++ b/python/sedona/spark/worker/worker.py
@@ -15,16 +15,17 @@ from sedona.spark.worker.serde import SedonaDBSerializer
 from sedona.spark.worker.udf_info import UDFInfo
 
 
-def apply_iterator(db, iterator, udf_info: UDFInfo):
+def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
     i = 0
     for df in iterator:
         i+=1
         table_name = f"output_table_{i}"
         df.to_view(table_name)
 
-        function_call_sql = udf_info.get_function_call_sql(table_name)
+        function_call_sql = udf_info.get_function_call_sql(table_name, cast_to_wkb=cast_to_wkb)
 
         df_out = db.sql(function_call_sql)
+
         df_out.to_view(f"view_{i}")
         at = df_out.to_arrow_table()
         batches = at.combine_chunks().to_batches()
@@ -207,9 +208,9 @@ def main(infile, outfile):
     pickle_ser = CPickleSerializer()
 
     split_index = read_int(infile)
-    #
+
     check_python_version(utf8_deserializer, infile)
-    #
+
     check_barrier_flag(infile)
 
     task_context = assign_task_context(utf_serde=utf8_deserializer, infile=infile)
@@ -217,7 +218,7 @@ def main(infile, outfile):
     shuffle.DiskBytesSpilled = 0
 
     resolve_python_path(utf8_deserializer, infile)
-    #
+
     check_broadcast_variables(infile)
 
     eval_type = read_int(infile)
@@ -229,11 +230,14 @@ def main(infile, outfile):
     sedona_db.register_udf(udf.function)
     init_time = time.time()
 
+    cast_to_wkb = read_bool(infile)
+
     serde = SedonaDBSerializer(
         timezone=runner_conf.get("spark.sql.session.timeZone", "UTC"),
         safecheck=False,
         db=sedona_db,
-        udf_info=udf
+        udf_info=udf,
+        cast_to_wkb=cast_to_wkb
     )
 
     number_of_geometries = read_int(infile)
@@ -247,7 +251,7 @@ def main(infile, outfile):
     udf.geom_offsets = geom_offsets
 
     iterator = serde.load_stream(infile)
-    out_iterator = apply_iterator(db=sedona_db, iterator=iterator, udf_info=udf)
+    out_iterator = apply_iterator(db=sedona_db, iterator=iterator, udf_info=udf, cast_to_wkb=cast_to_wkb)
 
     serde.dump_stream(out_iterator, outfile)
 
diff --git a/python/setup.py b/python/setup.py
new file mode 100644
index 0000000000..66ab74701b
--- /dev/null
+++ b/python/setup.py
@@ -0,0 +1,6 @@
+from setuptools import setup
+import numpy
+
+setup(
+    include_dirs=[numpy.get_include()],
+)
diff --git a/python/src/geom_buf.c b/python/src/geom_buf.c
index 5239de5ae0..d6a51bb3d0 100644
--- a/python/src/geom_buf.c
+++ b/python/src/geom_buf.c
@@ -208,6 +208,8 @@ SedonaErrorCode geom_buf_alloc(GeomBuffer *geom_buf,
   return SEDONA_SUCCESS;
 }
 
+#include <stdio.h>
+
 SedonaErrorCode read_geom_buf_header(const char *buf, int buf_size,
                                      GeomBuffer *geom_buf,
                                      CoordinateSequenceInfo *cs_info,
diff --git a/python/src/geomserde.c b/python/src/geomserde.c
index c1f7427738..81dafe216f 100644
--- a/python/src/geomserde.c
+++ b/python/src/geomserde.c
@@ -718,6 +718,7 @@ static SedonaErrorCode deserialize_geom_buf(GEOSContextHandle_t handle,
   return SEDONA_SUCCESS;
 }
 
+#include <stdio.h>
 SedonaErrorCode sedona_deserialize_geom(GEOSContextHandle_t handle,
                                         const char *buf, int buf_size,
                                         GEOSGeometry **p_geom,
diff --git a/python/src/geomserde_speedup_module.c b/python/src/geomserde_speedup_module.c
index a95ced29e5..621f956cd0 100644
--- a/python/src/geomserde_speedup_module.c
+++ b/python/src/geomserde_speedup_module.c
@@ -20,10 +20,15 @@
 #define PY_SSIZE_T_CLEAN
 #include <Python.h>
 #include <stdio.h>
+//
+//#define NPY_NO_DEPRECATED_API NPY_1_7_API_VERSION
 
 #include "geomserde.h"
 #include "geos_c_dyn.h"
 #include "pygeos/c_api.h"
+#include <numpy/ndarraytypes.h>
+#include <numpy/npy_3kcompat.h>
+#include <numpy/ufuncobject.h>
 
 PyDoc_STRVAR(module_doc, "Geometry serialization/deserialization module.");
 
@@ -225,7 +230,7 @@ static PyObject *serialize(PyObject *self, PyObject *args) {
   return do_serialize(geos_geom);
 }
 
-static PyObject *deserialize(PyObject *self, PyObject *args) {
+static PyObject *deserialize_2(PyObject *self, PyObject *args) {
   GEOSContextHandle_t handle = NULL;
   int length = 0;
   GEOSGeometry *geom = do_deserialize(args, &handle, &length);
@@ -262,16 +267,106 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) {
   return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
 }
 
+static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
+    import_array();
+    PyObject *input_obj = NULL;
+    if (!PyArg_ParseTuple(args, "O", &input_obj)){
+        return NULL;
+    };
+
+    PyArrayObject *array = (PyArrayObject *)input_obj;
+    PyObject **objs = (PyObject **)PyArray_DATA(array);
+
+    GEOSContextHandle_t handle = get_geos_context_handle();
+      if (handle == NULL) {
+        return NULL;
+      }
+
+    npy_intp n = PyArray_SIZE(input_obj);
+    npy_intp dims[1] = {n};
+    PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+    for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+          PyObject *obj = objs[i];
+          GEOSGeometry *geos_geom = NULL;
+          char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+
+          PyObject *serialized = do_serialize(geos_geom);
+          PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
+    }
+
+    return out;
+}
 /* Module definition for Shapely 2.x */
+static PyObject *from_sedona_func(PyObject *self, PyObject *args) {
+    import_array();
+    PyObject *input_obj = NULL;
+    if (!PyArg_ParseTuple(args, "O", &input_obj)){
+        return NULL;
+    };
+
+    GEOSContextHandle_t handle = get_geos_context_handle();
+
+    PyArrayObject *array = (PyArrayObject *)input_obj;
+    PyObject **objs = (PyObject **)PyArray_DATA(array);
+
+    int p_bytes_read = 0;
+
+    npy_intp n = PyArray_SIZE(input_obj);
+
+    npy_intp dims[1] = {n};
+    PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+
+    for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+        PyObject *obj = objs[i];
+        if (!PyBytes_Check(obj)) {
+            PyErr_SetString(PyExc_TypeError, "Expected bytes");
+            return NULL;
+        }
+
+        char *buf = PyBytes_AS_STRING(obj);
+
+        Py_ssize_t len = PyBytes_GET_SIZE(obj);
+
+        GEOSGeometry *geom = NULL;
+
+        SedonaErrorCode err = sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
+        if (err != SEDONA_SUCCESS) {
+          handle_geomserde_error(err);
+          return NULL;
+        }
+          PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+
+          PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
+        }
+
+        return out;
+}
+
 
 static PyMethodDef geomserde_methods_shapely_2[] = {
     {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."},
     {"serialize", serialize, METH_VARARGS,
      "Serialize geometry object as bytearray."},
-    {"deserialize", deserialize, METH_VARARGS,
+    {"deserialize_2", deserialize_2, METH_VARARGS,
+     "Deserialize bytes-like object to geometry object."},
+    {"from_sedona_func", from_sedona_func, METH_VARARGS,
+     "Deserialize bytes-like object to geometry object."},
+    {"to_sedona_func", to_sedona_func, METH_VARARGS,
      "Deserialize bytes-like object to geometry object."},
     {NULL, NULL, 0, NULL}, /* Sentinel */
 };
+//
+//static int add_from_sedona_func_to_module(PyObject *m) {
+//  PyObject *capsule = PyCapsule_New((void *)from_sedona_func, "from_sedona_func", NULL);
+//  if (capsule == NULL) {
+//    return -1;
+//  }
+//  if (PyModule_AddObject(m, "from_sedona_func", capsule) < 0) {
+//    Py_DECREF(capsule);
+//    return -1;
+//  }
+//  return 0;
+//}
 
 static struct PyModuleDef geomserde_module_shapely_2 = {
     PyModuleDef_HEAD_INIT, "geomserde_speedup", module_doc, 0,
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index 911860e416..e240a09758 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -72,6 +72,8 @@ class TestBase:
                 )
                 .config("spark.executor.memory", "10G") \
                 .config("spark.driver.memory", "10G") \
+                .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon") \
+                .config("sedona.python.worker.daemon.enabled", "false") \
                 # Pandas on PySpark doesn't work with ANSI mode, which is enabled by default
                 # in Spark 4
                 .config("spark.sql.ansi.enabled", "false")
diff --git a/python/tests/utils/test_sedona_db_vectorized_udf.py b/python/tests/utils/test_sedona_db_vectorized_udf.py
index 904d59a282..4b266384fa 100644
--- a/python/tests/utils/test_sedona_db_vectorized_udf.py
+++ b/python/tests/utils/test_sedona_db_vectorized_udf.py
@@ -1,12 +1,106 @@
+import time
+
+import numpy as np
+
 from sedona.spark.sql.functions import sedona_db_vectorized_udf
+from sedona.spark.utils.udf import to_sedona_func, from_sedona_func
 from tests.test_base import TestBase
 import pyarrow as pa
 import shapely
 from sedona.sql import GeometryType
 from pyspark.sql.functions import expr, lit
-from pyspark.sql.types import DoubleType, IntegerType
+from pyspark.sql.types import DoubleType, IntegerType, ByteType
 from sedona.spark.sql import ST_X
-
+from shapely._enum import ParamEnum
+
+def test_m():
+    on_invalid="raise"
+    wkb = b'\x12\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?'
+    geometry = np.asarray([wkb, wkb], dtype=object)
+
+    DecodingErrorOptions = ParamEnum(
+        "DecodingErrorOptions", {"ignore": 0, "warn": 1, "raise": 2, "fix": 3}
+    )
+
+    # print("sss")
+
+
+    # <class 'numpy.ndarray'>
+    # object
+    #   C_CONTIGUOUS : True
+    #   F_CONTIGUOUS : True
+    #   OWNDATA : False
+    #   WRITEABLE : True
+    #   ALIGNED : True
+    #   WRITEBACKIFCOPY : False
+    # print(type(geometry))
+    # print(geometry.dtype)
+    # print(geometry.flags)
+
+    result = from_sedona_func(geometry)
+
+    result2 = to_sedona_func(result)
+
+# ensure the input has object dtype, to avoid numpy inferring it as a
+# fixed-length string dtype (which removes trailing null bytes upon access
+# of array elements)
+    #
+    # def from_sedona_func(arr):
+    #     try:
+    #         from . import sedonaserde_vectorized_udf_module
+    #         print(sedonaserde_vectorized_udf_module.from_sedona_func_3(arr))
+    #     except Exception as e:
+    #         print("Cannot import sedonaserde_vectorized_udf_module:")
+    #         print(e)
+    # # print()
+    # return None
+#
+# def from_wkb(geometry, on_invalid="raise", **kwargs):
+#     r"""Create geometries from the Well-Known Binary (WKB) representation.
+#
+#     The Well-Known Binary format is defined in the `OGC Simple Features
+#     Specification for SQL <https://www.opengeospatial.org/standards/sfs>`__.
+#
+#     Parameters
+#     ----------
+#     geometry : str or array_like
+#         The WKB byte object(s) to convert.
+#     on_invalid : {"raise", "warn", "ignore", "fix"}, default "raise"
+#         Indicates what to do when an invalid WKB is encountered. Note that the
+#         validations involved are very basic, e.g. the minimum number of points
+#         for the geometry type. For a thorough check, use :func:`is_valid` after
+#         conversion to geometries. Valid options are:
+#
+#         - raise: an exception will be raised if any input geometry is invalid.
+#         - warn: a warning will be raised and invalid WKT geometries will be
+#           returned as ``None``.
+#         - ignore: invalid geometries will be returned as ``None`` without a
+#           warning.
+#         - fix: an effort is made to fix invalid input geometries (currently just
+#           unclosed rings). If this is not possible, they are returned as
+#           ``None`` without a warning. Requires GEOS >= 3.11.
+#
+#           .. versionadded:: 2.1.0
+#     **kwargs
+#         See :ref:`NumPy ufunc docs <ufuncs.kwargs>` for other keyword arguments.
+#
+#     Examples
+#     --------
+#     >>> import shapely
+#     >>> shapely.from_wkb(b'\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\xf0?\x00\x00\x00\x00\x00\x00\xf0?')
+#     <POINT (1 1)>
+#
+#     """  # noqa: E501
+#     if not np.isscalar(on_invalid):
+#         raise TypeError("on_invalid only accepts scalar values")
+#
+#     invalid_handler = np.uint8(DecodingErrorOptions.get_value(on_invalid))
+#
+#     # ensure the input has object dtype, to avoid numpy inferring it as a
+#     # fixed-length string dtype (which removes trailing null bytes upon access
+#     # of array elements)
+#     geometry = np.asarray(geometry, dtype=object)
+#     return lib.from_wkb(geometry, invalid_handler, **kwargs)
 
 class TestSedonaDBArrowFunction(TestBase):
     def test_vectorized_udf(self):
@@ -15,7 +109,6 @@ class TestSedonaDBArrowFunction(TestBase):
             geom_wkb = pa.array(geom.storage.to_array())
             distance = pa.array(distance.to_array())
             geom = shapely.from_wkb(geom_wkb)
-
             result_shapely = shapely.centroid(geom)
 
             return pa.array(shapely.to_wkb(result_shapely))
@@ -95,18 +188,26 @@ class TestSedonaDBArrowFunction(TestBase):
         assert crs_list == [3857, 3857, 3857]
 
     def test_geometry_to_geometry(self):
-        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[GeometryType()])
+        @sedona_db_vectorized_udf(return_type=GeometryType(), input_types=[ByteType()])
         def buffer_geometry(geom):
             geom_wkb = pa.array(geom.storage.to_array())
-            geom = shapely.from_wkb(geom_wkb)
+            geometry_array = np.asarray(geom_wkb, dtype=object)
+            geom = from_sedona_func(geometry_array)
 
             result_shapely = shapely.buffer(geom, 10)
 
-            return pa.array(shapely.to_wkb(result_shapely))
+            return pa.array(to_sedona_func(result_shapely))
 
         df = self.spark.read.\
             format("geoparquet").\
-            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3")
+            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l1")
+
+        # 1 045 770
+        # print(df.count())
+
+        # df.unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\
+        #     unionAll(df).unionAll(df).unionAll(df).unionAll(df).unionAll(df).\
+        #     write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2")
         # 18 24
         # df.union(df).union(df).union(df).union(df).union(df).union(df).\
         #     write.format("geoparquet").mode("overwrite").save("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3")
@@ -117,6 +218,12 @@ class TestSedonaDBArrowFunction(TestBase):
 
         values.show()
 
+        # for _ in range(4):
+        #     start_time = time.time()
+        #     values.show()
+        #     end_time = time.time()
+        #     print(f"Execution time: {end_time - start_time} seconds")
+
     def test_geometry_to_geometry_normal_udf(self):
         from pyspark.sql.functions import udf
 
@@ -127,10 +234,19 @@ class TestSedonaDBArrowFunction(TestBase):
 
         df = self.spark.read. \
             format("geoparquet"). \
-            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings_large_3")
+            load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona_dupl_l2")
 
+        # print(df.count())
+        # df.limit(10).collect()
         values = df.select(create_buffer_udf(df.geometry).alias("geometry")). \
             selectExpr("ST_Area(geometry) as area"). \
             selectExpr("Sum(area) as total_area")
 
         values.show()
+
+        # for _ in range(4):
+        #     start_time = time.time()
+        #     values.show()
+        #     end_time = time.time()
+        #     print(f"Execution time: {end_time - start_time} seconds")
+# 1 045 770
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
index 11263dd7f6..0f1a5fe0a0 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/UDF/PythonEvalType.scala
@@ -25,6 +25,7 @@ object PythonEvalType {
 
   // sedona db eval types
   val SQL_SCALAR_SEDONA_DB_UDF = 6200
+  val SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF = 6201
   val SEDONA_DB_UDF_TYPE_CONSTANT = 6000
 
   def toString(pythonEvalType: Int): String = pythonEvalType match {
@@ -32,5 +33,6 @@ object PythonEvalType {
     case SQL_SCALAR_SEDONA_DB_UDF => "SQL_SCALAR_SEDONA_DB_UDF"
   }
 
-  def evals(): Set[Int] = Set(SQL_SCALAR_SEDONA_UDF, SQL_SCALAR_SEDONA_DB_UDF)
+  def evals(): Set[Int] =
+    Set(SQL_SCALAR_SEDONA_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF)
 }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
index 0d3960d2d8..3055e768b9 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowPythonRunner.scala
@@ -38,13 +38,15 @@ class SedonaArrowPythonRunner(
     protected override val workerConf: Map[String, String],
     val pythonMetrics: Map[String, SQLMetric],
     jobArtifactUUID: Option[String],
-    geometryFields: Seq[(Int, Int)])
+    geometryFields: Seq[(Int, Int)],
+    castGeometryToWKB: Boolean = false)
     extends SedonaBasePythonRunner[Iterator[InternalRow], ColumnarBatch](
       funcs,
       evalType,
       argOffsets,
       jobArtifactUUID,
-      geometryFields)
+      geometryFields,
+      castGeometryToWKB)
     with SedonaBasicPythonArrowInput
     with SedonaBasicPythonArrowOutput {
 
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
index bb897931b6..228ddc2cbc 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.python
 
 import org.apache.sedona.sql.UDF.PythonEvalType
-import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF}
+import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF}
 import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
@@ -115,10 +115,10 @@ case class SedonaArrowEvalPythonExec(
    val batchIter = if (batchSize > 0) new BatchIterator(full, batchSize) else Iterator(full)
 
     evalType match {
-      case SQL_SCALAR_SEDONA_DB_UDF =>
+      case SQL_SCALAR_SEDONA_DB_UDF | SQL_SCALAR_SEDONA_DB_NO_SPEEDUP_UDF =>
         val columnarBatchIter = new SedonaArrowPythonRunner(
           funcs,
-          evalType - PythonEvalType.SEDONA_DB_UDF_TYPE_CONSTANT,
+          200,
           argOffsets,
           schema,
           sessionLocalTimeZone,
@@ -126,7 +126,9 @@ case class SedonaArrowEvalPythonExec(
           pythonRunnerConf,
           pythonMetrics,
           jobArtifactUUID,
-          geometryFields).compute(batchIter, context.partitionId(), context)
+          geometryFields,
+          evalType == SQL_SCALAR_SEDONA_DB_UDF)
+          .compute(batchIter, context.partitionId(), context)
 
         val result = columnarBatchIter.flatMap { batch =>
           batch.rowIterator.asScala
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
index 276383a0ee..055d5db15f 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala
@@ -39,14 +39,16 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
     evalType: Int,
     argOffsets: Array[Array[Int]],
     jobArtifactUUID: Option[String],
-    val geometryFields: Seq[(Int, Int)] = Seq.empty)
+    val geometryFields: Seq[(Int, Int)] = Seq.empty,
+    val castGeometryToWKB: Boolean = false)
    extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, jobArtifactUUID)
     with Logging {
 
  require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
 
   private val conf = SparkEnv.get.conf
-  private val reuseWorker = conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get)
+  private val reuseWorker =
+    conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get)
  private val faultHandlerEnabled = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
 
  private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = {
@@ -81,9 +83,12 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
      envVars.put("PYTHON_FAULTHANDLER_DIR", SedonaBasePythonRunner.faultHandlerLogDir.toString)
     }
 
+    if (reuseWorker) {
+      envVars.put("SPARK_REUSE_WORKER", "1")
+    }
+
    envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default"))
 
-    println("running the compute for SedonaBasePythonRunner and partition index: " + partitionIndex)
     val (worker: Socket, pid: Option[Int]) = {
       WorkerContext.createPythonWorker(pythonExec, envVars.asScala.toMap)
     }
@@ -98,7 +103,6 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
 
       if (!reuseWorker || releasedOrClosed.compareAndSet(false, true)) {
         try {
-          logInfo("Shutting down worker socket")
           worker.close()
         } catch {
           case e: Exception =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
index 93bcaee0c6..459388856b 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaDBWorkerFactory.scala
@@ -34,7 +34,6 @@ import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.sql.execution.python.SedonaPythonWorkerFactory.PROCESS_WAIT_TIMEOUT_MS
 import org.apache.spark.util.RedirectThread
 
-import java.util.concurrent.TimeUnit
 import javax.annotation.concurrent.GuardedBy
 
 class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) extends Logging {
@@ -181,7 +180,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex
   }
 
   private def stopDaemon(): Unit = {
-    logError("daemon stopping called")
     self.synchronized {
       if (useDaemon) {
         cleanupIdleWorkers()
@@ -194,7 +192,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex
         daemon = null
         daemonPort = 0
       } else {
-        println("Stopping simple workers")
         simpleWorkers.mapValues(_.destroy())
       }
     }
@@ -233,11 +230,11 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex
           daemonPort = in.readInt()
         } catch {
           case _: EOFException if daemon.isAlive =>
-            throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(
-              sedonaDaemonModule)
+            throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(sedonaDaemonModule)
           case _: EOFException =>
-            throw SparkCoreErrors.
-              eofExceptionWhileReadPortNumberError(sedonaDaemonModule, Some(daemon.exitValue))
+            throw SparkCoreErrors.eofExceptionWhileReadPortNumberError(
+              sedonaDaemonModule,
+              Some(daemon.exitValue))
         }
 
         // test that the returned port number is within a valid range.
@@ -261,7 +258,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex
         redirectStreamsToStderr(in, daemon.getErrorStream)
       } catch {
         case e: Exception =>
-
           // If the daemon exists, wait for it to finish and get its stderr
           val stderr = Option(daemon)
             .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
@@ -307,7 +303,6 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex
 
   def releaseWorker(worker: Socket): Unit = {
     if (useDaemon) {
-      logInfo("Releasing worker back to daemon pool")
       self.synchronized {
         lastActivityNs = System.nanoTime()
         idleWorkers.enqueue(worker)
@@ -345,5 +340,4 @@ class SedonaDBWorkerFactory(pythonExec: String, envVars: Map[String, String]) ex
 
 private object SedonaPythonWorkerFactory {
   val PROCESS_WAIT_TIMEOUT_MS = 10000
-  val IDLE_WORKER_TIMEOUT_NS = TimeUnit.MINUTES.toNanos(1)  // kill idle workers after 1 minute
 }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 18db42ae0d..2544e63a97 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -60,6 +60,9 @@ private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {
         handleMetadataBeforeExec(dataOut)
         writeUDF(dataOut, funcs, argOffsets)
 
+        // if speedup is not available and we need to use casting
+        dataOut.writeBoolean(self.castGeometryToWKB)
+
         // write
         dataOut.writeInt(self.geometryFields.length)
         // write geometry field indices and their SRIDs
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 0c0b220933..27764c2a54 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -34,7 +34,8 @@ import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, Columna
 
 private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[_, OUT] =>
 
-  private val reuseWorker = SparkEnv.get.conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get)
+  private val reuseWorker =
+    SparkEnv.get.conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get)
 
   protected def pythonMetrics: Map[String, SQLMetric]
 
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
index 82fe6dedda..6411bec97e 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/WorkerContext.scala
@@ -24,24 +24,28 @@ import scala.collection.mutable
 object WorkerContext {
 
   def createPythonWorker(
-                          pythonExec: String,
-                          envVars: Map[String, String]): (java.net.Socket, Option[Int]) = {
+      pythonExec: String,
+      envVars: Map[String, String]): (java.net.Socket, Option[Int]) = {
     synchronized {
       val key = (pythonExec, envVars)
      pythonWorkers.getOrElseUpdate(key, new SedonaDBWorkerFactory(pythonExec, envVars)).create()
     }
   }
 
-  def destroyPythonWorker(pythonExec: String,
-                          envVars: Map[String, String], worker: Socket): Unit = {
+  def destroyPythonWorker(
+      pythonExec: String,
+      envVars: Map[String, String],
+      worker: Socket): Unit = {
     synchronized {
       val key = (pythonExec, envVars)
       pythonWorkers.get(key).foreach(_.stopWorker(worker))
     }
   }
 
-  def releasePythonWorker(pythonExec: String,
-                          envVars: Map[String, String], worker: Socket): Unit = {
+  def releasePythonWorker(
+      pythonExec: String,
+      envVars: Map[String, String],
+      worker: Socket): Unit = {
     synchronized {
       val key = (pythonExec, envVars)
       pythonWorkers.get(key).foreach(_.releaseWorker(worker))
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
index ebb5a568e1..3584cb01bd 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/udf/ExtractSedonaUDFRule.scala
@@ -44,7 +44,8 @@ class ExtractSedonaUDFRule extends Rule[LogicalPlan] with Logging {
   }
 
   def isScalarPythonUDF(e: Expression): Boolean = {
-    e.isInstanceOf[PythonUDF] && PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType)
+    e.isInstanceOf[PythonUDF] &&
+    PythonEvalType.evals.contains(e.asInstanceOf[PythonUDF].evalType)
   }
 
   private def collectEvaluableUDFsFromExpressions(
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index e64e9dec3b..c9b4d6ac28 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -46,9 +46,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
     // We need to be explicit about broadcasting in tests.
     .config("sedona.join.autoBroadcastJoinThreshold", "-1")
    .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions")
-    .config("sedona.python.worker.udf.module", "sedonaworker.worker")
+    .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
     .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon")
-    .config("sedona.python.worker.daemon.enabled", "true")
+    .config("sedona.python.worker.daemon.enabled", "false")
 //    .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
     .getOrCreate()
 
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 000c1f55b6..4fe4acfb12 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -35,7 +35,6 @@ class StrategySuite extends TestBaseScala with Matchers {
 
   import spark.implicits._
 
-
   it("sedona geospatial UDF - geopandas") {
     val df = Seq(
       (1, "value", wktReader.read("POINT(21 52)")),
@@ -50,7 +49,8 @@ class StrategySuite extends TestBaseScala with Matchers {
 
     geopandasUDFDF.count shouldEqual 5
 
-    geopandasUDFDF.selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))")
+    geopandasUDFDF
+      .selectExpr("ST_AsText(ST_ReducePrecision(geom_buffer, 2))")
       .as[String]
       .collect() should contain theSameElementsAs Seq(
       "POLYGON ((20 51, 20 53, 22 53, 22 51, 20 51))",
@@ -61,36 +61,33 @@ class StrategySuite extends TestBaseScala with Matchers {
   }
 
   it("sedona geospatial UDF - sedona db") {
-//    val df = Seq(
-//      (1, "value", wktReader.read("POINT(21 52)")),
-//      (2, "value1", wktReader.read("POINT(20 50)")),
-//      (3, "value2", wktReader.read("POINT(20 49)")),
-//      (4, "value3", wktReader.read("POINT(20 48)")),
-//      (5, "value4", wktReader.read("POINT(20 47)")))
-//      .toDF("id", "value", "geom")
-//
-//    val dfVectorized = df
-//      .withColumn("geometry", expr("ST_SetSRID(geom, '4326')"))
-//      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom"))
-
-//    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x")
-//      .selectExpr("sum(x)")
-//      .as[Double]
-//      .collect().head shouldEqual 101
-
-    val dfCopied = sparkSession.read
-      .format("geoparquet")
-      .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona/barcelona.geoparquet")
+    val df = Seq(
+      (1, "value", wktReader.read("POINT(21 52)")),
+      (2, "value1", wktReader.read("POINT(20 50)")),
+      (3, "value2", wktReader.read("POINT(20 49)")),
+      (4, "value3", wktReader.read("POINT(20 48)")),
+      (5, "value4", wktReader.read("POINT(20 47)")))
+      .toDF("id", "value", "geom")
 
-    val values = dfCopied.unionAll(dfCopied)
-      .unionAll(dfCopied)
-//      .unionAll(dfCopied)
-//      .unionAll(dfCopied)
-//      .unionAll(dfCopied)
-      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(10)).alias("geom"))
-      .selectExpr("ST_Area(geom) as area")
-      .selectExpr("Sum(area) as total_area")
+    val dfVectorized = df
+      .withColumn("geometry", expr("ST_SetSRID(geom, '4326')"))
+      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(100)).alias("geom"))
 
-    values.show()
+    dfVectorized.selectExpr("ST_X(ST_Centroid(geom)) AS x")
+      .selectExpr("sum(x)")
+      .as[Double]
+      .collect().head shouldEqual 101
+//
+//    val dfCopied = sparkSession.read
+//      .format("geoparquet")
+//      .load(
+//        "/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/book/source_data/transportation_barcelona/barcelona.geoparquet")
+//
+//    val values = dfCopied
+//      .select(sedonaDBGeometryToGeometryFunction(col("geometry"), lit(10)).alias("geom"))
+//      .selectExpr("ST_Area(geom) as area")
+//      .selectExpr("Sum(area) as total_area")
+//
+//    values.show()
   }
 }

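For reference, the worker and daemon modules exercised above are selected through Spark config keys that appear in the test changes (python/tests/test_base.py and TestBaseScala.scala). A minimal wiring sketch, assuming these keys are read from the session configuration exactly as in the test setup; the values mirror the tests and are not documented defaults:

    from sedona.spark import SedonaContext

    config = (
        SedonaContext.builder()
        # Plain per-task SedonaDB UDF worker added in this commit.
        .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
        # Daemon that forks reusable workers (python/sedona/spark/worker/daemon.py).
        .config("sedona.python.worker.udf.daemon.module", "sedona.spark.worker.daemon")
        # The tests keep daemon mode disabled; "true" routes worker creation through the daemon.
        .config("sedona.python.worker.daemon.enabled", "false")
        .getOrCreate()
    )
    sedona = SedonaContext.create(config)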