This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch add-sedona-worker-daemon-mode
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 0fd81806a87ebc8cdaf50d75dccbd1a6bf7e16e3
Author: pawelkocinski <[email protected]>
AuthorDate: Wed Jan 14 16:17:48 2026 +0100

    add sedonadb sedona udf worker example
---
 python/pyproject.toml                              | 49 +++++++++++++---------
 python/sedona/spark/utils/geometry_serde.py        |  6 ++-
 python/sedona/spark/worker/daemon.py               | 17 --------
 python/src/geom_buf.c                              |  2 -
 python/src/geomserde.c                             |  1 -
 python/src/geomserde_speedup_module.c              | 33 ++++++++++++++-
 python/tests/test_base.py                          |  2 +-
 .../org/apache/sedona/spark/SedonaContext.scala    |  1 +
 .../execution/python/SedonaPythonArrowInput.scala  | 30 -------------
 .../execution/python/SedonaPythonArrowOutput.scala |  5 ---
 .../org/apache/sedona/sql/SQLSyntaxTestScala.scala |  8 ++--
 .../org/apache/sedona/sql/TestBaseScala.scala      | 22 ++++------
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |  3 --
 13 files changed, 77 insertions(+), 102 deletions(-)

diff --git a/python/pyproject.toml b/python/pyproject.toml
index 37159cf83b..8b2a06a5d3 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -26,7 +26,7 @@ description = "Apache Sedona is a cluster computing system 
for processing large-
 readme = "README.md"
 license = { text = "Apache-2.0" }
 authors = [ { name = "Apache Sedona", email = "[email protected]" } ]
-requires-python = ">=3.12"
+requires-python = ">=3.8"
 classifiers = [
   "Programming Language :: Python :: 3",
   "License :: OSI Approved :: Apache Software License",
@@ -50,16 +50,38 @@ kepler-map = ["geopandas", "keplergl==0.3.2"]
 flink = ["apache-flink>=1.19.0"]
 db = ["sedonadb[geopandas]; python_version >= '3.9'"]
 all = [
-#  "pyspark>=3.4.0,<4.1.0",
-#  "geopandas",
-#  "pydeck==0.8.0",
-#  "keplergl==0.3.2",
-#  "rasterio>=1.2.10",
+  "pyspark>=3.4.0,<4.1.0",
+  "geopandas",
+  "pydeck==0.8.0",
+  "keplergl==0.3.2",
+  "rasterio>=1.2.10",
 ]
 
 [dependency-groups]
 dev = [
-    "pytest>=9.0.2",
+  "pytest",
+  "pytest-cov",
+  "notebook==6.4.12",
+  "jupyter",
+  "mkdocs",
+  "scikit-learn",
+  "esda",
+  "libpysal",
+  "matplotlib",  # implicit dependency of esda
+  # prevent incompatibility with pysal 4.7.0, which is what is resolved to 
when shapely >2 is specified
+  "scipy<=1.10.0",
+  "pandas>=2.0.0",
+  "numpy<2",
+  "geopandas",
+  # 
https://stackoverflow.com/questions/78949093/how-to-resolve-attributeerror-module-fiona-has-no-attribute-path
+  # cannot set geopandas>=0.14.4 since it doesn't support python 3.8, so we 
pin fiona to <1.10.0
+  "fiona<1.10.0",
+  "pyarrow",
+  "pyspark>=3.4.0,<4.1.0",
+  "keplergl==0.3.2",
+  "pydeck==0.8.0",
+  "pystac==1.5.0",
+  "rasterio>=1.2.10",
 ]
 
 [project.urls]
@@ -80,20 +102,7 @@ exclude = ["*.tests", "*.tests.*", "tests", "tests.*"]
 name = "sedona.spark.utils.geomserde_speedup"
 sources = [
   "src/geomserde_speedup_module.c",
-  "src/sedonaserde_vectorized_udf_module.c",
   "src/geomserde.c",
   "src/geom_buf.c",
   "src/geos_c_dyn.c",
 ]
-
-[[tool.setuptools.ext-modules]]
-name = "sedona.spark.utils.sedonaserde_vectorized_udf_module"
-sources = [
-    "src/sedonaserde_vectorized_udf_module.c",
-    "src/geomserde.c",
-    "src/geom_buf.c",
-    "src/geos_c_dyn.c",
-]
-
-[tool.uv.sources]
-sedonadb = { path = 
"../../../sedona-db/target/wheels/sedonadb-0.3.0-cp312-cp312-macosx_11_0_arm64.whl"
 }
diff --git a/python/sedona/spark/utils/geometry_serde.py 
b/python/sedona/spark/utils/geometry_serde.py
index 0ef3d4ed5c..103eb49817 100644
--- a/python/sedona/spark/utils/geometry_serde.py
+++ b/python/sedona/spark/utils/geometry_serde.py
@@ -25,6 +25,9 @@ from shapely.geometry.base import BaseGeometry
 
 speedup_enabled = False
 
+
+# Use geomserde_speedup when available, otherwise fallback to general pure
+# python implementation.
 try:
     from . import geomserde_speedup
 
@@ -57,9 +60,8 @@ try:
         def deserialize(buf: bytearray) -> Optional[BaseGeometry]:
             if buf is None:
                 return None
-            return geomserde_speedup.deserialize_2(buf)
+            return geomserde_speedup.deserialize(buf)
 
-        # Export the from_sedona_func for use with numpy ufuncs
         speedup_enabled = True
 
     elif shapely.__version__.startswith("1."):
diff --git a/python/sedona/spark/worker/daemon.py 
b/python/sedona/spark/worker/daemon.py
index ce75e376ea..266baf76d5 100644
--- a/python/sedona/spark/worker/daemon.py
+++ b/python/sedona/spark/worker/daemon.py
@@ -40,23 +40,7 @@ def compute_real_exit_code(exit_code):
     else:
         return 1
 
-
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-
-file_handler = logging.FileHandler(
-    
"/Users/pawelkocinski/Desktop/projects/sedonaworker/sedonaworker/logs/worker_daemon_main.log",
-    delay=False,
-)
-file_handler.flush = file_handler.stream.flush
-
-logger.addHandler(file_handler)
-
-
 def worker(sock, authenticated):
-    logger.info(
-        "Starting worker process with pid =" + str(os.getpid()) + " socket " + 
str(sock)
-    )
     """
     Called by a worker process after the fork().
     """
@@ -207,7 +191,6 @@ def manager():
                         authenticated = False
                         while True:
                             code = worker(sock, authenticated)
-                            logger.info("Worker exited with code %d", code)
                             if code == 0:
                                 authenticated = True
                             if not reuse or code:
diff --git a/python/src/geom_buf.c b/python/src/geom_buf.c
index d6a51bb3d0..5239de5ae0 100644
--- a/python/src/geom_buf.c
+++ b/python/src/geom_buf.c
@@ -208,8 +208,6 @@ SedonaErrorCode geom_buf_alloc(GeomBuffer *geom_buf,
   return SEDONA_SUCCESS;
 }
 
-#include <stdio.h>
-
 SedonaErrorCode read_geom_buf_header(const char *buf, int buf_size,
                                      GeomBuffer *geom_buf,
                                      CoordinateSequenceInfo *cs_info,
diff --git a/python/src/geomserde.c b/python/src/geomserde.c
index 81dafe216f..c1f7427738 100644
--- a/python/src/geomserde.c
+++ b/python/src/geomserde.c
@@ -718,7 +718,6 @@ static SedonaErrorCode 
deserialize_geom_buf(GEOSContextHandle_t handle,
   return SEDONA_SUCCESS;
 }
 
-#include <stdio.h>
 SedonaErrorCode sedona_deserialize_geom(GEOSContextHandle_t handle,
                                         const char *buf, int buf_size,
                                         GEOSGeometry **p_geom,
diff --git a/python/src/geomserde_speedup_module.c 
b/python/src/geomserde_speedup_module.c
index 1d7aefcd77..610c4d1b05 100644
--- a/python/src/geomserde_speedup_module.c
+++ b/python/src/geomserde_speedup_module.c
@@ -287,9 +287,24 @@ static PyObject *to_sedona_func(PyObject *self, PyObject 
*args) {
     PyObject *obj = objs[i];
     GEOSGeometry *geos_geom = NULL;
     char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+    if (!success || geos_geom == NULL) {
+        PyErr_SetString(PyExc_TypeError, "Invalid GEOS geometry");
+        Py_DECREF(out);
+        return NULL;
+    }
 
     PyObject *serialized = do_serialize(geos_geom);
-    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized);
+    if (!serialized) {
+        Py_DECREF(out);
+        return NULL;
+    }
+
+    if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized) < 0) {
+        Py_DECREF(serialized);
+        Py_DECREF(out);
+        return NULL;
+    }
+    Py_DECREF(serialized);
   }
 
   return out;
@@ -318,6 +333,8 @@ static PyObject *from_sedona_func(PyObject *self, PyObject 
*args) {
     PyObject *obj = objs[i];
     if (!PyBytes_Check(obj)) {
       PyErr_SetString(PyExc_TypeError, "Expected bytes");
+      Py_DECREF(out);
+
       return NULL;
     }
 
@@ -331,11 +348,23 @@ static PyObject *from_sedona_func(PyObject *self, 
PyObject *args) {
         sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
     if (err != SEDONA_SUCCESS) {
       handle_geomserde_error(err);
+      Py_DECREF(out);
       return NULL;
     }
+
     PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+    if (!pygeom) {
+        Py_DECREF(out);
+        return NULL;
+    }
+
+   if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom) < 0) {
+        Py_DECREF(pygeom);
+        Py_DECREF(out);
+        return NULL;
+    }
 
-    PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom);
+    Py_DECREF(pygeom);
   }
 
   return out;
diff --git a/python/tests/test_base.py b/python/tests/test_base.py
index 300d937d27..3974930207 100644
--- a/python/tests/test_base.py
+++ b/python/tests/test_base.py
@@ -77,7 +77,7 @@ class TestBase:
                     "sedona.spark.worker.daemon",
                 )
                 .config(
-                    "sedona.python.worker.daemon.enabled", "false"
+                    "sedona.python.worker.daemon.enabled", "true"
                 )  # Pandas on PySpark doesn't work with ANSI mode, which is 
enabled by default
                 # in Spark 4
                 .config("spark.sql.ansi.enabled", "false")
diff --git 
a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala 
b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index add3caf225..c9e8497f7e 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -41,6 +41,7 @@ class InternalApi(
     extends StaticAnnotation
 
 object SedonaContext {
+
   private def customOptimizationsWithSession(sparkSession: SparkSession) =
     Seq(
       new TransformNestedUDTParquet(sparkSession),
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
index 2544e63a97..6602967351 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala
@@ -18,23 +18,6 @@
  */
 package org.apache.spark.sql.execution.python
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
 import org.apache.spark.sql.catalyst.InternalRow
@@ -87,21 +70,8 @@ private[python] trait SedonaPythonArrowInput[IN] extends 
PythonArrowInput[IN] {
 
           writeIteratorToArrowStream(root, writer, dataOut, inputIterator)
 
-          // end writes footer to the output stream and doesn't clean any 
resources.
-          // It could throw exception if the output stream is closed, so it 
should be
-          // in the try block.
           writer.end()
         } {
-          // If we close root and allocator in TaskCompletionListener, there 
could be a race
-          // condition where the writer thread keeps writing to the 
VectorSchemaRoot while
-          // it's being closed by the TaskCompletion listener.
-          // Closing root and allocator here is cleaner because root and 
allocator is owned
-          // by the writer thread and is only visible to the writer thread.
-          //
-          // If the writer thread is interrupted by TaskCompletionListener, it 
should either
-          // (1) in the try block, in which case it will get an 
InterruptedException when
-          // performing io, and goes into the finally block or (2) in the 
finally block,
-          // in which case it will ignore the interruption and close the 
resources.
           root.close()
           allocator.close()
         }
diff --git 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
index 27764c2a54..8940a376a2 100644
--- 
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
+++ 
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowOutput.scala
@@ -99,11 +99,6 @@ private[python] trait SedonaPythonArrowOutput[OUT <: AnyRef] 
{ self: BasePythonR
         }
         eos = true
       }
-//      def handleEndOfDataSectionSedona(): Unit = {
-//        if (stream.readInt() == SpecialLengths.END_OF_STREAM) {}
-//
-//        eos = true
-//      }
 
       protected override def handleEndOfDataSection(): Unit = {
         handleEndOfDataSectionSedona()
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
index 72a27461f6..6f873d0a08 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/SQLSyntaxTestScala.scala
@@ -47,11 +47,11 @@ class SQLSyntaxTestScala extends TestBaseScala with 
TableDrivenPropertyChecks {
       try {
         sparkSession.sql("CREATE TABLE T_TEST_EXPLICIT_GEOMETRY (GEO_COL 
GEOMETRY)")
         sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY") should 
be(true)
-//        sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("true")
+        sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("true")
       } catch {
         case ex: Exception =>
           ex.getClass.getName.endsWith("ParseException") should be(true)
-//          sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("false")
+          sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("false")
       }
     }
 
@@ -61,11 +61,11 @@ class SQLSyntaxTestScala extends TestBaseScala with 
TableDrivenPropertyChecks {
         sparkSession.sql(
           "CREATE TABLE T_TEST_EXPLICIT_GEOMETRY_2 (INT_COL INT, GEO_COL 
GEOMETRY)")
         sparkSession.catalog.tableExists("T_TEST_EXPLICIT_GEOMETRY_2") should 
be(true)
-//        sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("true")
+        sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("true")
       } catch {
         case ex: Exception =>
           ex.getClass.getName.endsWith("ParseException") should be(true)
-//          sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("false")
+          sparkSession.sparkContext.getConf.get(keyParserExtension) should 
be("false")
       }
     }
   }
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala 
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index c9b4d6ac28..50d751f484 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -30,13 +30,13 @@ import java.io.FileInputStream
 import java.util.concurrent.ThreadLocalRandom
 
 trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
-//  Logger.getRootLogger().setLevel(Level.WARN)
-//  Logger.getLogger("org.apache").setLevel(Level.WARN)
-//  Logger.getLogger("com").setLevel(Level.WARN)
-//  Logger.getLogger("akka").setLevel(Level.WARN)
-//  Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
+  Logger.getRootLogger().setLevel(Level.WARN)
+  Logger.getLogger("org.apache").setLevel(Level.WARN)
+  Logger.getLogger("com").setLevel(Level.WARN)
+  Logger.getLogger("akka").setLevel(Level.WARN)
+  Logger.getLogger("org.apache.sedona.core").setLevel(Level.WARN)
 
-//  val keyParserExtension = "spark.sedona.enableParserExtension"
+  val keyParserExtension = "spark.sedona.enableParserExtension"
   val warehouseLocation = System.getProperty("user.dir") + "/target/"
   val sparkSession = SedonaContext
     .builder()
@@ -49,17 +49,9 @@ trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
     .config("sedona.python.worker.udf.module", "sedona.spark.worker.worker")
     .config("sedona.python.worker.udf.daemon.module", "sedonaworker.daemon")
     .config("sedona.python.worker.daemon.enabled", "false")
-//    .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
+    .config(keyParserExtension, ThreadLocalRandom.current().nextBoolean())
     .getOrCreate()
 
-//    private val useDaemon: Boolean =
-//    SparkEnv.get.conf.getBoolean("sedona.python.worker.daemon.enabled", 
false)
-//
-//  private val sedonaUDFWorkerModule =
-//    SparkEnv.get.conf.get("sedona.python.worker.udf.module", 
"sedona.spark.worker.worker")
-//
-//  private val sedonaDaemonModule =
-//    SparkEnv.get.conf.get("sedona.python.worker.udf.daemon.module", 
"sedona.spark.worker.daemon")
   val sparkSessionMinio = SedonaContext
     .builder()
     .master("local[*]")
diff --git 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 23aac14bbe..d2c0d71c70 100644
--- 
a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ 
b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -45,9 +45,6 @@ object ScalarUDF {
     }
   }
 
-  SparkEnv.get.conf.set(PYTHON_USE_DAEMON, false)
-  SparkEnv.get.conf.set(PYTHON_WORKER_MODULE, "sedonaworker.work")
-
   private[spark] lazy val pythonPath = sys.env.getOrElse("PYTHONPATH", "")
   protected lazy val sparkHome: String = {
     sys.props.getOrElse("spark.test.home", sys.env("SPARK_HOME"))

Reply via email to