This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 4156ea468492d693b82bf01141fbd5a110392572
Author: pawelkocinski <[email protected]>
AuthorDate: Sat Nov 22 19:25:04 2025 +0100

    SEDONA-748 add working example
---
 .../common/geometrySerde/GeometrySerializer.java   |  60 ++++-----
 .../sedona/sql/utils/GeometrySerializer.scala      |   2 -
 .../spark/api/python/SedonaPythonRunner.scala      |   3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 138 +++++++++++++++
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   8 +-
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |  18 ++-
 6 files changed, 189 insertions(+), 40 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java b/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java
index ba135aa6a1..d0f2f39d46 100644
--- a/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java
+++ b/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java
@@ -40,27 +40,27 @@ public class GeometrySerializer {
   private static final PrecisionModel PRECISION_MODEL = new PrecisionModel();
 
   public static byte[] serialize(Geometry geometry) {
-    return new WKBWriter().write(geometry);
-//    GeometryBuffer buffer;
-//    if (geometry instanceof Point) {
-//      buffer = serializePoint((Point) geometry);
-//    } else if (geometry instanceof MultiPoint) {
-//      buffer = serializeMultiPoint((MultiPoint) geometry);
-//    } else if (geometry instanceof LineString) {
-//      buffer = serializeLineString((LineString) geometry);
-//    } else if (geometry instanceof MultiLineString) {
-//      buffer = serializeMultiLineString((MultiLineString) geometry);
-//    } else if (geometry instanceof Polygon) {
-//      buffer = serializePolygon((Polygon) geometry);
-//    } else if (geometry instanceof MultiPolygon) {
-//      buffer = serializeMultiPolygon((MultiPolygon) geometry);
-//    } else if (geometry instanceof GeometryCollection) {
-//      buffer = serializeGeometryCollection((GeometryCollection) geometry);
-//    } else {
-//      throw new UnsupportedOperationException(
-//          "Geometry type is not supported: " + geometry.getClass().getSimpleName());
-//    }
-//    return buffer.toByteArray();
+//    return new WKBWriter().write(geometry);
+    GeometryBuffer buffer;
+    if (geometry instanceof Point) {
+      buffer = serializePoint((Point) geometry);
+    } else if (geometry instanceof MultiPoint) {
+      buffer = serializeMultiPoint((MultiPoint) geometry);
+    } else if (geometry instanceof LineString) {
+      buffer = serializeLineString((LineString) geometry);
+    } else if (geometry instanceof MultiLineString) {
+      buffer = serializeMultiLineString((MultiLineString) geometry);
+    } else if (geometry instanceof Polygon) {
+      buffer = serializePolygon((Polygon) geometry);
+    } else if (geometry instanceof MultiPolygon) {
+      buffer = serializeMultiPolygon((MultiPolygon) geometry);
+    } else if (geometry instanceof GeometryCollection) {
+      buffer = serializeGeometryCollection((GeometryCollection) geometry);
+    } else {
+      throw new UnsupportedOperationException(
+          "Geometry type is not supported: " + geometry.getClass().getSimpleName());
+    }
+    return buffer.toByteArray();
   }
 
   public static byte[] serializeLegacy(Geometry geometry) {
@@ -87,14 +87,14 @@ public class GeometrySerializer {
   }
 
   public static Geometry deserialize(byte[] bytes) {
-    WKBReader reader = new WKBReader();
-    try {
-      return reader.read(bytes);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Failed to deserialize geometry from bytes", e);
-    }
-//    GeometryBuffer buffer = GeometryBufferFactory.wrap(bytes);
-//    return deserialize(buffer);
+//    WKBReader reader = new WKBReader();
+//    try {
+//      return reader.read(bytes);
+//    } catch (Exception e) {
+//      throw new IllegalArgumentException("Failed to deserialize geometry from bytes", e);
+//    }
+    GeometryBuffer buffer = GeometryBufferFactory.wrap(bytes);
+    return deserialize(buffer);
   }
 
   public static Geometry deserializeLegacy(byte[] bytes) {
@@ -170,7 +170,7 @@ public class GeometrySerializer {
       buffer.mark(8);
     } else {
       int bufferSize = 8 + coordType.bytes;
-      checkBufferSize(buffer, bufferSize);
+//      checkBufferSize(buffer, bufferSize);
       CoordinateSequence coordinates = buffer.getCoordinate(8);
       point = factory.createPoint(coordinates);
       buffer.mark(bufferSize);
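
The restored buffer-based codec above can be sanity-checked with a WKT
round trip. A minimal sketch (illustrative, not part of the commit; it
assumes only the serialize/deserialize signatures shown in the diff):

    import org.apache.sedona.common.geometrySerde.GeometrySerializer
    import org.locationtech.jts.io.WKTReader

    object RoundTripCheck extends App {
      // Parse a WKT point, serialize it with the buffer-based codec,
      // then deserialize and compare against the original geometry.
      val geom = new WKTReader().read("POINT (30.0123 10.2131)")
      val bytes = GeometrySerializer.serialize(geom)
      val back = GeometrySerializer.deserialize(bytes)
      assert(back.equalsExact(geom), s"round trip mismatch: $back")
    }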
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala b/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala
index 246286c608..02f7f3157d 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala
@@ -36,8 +36,6 @@ object GeometrySerializer {
    */
   def serialize(geometry: Geometry): Array[Byte] = {
     val serialized = geometrySerde.GeometrySerializer.serialize(geometry)
-
-    println(serialized.map(el => if (el < 0) el + 256 else el).mkString(","))
     serialized
   }
 
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala
index 6656d85f5c..bdc989d82b 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala
@@ -135,7 +135,6 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
       partitionIndex: Int,
       context: TaskContext): Iterator[OUT] = {
     val startTime = System.currentTimeMillis
-    val sedonaEnv = SedonaSparkEnv.get
     val env = SparkEnv.get
 
     // Get the executor cores and pyspark memory, they are passed via the local properties when
@@ -310,7 +309,7 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
             val requestMethod = input.readInt()
             // The BarrierTaskContext function may wait infinitely, socket shall not timeout
             // before the function finishes.
-            sock.setSoTimeout(0)
+            sock.setSoTimeout(10000)
             requestMethod match {
               case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
                 barrierAndServe(requestMethod, sock)
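
For context on the setSoTimeout change above: an SO_TIMEOUT of 0 lets a
read on the barrier socket block indefinitely, while a positive value
makes a blocked read fail with java.net.SocketTimeoutException after that
many milliseconds. A standalone sketch of the standard java.net behaviour
(illustrative only, not Sedona code):

    import java.net.{ServerSocket, Socket, SocketTimeoutException}

    object TimeoutDemo extends App {
      // A read on a socket with a 10 s SO_TIMEOUT throws
      // SocketTimeoutException instead of hanging forever.
      val server = new ServerSocket(0)
      val client = new Socket("localhost", server.getLocalPort)
      client.setSoTimeout(10000) // same value as the new setting above
      try client.getInputStream.read()
      catch { case _: SocketTimeoutException => println("timed out after 10 s") }
      finally { client.close(); server.close() }
    }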
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 421890c700..5198008392 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -18,19 +18,157 @@
  */
 package org.apache.sedona.sql
 
+import org.apache.sedona.sql.utils.GeometrySerializer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.locationtech.jts.geom.Geometry
+import org.locationtech.jts.io.WKTReader
 import org.scalatest.BeforeAndAfterAll
 
 import java.util.Collections
 import scala.collection.JavaConverters._
 
+case class GeoDataHex(id: Int, geometry_hex: String)
+case class GeoData(id: Int, geometry: Geometry)
+
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
   val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
+  import sparkSession.implicits._
+
   describe("GeoParquet Metadata tests") {
+    it("reading and writing GeoParquet files") {
+//      'POINT(30.0123 10.2131)', \
+//      'POINT(-20 20)', \
+//      'POINT(10 30)', \
+//      'POINT(40 -40)' \
+
+//      [0] = {u8} 18
+//      [1] = {u8} 0
+//      [2] = {u8} 0
+//      [3] = {u8} 0
+//      [4] = {u8} 1
+//      [5] = {u8} 165
+//      [6] = {u8} 189
+//      [7] = {u8} 193
+//      [8] = {u8} 23
+//      [9] = {u8} 38
+//      [10] = {u8} 3
+//      [11] = {u8} 62
+//      [12] = {u8} 64
+//      [13] = {u8} 34
+//      [14] = {u8} 142
+//      [15] = {u8} 117
+//      [16] = {u8} 113
+//      [17] = {u8} 27
+//      [18] = {u8} 109
+//      [19] = {u8} 36
+//      [20] = {u8} 64
+      val byteArray = Array[Int](
+        18, 0, 0, 0, 1,
+        165, 189, 193, 23, 38, 3, 62, 64, 34, 142, 117, 113, 27, 109, 36, 64
+      )
+        .map(_.toByte)
+
+//      [ 18, 0, 0, 0, 1, -91, -67, -63, 23, 38, 3, 62, 64, 34, -114, 117, 113, 27, 109, 36, 64 ]
+
+//      GeometrySerializer.deserialize(byteArray)
+//      [18, 0, 0, 0, 1, -91, -67, -63, 23, 38, 3, 62, 64, 34, -114, 117, 113, 27, 109, 36, 64]
+//      18      18
+//      0       0
+//      0       0
+//      0       0
+//      1       1
+//      0
+//      0
+//      0
+//      -91     -91
+//      -67     -67
+//      -63     -63
+//      23      23
+//      38      38
+//      3       3
+//      62      62
+//      64      64
+//      34      34
+//      -114    -114
+//      117     117
+//      113     113
+//      27      27
+//      109     109
+//      36      36
+//      64      64
+
+//      val wktReader = new WKTReader()
+//      val pointWKT = "POINT(30.0123 10.2131)"
+//      val point = wktReader.read(pointWKT)
+//      val serializedBytes = GeometrySerializer.serialize(point)
+//      serializedBytes.foreach(
+//        byte => println(byte)
+//      )
+//
+      def hexToBytes(hex: String): Array[Byte] =
+        hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
+//
+      def bytesToHex(bytes: Array[Byte]): String =
+        bytes.map("%02x".format(_)).mkString
+
+//      Seq(
+//        (1, "POINT(30.0123 10.2131)"),
+////        (2, "POINT(-20 20)"),
+////        (3, "POINT(10 30)"),
+////        (4, "POINT(40 -40)")
+//      ).toDF("id", "wkt")
+//        .selectExpr("id", "ST_GeomFromWKT(wkt) AS geometry")
+//        .as[GeoData]
+//        .map(
+//          row => (row.id, bytesToHex(GeometrySerializer.serialize(row.geometry)))
+//        ).show(4, false)
+
+//
+//      val data = Seq(
+//        (1, "1200000001000000a5bdc11726033e40228e75711b6d2440"),
+//      )
+//        .toDF("id", "geometry_hex")
+//        .as[GeoDataHex]
+//
+//      data.map(
+//        row => GeoData(row.id, GeometrySerializer.deserialize(hexToBytes(row.geometry_hex)))
+//      ).show
+
+      val wkt = "LINESTRING ( 20.9972017 52.1696936, 20.9971687 52.1696659, 20.997156 52.169644, 20.9971487 52.1696213 ) "
+      val reader = new WKTReader()
+      val geometry = reader.read(wkt)
+      val serialized = GeometrySerializer.serialize(geometry)
+
+      Seq(
+        (1, serialized)
+      ).toDF("id", "geometry_bytes")
+        .show(1, false)
+
+//      println(bytesToHex(serialized))
+
+//
+//      val binaryData = "1200000001000000bb9d61f7b6c92c40f1ba168a85"
+//      val binaryData2 = "120000000100000046b6f3fdd4083e404e62105839342440"
+//      val value = new GeometryUDT().deserialize(hexToBytes(binaryData))
+//      val value3 = new GeometryUDT().deserialize(hexToBytes(binaryData2))
+//      println(value)
+//      println(value3)
+//
+//      val reader = new WKTReader()
+//      val geometryPoint = "POINT (30.0345 10.1020)"
+//      val point = reader.read(geometryPoint)
+//      val result = new GeometryUDT().serialize(point)
+//
+//      val value2 = new GeometryUDT().deserialize(result)
+//      println(bytesToHex(result))
+//      println(value2)
+//      println("ssss")
+    }
     it("Reading GeoParquet Metadata") {
       val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation)
       val metadataArray = df.collect()
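
The hexToBytes/bytesToHex helpers introduced in the test above are exact
inverses, which is what makes comparisons against the captured hex dumps
in the comments reliable. A standalone sketch reusing the same
definitions, with the point hex string from the commented-out test data:

    object HexRoundTrip extends App {
      // Same helper definitions as in the test.
      def hexToBytes(hex: String): Array[Byte] =
        hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
      def bytesToHex(bytes: Array[Byte]): String =
        bytes.map("%02x".format(_)).mkString

      val hex = "1200000001000000a5bdc11726033e40228e75711b6d2440"
      // Decoding and re-encoding preserves the string exactly.
      assert(bytesToHex(hexToBytes(hex)) == hex)
    }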
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 3396defd01..391587e586 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -47,14 +47,16 @@ class StrategySuite extends AnyFunSuite with Matchers {
 //    spark.sql("select 1").show()
     val df = spark.read.format("geoparquet")
       .load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
-      .limit(10)
+      .selectExpr("ST_Centroid(geometry) AS geometry")
+
+    df
       .select(
 //        geometryToNonGeometryFunction(col("geometry")),
         geometryToGeometryFunction(col("geometry")),
 //        nonGeometryToGeometryFunction(expr("ST_AsText(geometry)")),
-      )
+      ).show(10, false)
 
-    df.show()
+//    df.show()
 
     1 shouldBe 1
 
 //    val df = Seq(
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 83f87f401e..bf8134e423 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -105,12 +105,24 @@ object ScalarUDF {
       pythonExec,
       "-c",
       f"""
+        |import pyarrow as pa
+        |import shapely
+        |import geoarrow.pyarrow as ga
+        |from sedonadb import udf
        |from sedona.sql.types import GeometryType
        |from pyspark.serializers import CloudPickleSerializer
+        |
+        |@udf.arrow_udf(ga.wkb(), [udf.GEOMETRY, udf.NUMERIC])
+        |def shapely_udf(geom, distance):
+        |    geom_wkb = pa.array(geom.storage.to_array())
+        |    distance = pa.array(distance.to_array())
+        |    geom = shapely.from_wkb(geom_wkb)
+        |    result_shapely = shapely.buffer(geom, distance)
+        |
+        |    return pa.array(shapely.to_wkb(result_shapely))
+        |
        |f = open('$path', 'wb');
-        |def apply_geopandas(x):
-        |    return x.buffer(1)
-        |f.write(CloudPickleSerializer().dumps((apply_geopandas, GeometryType())))
+        |f.write(CloudPickleSerializer().dumps((shapely_udf, GeometryType())))
        |""".stripMargin),
     None,
     "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
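
For comparison with the new shapely_udf above: the per-row buffer it
computes in Python has a direct JTS analogue on the JVM side
(shapely.buffer(geom, d) corresponds to JTS geom.buffer(d)). A minimal
sketch with hypothetical sample rows (illustrative, not part of the
commit):

    import org.locationtech.jts.io.WKTReader

    object BufferAnalogue extends App {
      // Buffer each geometry by a per-row distance, mirroring what the
      // Arrow UDF does with shapely on the Python side.
      val reader = new WKTReader()
      val rows = Seq(("POINT (0 0)", 1.0), ("POINT (10 10)", 2.5))
      rows.foreach { case (wkt, distance) =>
        println(reader.read(wkt).buffer(distance).toText)
      }
    }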
