Kontinuation commented on code in PR #2593:
URL: https://github.com/apache/sedona/pull/2593#discussion_r2792064284


##########
spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.sedona.sql.UDF.PythonEvalType
+import 
org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF, 
SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.udf.SedonaArrowEvalPython
+import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+// We use custom Strategy to avoid Apache Spark assert on types, we
+// can consider extending this to support other engines working with
+// arrow data
+class SedonaArrowStrategy extends Strategy {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case SedonaArrowEvalPython(udfs, output, child, evalType) =>
+      SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: 
Nil
+    case _ => Nil
+  }
+}
+
+// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the 
check on the types to allow geometry types
+// here, it's initial version to allow the vectorized udf for Sedona geometry 
types. We can consider extending this
+// to support other engines working with arrow data
+case class SedonaArrowEvalPythonExec(
+    udfs: Seq[PythonUDF],
+    resultAttrs: Seq[Attribute],
+    child: SparkPlan,
+    evalType: Int)
+    extends EvalPythonExec
+    with PythonSQLMetrics {
+
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+  private val largeVarTypes = conf.arrowUseLargeVarTypes
+  private val pythonRunnerConf =
+    Map[String, String](SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
conf.sessionLocalTimeZone)
+  private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+
+  private def inferCRS(iterator: Iterator[InternalRow], schema: StructType): 
Seq[(Int, Int)] = {
+    // this triggers the iterator
+    if (!iterator.hasNext) {
+      return Seq.empty
+    }
+
+    val row = iterator.next()
+
+    val rowMatched = row match {
+      case generic: GenericInternalRow =>
+        Some(generic)
+      case _ => None
+    }

Review Comment:
   We are only taking the SRID of the geometry value in the first row as the 
SRID of the entire field. This does not work well with geometry data with mixed 
SRIDs. SedonaDB has added an item-crs data type for such data 
(https://github.com/apache/sedona-db/pull/410); I think we should always use it 
to bridge Sedona and SedonaDB.



##########
spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.sedona.sql.UDF.PythonEvalType
+import 
org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF, 
SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF}
+import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
GenericInternalRow, PythonUDF}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.udf.SedonaArrowEvalPython
+import org.apache.spark.{JobArtifactSet, TaskContext}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+
+import scala.collection.JavaConverters.asScalaIteratorConverter
+
+// We use custom Strategy to avoid Apache Spark assert on types, we
+// can consider extending this to support other engines working with
+// arrow data
+class SedonaArrowStrategy extends Strategy {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case SedonaArrowEvalPython(udfs, output, child, evalType) =>
+      SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: 
Nil
+    case _ => Nil
+  }
+}
+
+// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the 
check on the types to allow geometry types
+// here, it's initial version to allow the vectorized udf for Sedona geometry 
types. We can consider extending this
+// to support other engines working with arrow data
+case class SedonaArrowEvalPythonExec(
+    udfs: Seq[PythonUDF],
+    resultAttrs: Seq[Attribute],
+    child: SparkPlan,
+    evalType: Int)
+    extends EvalPythonExec
+    with PythonSQLMetrics {
+
+  private val batchSize = conf.arrowMaxRecordsPerBatch
+  private val sessionLocalTimeZone = conf.sessionLocalTimeZone
+  private val largeVarTypes = conf.arrowUseLargeVarTypes
+  private val pythonRunnerConf =
+    Map[String, String](SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
conf.sessionLocalTimeZone)
+  private[this] val jobArtifactUUID = 
JobArtifactSet.getCurrentJobArtifactState.map(_.uuid)
+
+  private def inferCRS(iterator: Iterator[InternalRow], schema: StructType): 
Seq[(Int, Int)] = {
+    // this triggers the iterator
+    if (!iterator.hasNext) {
+      return Seq.empty
+    }
+
+    val row = iterator.next()
+
+    val rowMatched = row match {
+      case generic: GenericInternalRow =>
+        Some(generic)
+      case _ => None
+    }
+
+    schema
+      .filter { field =>
+        field.dataType == GeometryUDT
+      }
+      .zipWithIndex
+      .map { case (_, index) =>
+        if (rowMatched.isEmpty || rowMatched.get.values(index) == null) 
(index, 0)
+        else {
+          val geom = rowMatched.get.get(index, 
GeometryUDT).asInstanceOf[Array[Byte]]
+          val preambleByte = geom(0) & 0xff
+          val hasSrid = (preambleByte & 0x01) != 0
+
+          var srid = 0
+          if (hasSrid) {
+            val srid2 = (geom(1) & 0xff) << 16
+            val srid1 = (geom(2) & 0xff) << 8
+            val srid0 = geom(3) & 0xff
+            srid = srid2 | srid1 | srid0
+          }
+
+          (index, srid)

Review Comment:
   We can extract the SRID-parsing code in 
https://github.com/apache/sedona/blob/6df699f97048f6c4f5014392a08db3b2068290f1/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java#L73-L85
 into a static function and reuse it here.



##########
spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import java.net._
+import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.JavaConverters._
+import org.apache.spark._
+import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.EXECUTOR_CORES
+import org.apache.spark.internal.config.Python._
+import 
org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, 
PYSPARK_MEMORY_LOCAL_PROPERTY}
+import org.apache.spark.util._
+
+private object SedonaBasePythonRunner {
+
+  private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = 
"faulthandler")
+}
+
+private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
+    funcs: Seq[ChainedPythonFunctions],
+    evalType: Int,
+    argOffsets: Array[Array[Int]],
+    jobArtifactUUID: Option[String],
+    val geometryFields: Seq[(Int, Int)] = Seq.empty,
+    val castGeometryToWKB: Boolean = false)
+    extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, 
jobArtifactUUID)
+    with Logging {
+
+  require(funcs.length == argOffsets.length, "argOffsets should have the same 
length as funcs")
+
+  private val conf = SparkEnv.get.conf
+  private val reuseWorker =
+    conf.getBoolean(PYTHON_WORKER_REUSE.key, 
PYTHON_WORKER_REUSE.defaultValue.get)
+  private val faultHandlerEnabled = 
conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED)
+
+  private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = 
{
+    mem.map(_ / cores)
+  }
+
+  import java.io._

Review Comment:
   Move this `import java.io._` to the top of the file, with the other imports.



##########
python/sedona/spark/sql/functions.py:
##########
@@ -142,3 +150,76 @@ def serialize_to_geometry_if_geom(data, return_type: 
DataType):
         return geometry_serde.serialize(data)
 
     return data
+
+
+def infer_pa_type(spark_type: DataType):
+    import pyarrow as pa
+    import geoarrow.pyarrow as ga
+
+    if isinstance(spark_type, GeometryType):
+        return ga.wkb()
+    elif isinstance(spark_type, FloatType):
+        return pa.float32()
+    elif isinstance(spark_type, DoubleType):
+        return pa.float64()
+    elif isinstance(spark_type, IntegerType):
+        return pa.int32()
+    elif isinstance(spark_type, StringType):
+        return pa.string()
+    else:
+        raise NotImplementedError(f"Type {spark_type} is not supported yet.")
+
+
+def infer_input_type(spark_type: DataType):
+    from sedonadb import udf as sedona_udf_module
+
+    if isinstance(spark_type, GeometryType):
+        return sedona_udf_module.GEOMETRY
+    elif (
+        isinstance(spark_type, FloatType)
+        or isinstance(spark_type, DoubleType)
+        or isinstance(spark_type, IntegerType)
+    ):
+        return sedona_udf_module.NUMERIC
+    elif isinstance(spark_type, StringType):
+        return sedona_udf_module.STRING
+    elif isinstance(spark_type, ByteType):
+        return sedona_udf_module.BINARY
+    else:
+        raise NotImplementedError(f"Type {spark_type} is not supported yet.")
+
+
+def infer_input_types(spark_types: list[DataType]):
+    pa_types = []
+    for spark_type in spark_types:
+        pa_type = infer_input_type(spark_type)
+        pa_types.append(pa_type)
+
+    return pa_types
+
+
+def sedona_db_vectorized_udf(
+    return_type: DataType,
+    input_types: list[DataType],
+):
+    from sedonadb import udf as sedona_udf_module
+
+    eval_type = 6200
+    if sedona_db_speedup_enabled:
+        eval_type = 6201

Review Comment:
   Define these eval types as constants such as `SQL_SCALAR_SEDONA_DB_UDF`. I 
believe that we should follow the same pattern as `SEDONA_SCALAR_EVAL_TYPE`.



##########
python/sedona/spark/worker/serde.py:
##########
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyspark.serializers import write_int, SpecialLengths
+from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer
+
+from sedona.spark.worker.udf_info import UDFInfo
+
+
+class SedonaDBSerializer(ArrowStreamPandasSerializer):
+    def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, 
cast_to_wkb=False):
+        super().__init__(timezone, safecheck)
+        self.db = db
+        self.udf_info = udf_info
+        self.cast_to_wkb = cast_to_wkb
+
+    def load_stream(self, stream):
+        import pyarrow as pa
+
+        batches = super(ArrowStreamPandasSerializer, self).load_stream(stream)
+        index = 0
+        for batch in batches:
+            table = pa.Table.from_batches(batches=[batch])
+            import pyarrow as pa
+
+            df = self.db.create_data_frame(table)
+            table_name = f"my_table_{index}"
+
+            df.to_view(table_name)

Review Comment:
   Do we need `overwrite=True` here?



##########
python/src/geomserde_speedup_module.c:
##########
@@ -262,14 +265,121 @@ static PyObject *deserialize_1(PyObject *self, PyObject 
*args) {
   return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
 }
 
+static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
+
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);

Review Comment:
   Do we need to check the type of `input_obj` using `PyArray_Check` before 
casting it to `PyArrayObject` and calling `PyArray_*` methods?



##########
python/sedona/spark/worker/worker.py:
##########
@@ -0,0 +1,304 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import importlib
+import os
+import sys
+import time
+
+import sedonadb
+from pyspark import TaskContext, shuffle, SparkFiles
+from pyspark.errors import PySparkRuntimeError
+from pyspark.java_gateway import local_connect_and_auth
+from pyspark.resource import ResourceInformation
+from pyspark.serializers import (
+    read_int,
+    UTF8Deserializer,
+    read_bool,
+    read_long,
+    CPickleSerializer,
+    write_int,
+    write_long,
+    SpecialLengths,
+)
+
+from sedona.spark.worker.serde import SedonaDBSerializer
+from sedona.spark.worker.udf_info import UDFInfo
+
+
+def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False):
+    i = 0
+    for df in iterator:
+        i += 1
+        table_name = f"output_table_{i}"
+        df.to_view(table_name)
+
+        function_call_sql = udf_info.get_function_call_sql(
+            table_name, cast_to_wkb=cast_to_wkb
+        )
+
+        df_out = db.sql(function_call_sql)
+
+        df_out.to_view(f"view_{i}")
+        at = df_out.to_arrow_table()
+        batches = at.combine_chunks().to_batches()
+
+        yield from batches
+
+
+def check_python_version(utf_serde: UTF8Deserializer, infile) -> str:
+    version = utf_serde.loads(infile)
+
+    python_major, python_minor = sys.version_info[:2]
+
+    if version != f"{python_major}.{python_minor}":
+        raise PySparkRuntimeError(
+            error_class="PYTHON_VERSION_MISMATCH",
+            message_parameters={
+                "worker_version": str(sys.version_info[:2]),
+                "driver_version": str(version),
+            },
+        )
+
+    return version
+
+
+def check_barrier_flag(infile):
+    is_barrier = read_bool(infile)
+    bound_port = read_int(infile)
+    secret = UTF8Deserializer().loads(infile)
+
+    if is_barrier:
+        raise PySparkRuntimeError(
+            error_class="BARRIER_MODE_NOT_SUPPORTED",
+            message_parameters={
+                "worker_version": str(sys.version_info[:2]),
+                "message": "Barrier mode is not supported by SedonaDB 
vectorized functions.",
+            },
+        )
+
+    return is_barrier
+
+
+def assign_task_context(utf_serde: UTF8Deserializer, infile):
+    stage_id = read_int(infile)
+    partition_id = read_int(infile)
+    attempt_number = read_long(infile)
+    task_attempt_id = read_int(infile)
+    cpus = read_int(infile)
+
+    task_context = TaskContext._getOrCreate()
+    task_context._stage_id = stage_id
+    task_context._partition_id = partition_id
+    task_context._attempt_number = attempt_number
+    task_context._task_attempt_id = task_attempt_id
+    task_context._cpus = cpus
+
+    for r in range(read_int(infile)):
+        key = utf_serde.loads(infile)
+        name = utf_serde.loads(infile)
+        addresses = []
+        task_context._resources = {}
+        for a in range(read_int(infile)):
+            addresses.append(utf_serde.loads(infile))
+        task_context._resources[key] = ResourceInformation(name, addresses)
+
+    task_context._localProperties = {}
+    for i in range(read_int(infile)):
+        k = utf_serde.loads(infile)
+        v = utf_serde.loads(infile)
+        task_context._localProperties[k] = v
+
+    return task_context
+
+
+def resolve_python_path(utf_serde: UTF8Deserializer, infile):
+    def add_path(path: str):
+        # worker can be used, so do not add path multiple times
+        if path not in sys.path:
+            # overwrite system packages
+            sys.path.insert(1, path)
+
+    spark_files_dir = utf_serde.loads(infile)
+
+    SparkFiles._root_directory = spark_files_dir
+    SparkFiles._is_running_on_worker = True
+
+    add_path(spark_files_dir)  # *.py files that were added will be copied here
+    num_python_includes = read_int(infile)
+    for _ in range(num_python_includes):
+        filename = utf_serde.loads(infile)
+        add_path(os.path.join(spark_files_dir, filename))
+
+    importlib.invalidate_caches()
+
+
+def check_broadcast_variables(infile):
+    needs_broadcast_decryption_server = read_bool(infile)
+    num_broadcast_variables = read_int(infile)
+
+    if needs_broadcast_decryption_server or num_broadcast_variables > 0:
+        raise PySparkRuntimeError(
+            error_class="BROADCAST_VARS_NOT_SUPPORTED",
+            message_parameters={
+                "worker_version": str(sys.version_info[:2]),
+                "message": "Broadcast variables are not supported by SedonaDB 
vectorized functions.",
+            },
+        )
+
+
+def get_runner_conf(utf_serde: UTF8Deserializer, infile):
+    runner_conf = {}
+    num_conf = read_int(infile)
+    for i in range(num_conf):
+        k = utf_serde.loads(infile)
+        v = utf_serde.loads(infile)
+        runner_conf[k] = v
+    return runner_conf
+
+
+def read_command(serializer, infile):
+    command = serializer._read_with_length(infile)
+    return command
+
+
+def read_udf(infile, pickle_ser) -> UDFInfo:
+    num_arg = read_int(infile)
+    arg_offsets = [read_int(infile) for i in range(num_arg)]
+
+    function = None
+    return_type = None
+
+    for i in range(read_int(infile)):
+        function, return_type = read_command(pickle_ser, infile)
+
+    sedona_db_udf_expression = function()
+
+    return UDFInfo(
+        arg_offsets=arg_offsets,
+        function=sedona_db_udf_expression,
+        return_type=return_type,
+        name=sedona_db_udf_expression._name,
+        geom_offsets=[0],
+    )
+
+
+def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo:
+    num_udfs = read_int(infile)
+
+    udf = None
+    for _ in range(num_udfs):
+        udf = read_udf(infile, pickle_ser)
+
+    return udf

Review Comment:
   Is it intended to discard all UDFs except the last one?



##########
python/sedona/spark/worker/daemon.py:
##########
@@ -0,0 +1,218 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+

Review Comment:
   I'm not sure if monkeypatching the worker_main property of PySpark's daemon 
module after importing it would work, but I still prefer the current approach. 
Maintaining a fork of daemon.py is fine once we know where the modified 
pieces are.



##########
spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.ArrowWriter
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.util.Utils
+import org.apache.spark.{SparkEnv, TaskContext}
+
+import java.io.DataOutputStream
+import java.net.Socket
+
+private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] {

Review Comment:
   I suggest adding a GitHub link to the comment noting where most of the code 
was taken from. This suggestion also applies to SedonaPythonArrowOutput.



##########
python/src/geomserde_speedup_module.c:
##########
@@ -262,14 +265,121 @@ static PyObject *deserialize_1(PyObject *self, PyObject 
*args) {
   return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length);
 }
 
+static PyObject *to_sedona_func(PyObject *self, PyObject *args) {
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
+
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
+
+  GEOSContextHandle_t handle = get_geos_context_handle();
+  if (handle == NULL) {
+    return NULL;
+  }
+
+  npy_intp n = PyArray_SIZE(array);
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+  for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+    PyObject *obj = objs[i];
+    GEOSGeometry *geos_geom = NULL;
+    char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom);
+    if (!success || geos_geom == NULL) {
+      PyErr_SetString(PyExc_TypeError, "Invalid GEOS geometry");
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    PyObject *serialized = do_serialize(geos_geom);
+    if (!serialized) {
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized) < 0) {
+      Py_DECREF(serialized);
+      Py_DECREF(out);
+      return NULL;
+    }
+    Py_DECREF(serialized);
+  }
+
+  return (PyObject *)out;
+}
 /* Module definition for Shapely 2.x */
+static PyObject *from_sedona_func(PyObject *self, PyObject *args) {
+  import_array();
+  PyObject *input_obj = NULL;
+  if (!PyArg_ParseTuple(args, "O", &input_obj)) {
+    return NULL;
+  };
+
+  GEOSContextHandle_t handle = get_geos_context_handle();
+
+  PyArrayObject *array = (PyArrayObject *)input_obj;
+  PyObject **objs = (PyObject **)PyArray_DATA(array);
+
+  int p_bytes_read = 0;
+
+  npy_intp n = PyArray_SIZE(array);
+
+  npy_intp dims[1] = {n};
+  PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT);
+
+  for (npy_intp i = 0; i < PyArray_SIZE(array); i++) {
+    PyObject *obj = objs[i];
+    if (!PyBytes_Check(obj)) {
+      PyErr_SetString(PyExc_TypeError, "Expected bytes");
+      Py_DECREF(out);
+
+      return NULL;
+    }
+
+    char *buf = PyBytes_AS_STRING(obj);
+
+    Py_ssize_t len = PyBytes_GET_SIZE(obj);
+
+    GEOSGeometry *geom = NULL;
+
+    SedonaErrorCode err =
+        sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read);
+    if (err != SEDONA_SUCCESS) {
+      handle_geomserde_error(err);
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle);
+    if (!pygeom) {
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom) < 0) {
+      Py_DECREF(pygeom);
+      Py_DECREF(out);
+      return NULL;
+    }
+
+    Py_DECREF(pygeom);
+  }
+
+  return (PyObject *)out;
+}
 
 static PyMethodDef geomserde_methods_shapely_2[] = {
     {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."},
     {"serialize", serialize, METH_VARARGS,
      "Serialize geometry object as bytearray."},
     {"deserialize", deserialize, METH_VARARGS,
      "Deserialize bytes-like object to geometry object."},
+    {"from_sedona_func", from_sedona_func, METH_VARARGS,
+     "Deserialize bytes-like object to geometry object."},
+    {"to_sedona_func", to_sedona_func, METH_VARARGS,
+     "Deserialize bytes-like object to geometry object."},

Review Comment:
   These functions are for working with arrays; we should clarify this in 
their descriptions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to