Kontinuation commented on code in PR #2593: URL: https://github.com/apache/sedona/pull/2593#discussion_r2792064284
########## spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.sedona.sql.UDF.PythonEvalType +import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF} +import org.apache.spark.api.python.ChainedPythonFunctions +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, PythonUDF} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.udf.SedonaArrowEvalPython +import org.apache.spark.{JobArtifactSet, TaskContext} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT + +import scala.collection.JavaConverters.asScalaIteratorConverter + +// We use custom Strategy to avoid Apache Spark assert on types, we +// can consider extending this to support other engines working with +// arrow data +class SedonaArrowStrategy extends Strategy { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case SedonaArrowEvalPython(udfs, output, child, evalType) => + SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil + case _ => Nil + } +} + +// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the check on the types to allow geometry types +// here, it's initial version to allow the vectorized udf for Sedona geometry types. 
We can consider extending this +// to support other engines working with arrow data +case class SedonaArrowEvalPythonExec( + udfs: Seq[PythonUDF], + resultAttrs: Seq[Attribute], + child: SparkPlan, + evalType: Int) + extends EvalPythonExec + with PythonSQLMetrics { + + private val batchSize = conf.arrowMaxRecordsPerBatch + private val sessionLocalTimeZone = conf.sessionLocalTimeZone + private val largeVarTypes = conf.arrowUseLargeVarTypes + private val pythonRunnerConf = + Map[String, String](SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone) + private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + + private def inferCRS(iterator: Iterator[InternalRow], schema: StructType): Seq[(Int, Int)] = { + // this triggers the iterator + if (!iterator.hasNext) { + return Seq.empty + } + + val row = iterator.next() + + val rowMatched = row match { + case generic: GenericInternalRow => + Some(generic) + case _ => None + } Review Comment: We are only taking the SRID of the geometry value in the first row as the SRID of the entire field, this does not work well with geometry data with mixed SRIDs. SedonaDB has item-crs data type added for such data: https://github.com/apache/sedona-db/pull/410, I think we should always use this to bridge Sedona and SedonaDB. ########## spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaArrowStrategy.scala: ########## @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
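
The mixed-SRID pitfall is easy to demonstrate outside of Spark; a minimal Shapely-only illustration of why the first row's SRID cannot stand in for the whole field (made-up values, not this PR's API):

```python
from shapely import Point, get_srid, set_srid

# One column whose rows carry different SRIDs.
column = [
    set_srid(Point(21.0, 52.2), 4326),            # WGS 84
    set_srid(Point(2338408.0, 6842923.0), 3857),  # Web Mercator
]

field_srid = get_srid(column[0])  # 4326, taken from the first row only
# Every later value is then treated as 4326 as well, so the 3857 geometry is
# silently mislabelled; a per-item CRS encoding such as item-crs avoids this.
```
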
+ */ +package org.apache.spark.sql.execution.python + +import org.apache.sedona.sql.UDF.PythonEvalType +import org.apache.sedona.sql.UDF.PythonEvalType.{SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF, SQL_SCALAR_SEDONA_DB_UDF, SQL_SCALAR_SEDONA_UDF} +import org.apache.spark.api.python.ChainedPythonFunctions +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericInternalRow, PythonUDF} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.udf.SedonaArrowEvalPython +import org.apache.spark.{JobArtifactSet, TaskContext} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT + +import scala.collection.JavaConverters.asScalaIteratorConverter + +// We use custom Strategy to avoid Apache Spark assert on types, we +// can consider extending this to support other engines working with +// arrow data +class SedonaArrowStrategy extends Strategy { + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case SedonaArrowEvalPython(udfs, output, child, evalType) => + SedonaArrowEvalPythonExec(udfs, output, planLater(child), evalType) :: Nil + case _ => Nil + } +} + +// It's modification og Apache Spark's ArrowEvalPythonExec, we remove the check on the types to allow geometry types +// here, it's initial version to allow the vectorized udf for Sedona geometry types. We can consider extending this +// to support other engines working with arrow data +case class SedonaArrowEvalPythonExec( + udfs: Seq[PythonUDF], + resultAttrs: Seq[Attribute], + child: SparkPlan, + evalType: Int) + extends EvalPythonExec + with PythonSQLMetrics { + + private val batchSize = conf.arrowMaxRecordsPerBatch + private val sessionLocalTimeZone = conf.sessionLocalTimeZone + private val largeVarTypes = conf.arrowUseLargeVarTypes + private val pythonRunnerConf = + Map[String, String](SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone) + private[this] val jobArtifactUUID = JobArtifactSet.getCurrentJobArtifactState.map(_.uuid) + + private def inferCRS(iterator: Iterator[InternalRow], schema: StructType): Seq[(Int, Int)] = { + // this triggers the iterator + if (!iterator.hasNext) { + return Seq.empty + } + + val row = iterator.next() + + val rowMatched = row match { + case generic: GenericInternalRow => + Some(generic) + case _ => None + } + + schema + .filter { field => + field.dataType == GeometryUDT + } + .zipWithIndex + .map { case (_, index) => + if (rowMatched.isEmpty || rowMatched.get.values(index) == null) (index, 0) + else { + val geom = rowMatched.get.get(index, GeometryUDT).asInstanceOf[Array[Byte]] + val preambleByte = geom(0) & 0xff + val hasSrid = (preambleByte & 0x01) != 0 + + var srid = 0 + if (hasSrid) { + val srid2 = (geom(1) & 0xff) << 16 + val srid1 = (geom(2) & 0xff) << 8 + val srid0 = geom(3) & 0xff + srid = srid2 | srid1 | srid0 + } + + (index, srid) Review Comment: We can extract the code for parsing srid in https://github.com/apache/sedona/blob/6df699f97048f6c4f5014392a08db3b2068290f1/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java#L73-L85 to a static function ########## spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaBasePythonRunner.scala: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.python + +import java.net._ +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.JavaConverters._ +import org.apache.spark._ +import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.EXECUTOR_CORES +import org.apache.spark.internal.config.Python._ +import org.apache.spark.resource.ResourceProfile.{EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY} +import org.apache.spark.util._ + +private object SedonaBasePythonRunner { + + private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler") +} + +private[spark] abstract class SedonaBasePythonRunner[IN, OUT]( + funcs: Seq[ChainedPythonFunctions], + evalType: Int, + argOffsets: Array[Array[Int]], + jobArtifactUUID: Option[String], + val geometryFields: Seq[(Int, Int)] = Seq.empty, + val castGeometryToWKB: Boolean = false) + extends BasePythonRunner[IN, OUT](funcs, evalType, argOffsets, jobArtifactUUID) + with Logging { + + require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + + private val conf = SparkEnv.get.conf + private val reuseWorker = + conf.getBoolean(PYTHON_WORKER_REUSE.key, PYTHON_WORKER_REUSE.defaultValue.get) + private val faultHandlerEnabled = conf.get(PYTHON_WORKER_FAULTHANLDER_ENABLED) + + private def getWorkerMemoryMb(mem: Option[Long], cores: Int): Option[Long] = { + mem.map(_ / cores) + } + + import java.io._ Review Comment: Move to the top of the file ########## python/sedona/spark/sql/functions.py: ########## @@ -142,3 +150,76 @@ def serialize_to_geometry_if_geom(data, return_type: DataType): return geometry_serde.serialize(data) return data + + +def infer_pa_type(spark_type: DataType): + import pyarrow as pa + import geoarrow.pyarrow as ga + + if isinstance(spark_type, GeometryType): + return ga.wkb() + elif isinstance(spark_type, FloatType): + return pa.float32() + elif isinstance(spark_type, DoubleType): + return pa.float64() + elif isinstance(spark_type, IntegerType): + return pa.int32() + elif isinstance(spark_type, StringType): + return pa.string() + else: + raise NotImplementedError(f"Type {spark_type} is not supported yet.") + + +def infer_input_type(spark_type: DataType): + from sedonadb import udf as sedona_udf_module + + if isinstance(spark_type, GeometryType): + return sedona_udf_module.GEOMETRY + elif ( + isinstance(spark_type, FloatType) + or isinstance(spark_type, DoubleType) + or isinstance(spark_type, IntegerType) + ): + return sedona_udf_module.NUMERIC + elif isinstance(spark_type, StringType): + return sedona_udf_module.STRING + elif isinstance(spark_type, ByteType): + return sedona_udf_module.BINARY + else: + raise NotImplementedError(f"Type 
{spark_type} is not supported yet.") + + +def infer_input_types(spark_types: list[DataType]): + pa_types = [] + for spark_type in spark_types: + pa_type = infer_input_type(spark_type) + pa_types.append(pa_type) + + return pa_types + + +def sedona_db_vectorized_udf( + return_type: DataType, + input_types: list[DataType], +): + from sedonadb import udf as sedona_udf_module + + eval_type = 6200 + if sedona_db_speedup_enabled: + eval_type = 6201 Review Comment: Define these eval types as constants such as `SQL_SCALAR_SEDONA_DB_UDF`. I believe that we should follow the same pattern as `SEDONA_SCALAR_EVAL_TYPE`. ########## python/sedona/spark/worker/serde.py: ########## @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyspark.serializers import write_int, SpecialLengths +from pyspark.sql.pandas.serializers import ArrowStreamPandasSerializer + +from sedona.spark.worker.udf_info import UDFInfo + + +class SedonaDBSerializer(ArrowStreamPandasSerializer): + def __init__(self, timezone, safecheck, db, udf_info: UDFInfo, cast_to_wkb=False): + super().__init__(timezone, safecheck) + self.db = db + self.udf_info = udf_info + self.cast_to_wkb = cast_to_wkb + + def load_stream(self, stream): + import pyarrow as pa + + batches = super(ArrowStreamPandasSerializer, self).load_stream(stream) + index = 0 + for batch in batches: + table = pa.Table.from_batches(batches=[batch]) + import pyarrow as pa + + df = self.db.create_data_frame(table) + table_name = f"my_table_{index}" + + df.to_view(table_name) Review Comment: Do we need `overwrite=True` here? ########## python/src/geomserde_speedup_module.c: ########## @@ -262,14 +265,121 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) { return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length); } +static PyObject *to_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)) { + return NULL; + }; + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject **)PyArray_DATA(array); Review Comment: Do we need to check the type of `input_obj` using `PyArray_Check` before casting it to `PyArrayObject` and calling `PyArray_*` methods? ########## python/sedona/spark/worker/worker.py: ########## @@ -0,0 +1,304 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
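
For the eval-type constants suggested above for functions.py, a possible sketch (the constant names follow the reviewer's suggestion and the Scala imports in this PR; the helper function is hypothetical):

```python
# 6200/6201 are the values currently hard-coded in sedona_db_vectorized_udf.
SQL_SCALAR_SEDONA_DB_UDF = 6200
SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF = 6201


def resolve_eval_type(speedup_enabled: bool) -> int:
    # Same branch as in sedona_db_vectorized_udf, but call sites read a named
    # constant instead of a magic number.
    if speedup_enabled:
        return SQL_SCALAR_SEDONA_DB_SPEEDUP_UDF
    return SQL_SCALAR_SEDONA_DB_UDF
```
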
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import importlib +import os +import sys +import time + +import sedonadb +from pyspark import TaskContext, shuffle, SparkFiles +from pyspark.errors import PySparkRuntimeError +from pyspark.java_gateway import local_connect_and_auth +from pyspark.resource import ResourceInformation +from pyspark.serializers import ( + read_int, + UTF8Deserializer, + read_bool, + read_long, + CPickleSerializer, + write_int, + write_long, + SpecialLengths, +) + +from sedona.spark.worker.serde import SedonaDBSerializer +from sedona.spark.worker.udf_info import UDFInfo + + +def apply_iterator(db, iterator, udf_info: UDFInfo, cast_to_wkb: bool = False): + i = 0 + for df in iterator: + i += 1 + table_name = f"output_table_{i}" + df.to_view(table_name) + + function_call_sql = udf_info.get_function_call_sql( + table_name, cast_to_wkb=cast_to_wkb + ) + + df_out = db.sql(function_call_sql) + + df_out.to_view(f"view_{i}") + at = df_out.to_arrow_table() + batches = at.combine_chunks().to_batches() + + yield from batches + + +def check_python_version(utf_serde: UTF8Deserializer, infile) -> str: + version = utf_serde.loads(infile) + + python_major, python_minor = sys.version_info[:2] + + if version != f"{python_major}.{python_minor}": + raise PySparkRuntimeError( + error_class="PYTHON_VERSION_MISMATCH", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "driver_version": str(version), + }, + ) + + return version + + +def check_barrier_flag(infile): + is_barrier = read_bool(infile) + bound_port = read_int(infile) + secret = UTF8Deserializer().loads(infile) + + if is_barrier: + raise PySparkRuntimeError( + error_class="BARRIER_MODE_NOT_SUPPORTED", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "message": "Barrier mode is not supported by SedonaDB vectorized functions.", + }, + ) + + return is_barrier + + +def assign_task_context(utf_serde: UTF8Deserializer, infile): + stage_id = read_int(infile) + partition_id = read_int(infile) + attempt_number = read_long(infile) + task_attempt_id = read_int(infile) + cpus = read_int(infile) + + task_context = TaskContext._getOrCreate() + task_context._stage_id = stage_id + task_context._partition_id = partition_id + task_context._attempt_number = attempt_number + task_context._task_attempt_id = task_attempt_id + task_context._cpus = cpus + + for r in range(read_int(infile)): + key = utf_serde.loads(infile) + name = utf_serde.loads(infile) + addresses = [] + task_context._resources = {} + for a in range(read_int(infile)): + addresses.append(utf_serde.loads(infile)) + task_context._resources[key] = ResourceInformation(name, addresses) + + task_context._localProperties = {} + for i in range(read_int(infile)): + k = utf_serde.loads(infile) + v = utf_serde.loads(infile) + task_context._localProperties[k] = v + + return task_context + + +def resolve_python_path(utf_serde: UTF8Deserializer, infile): + def add_path(path: str): + # worker can be used, so do not add path multiple times + if path not in sys.path: + # overwrite system packages + sys.path.insert(1, path) + + spark_files_dir = utf_serde.loads(infile) + + 
SparkFiles._root_directory = spark_files_dir + SparkFiles._is_running_on_worker = True + + add_path(spark_files_dir) # *.py files that were added will be copied here + num_python_includes = read_int(infile) + for _ in range(num_python_includes): + filename = utf_serde.loads(infile) + add_path(os.path.join(spark_files_dir, filename)) + + importlib.invalidate_caches() + + +def check_broadcast_variables(infile): + needs_broadcast_decryption_server = read_bool(infile) + num_broadcast_variables = read_int(infile) + + if needs_broadcast_decryption_server or num_broadcast_variables > 0: + raise PySparkRuntimeError( + error_class="BROADCAST_VARS_NOT_SUPPORTED", + message_parameters={ + "worker_version": str(sys.version_info[:2]), + "message": "Broadcast variables are not supported by SedonaDB vectorized functions.", + }, + ) + + +def get_runner_conf(utf_serde: UTF8Deserializer, infile): + runner_conf = {} + num_conf = read_int(infile) + for i in range(num_conf): + k = utf_serde.loads(infile) + v = utf_serde.loads(infile) + runner_conf[k] = v + return runner_conf + + +def read_command(serializer, infile): + command = serializer._read_with_length(infile) + return command + + +def read_udf(infile, pickle_ser) -> UDFInfo: + num_arg = read_int(infile) + arg_offsets = [read_int(infile) for i in range(num_arg)] + + function = None + return_type = None + + for i in range(read_int(infile)): + function, return_type = read_command(pickle_ser, infile) + + sedona_db_udf_expression = function() + + return UDFInfo( + arg_offsets=arg_offsets, + function=sedona_db_udf_expression, + return_type=return_type, + name=sedona_db_udf_expression._name, + geom_offsets=[0], + ) + + +def register_sedona_db_udf(infile, pickle_ser) -> UDFInfo: + num_udfs = read_int(infile) + + udf = None + for _ in range(num_udfs): + udf = read_udf(infile, pickle_ser) + + return udf Review Comment: Is it intended to discard all udfs except the last one? ########## python/sedona/spark/worker/daemon.py: ########## @@ -0,0 +1,218 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + Review Comment: I'm not sure if monkeypatching the worker_main property of PySpark's daemon module after importing it would work, but I still prefer the current approach. Maintaining a fork of daemon.py is fine once we know where the modified pieces are. ########## spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/python/SedonaPythonArrowInput.scala: ########## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
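
On the register_sedona_db_udf question above: if several chained UDFs can actually arrive on the stream, a sketch of keeping all of them rather than only the last one (the plural helper name is hypothetical; read_udf is the function defined earlier in worker.py):

```python
from pyspark.serializers import read_int

from sedona.spark.worker.udf_info import UDFInfo


def register_sedona_db_udfs(infile, pickle_ser) -> list[UDFInfo]:
    # Collect every UDF sent by the driver instead of overwriting the previous
    # one on each loop iteration; read_udf is defined above in worker.py.
    num_udfs = read_int(infile)
    return [read_udf(infile, pickle_ser) for _ in range(num_udfs)]
```
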
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.execution.python + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.arrow.ArrowWriter +import org.apache.spark.sql.util.ArrowUtils +import org.apache.spark.util.Utils +import org.apache.spark.{SparkEnv, TaskContext} + +import java.io.DataOutputStream +import java.net.Socket + +private[python] trait SedonaPythonArrowInput[IN] extends PythonArrowInput[IN] { Review Comment: I suggest adding a GitHub link to the comment noting where most of the code was taken from. This suggestion also applies to SedonaPythonArrowOutput. ########## python/src/geomserde_speedup_module.c: ########## @@ -262,14 +265,121 @@ static PyObject *deserialize_1(PyObject *self, PyObject *args) { return Py_BuildValue("(Kibi)", geom, geom_type_id, has_z, length); } +static PyObject *to_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)) { + return NULL; + }; + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject **)PyArray_DATA(array); + + GEOSContextHandle_t handle = get_geos_context_handle(); + if (handle == NULL) { + return NULL; + } + + npy_intp n = PyArray_SIZE(array); + npy_intp dims[1] = {n}; + PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT); + for (npy_intp i = 0; i < PyArray_SIZE(array); i++) { + PyObject *obj = objs[i]; + GEOSGeometry *geos_geom = NULL; + char success = PyGEOS_GetGEOSGeometry(obj, &geos_geom); + if (!success || geos_geom == NULL) { + PyErr_SetString(PyExc_TypeError, "Invalid GEOS geometry"); + Py_DECREF(out); + return NULL; + } + + PyObject *serialized = do_serialize(geos_geom); + if (!serialized) { + Py_DECREF(out); + return NULL; + } + + if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), serialized) < 0) { + Py_DECREF(serialized); + Py_DECREF(out); + return NULL; + } + Py_DECREF(serialized); + } + + return (PyObject *)out; +} /* Module definition for Shapely 2.x */ +static PyObject *from_sedona_func(PyObject *self, PyObject *args) { + import_array(); + PyObject *input_obj = NULL; + if (!PyArg_ParseTuple(args, "O", &input_obj)) { + return NULL; + }; + + GEOSContextHandle_t handle = get_geos_context_handle(); + + PyArrayObject *array = (PyArrayObject *)input_obj; + PyObject **objs = (PyObject **)PyArray_DATA(array); + + int p_bytes_read = 0; + + npy_intp n = PyArray_SIZE(array); + + npy_intp dims[1] = {n}; + PyArrayObject *out = (PyArrayObject *)PyArray_SimpleNew(1, dims, NPY_OBJECT); + + for (npy_intp i = 0; i < PyArray_SIZE(array); i++) { + PyObject *obj = objs[i]; + if (!PyBytes_Check(obj)) { + PyErr_SetString(PyExc_TypeError, "Expected bytes"); + Py_DECREF(out); + + return NULL; + } + + char *buf = PyBytes_AS_STRING(obj); + + Py_ssize_t len = PyBytes_GET_SIZE(obj); + + GEOSGeometry *geom = NULL; + 
+ SedonaErrorCode err = + sedona_deserialize_geom(handle, buf, len, &geom, &p_bytes_read); + if (err != SEDONA_SUCCESS) { + handle_geomserde_error(err); + Py_DECREF(out); + return NULL; + } + + PyObject *pygeom = PyGEOS_CreateGeometry(geom, handle); + if (!pygeom) { + Py_DECREF(out); + return NULL; + } + + if (PyArray_SETITEM(out, PyArray_GETPTR1(out, i), pygeom) < 0) { + Py_DECREF(pygeom); + Py_DECREF(out); + return NULL; + } + + Py_DECREF(pygeom); + } + + return (PyObject *)out; +} static PyMethodDef geomserde_methods_shapely_2[] = { {"load_libgeos_c", load_libgeos_c, METH_VARARGS, "Load libgeos_c."}, {"serialize", serialize, METH_VARARGS, "Serialize geometry object as bytearray."}, {"deserialize", deserialize, METH_VARARGS, "Deserialize bytes-like object to geometry object."}, + {"from_sedona_func", from_sedona_func, METH_VARARGS, + "Deserialize bytes-like object to geometry object."}, + {"to_sedona_func", to_sedona_func, METH_VARARGS, + "Deserialize bytes-like object to geometry object."}, Review Comment: These functions are for working with arrays; we should clarify this in the description. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
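
A usage sketch of what the clarified descriptions should convey: both new entry points operate on NumPy object arrays rather than on a single geometry or bytes value (the import path of the compiled module is assumed):

```python
import numpy as np
from shapely import Point

from sedona.utils import geomserde_speedup  # import path assumed

# One call serializes a whole column of Shapely geometries; from_sedona_func
# is the array-wise inverse and likewise expects an object array of serialized
# values, not a single item.
geoms = np.array([Point(0, 0), Point(1, 1)], dtype=object)
serialized = geomserde_speedup.to_sedona_func(geoms)
```
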
