This is an automated email from the ASF dual-hosted git repository.

imbruced pushed a commit to branch arrow-worker
in repository https://gitbox.apache.org/repos/asf/sedona.git

commit 4156ea468492d693b82bf01141fbd5a110392572
Author: pawelkocinski <[email protected]>
AuthorDate: Sat Nov 22 19:25:04 2025 +0100

    SEDONA-748 add working example
---
 .../common/geometrySerde/GeometrySerializer.java   |  60 ++++-----
 .../sedona/sql/utils/GeometrySerializer.scala      |   2 -
 .../spark/api/python/SedonaPythonRunner.scala      |   3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       | 138 +++++++++++++++++++++
 .../org/apache/spark/sql/udf/StrategySuite.scala   |   8 +-
 .../apache/spark/sql/udf/TestScalarPandasUDF.scala |  18 ++-
 6 files changed, 189 insertions(+), 40 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java b/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java
index ba135aa6a1..d0f2f39d46 100644
--- a/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java
+++ b/common/src/main/java/org/apache/sedona/common/geometrySerde/GeometrySerializer.java
@@ -40,27 +40,27 @@ public class GeometrySerializer {
   private static final PrecisionModel PRECISION_MODEL = new PrecisionModel();
 
   public static byte[] serialize(Geometry geometry) {
-    return new WKBWriter().write(geometry);
-//    GeometryBuffer buffer;
-//    if (geometry instanceof Point) {
-//      buffer = serializePoint((Point) geometry);
-//    } else if (geometry instanceof MultiPoint) {
-//      buffer = serializeMultiPoint((MultiPoint) geometry);
-//    } else if (geometry instanceof LineString) {
-//      buffer = serializeLineString((LineString) geometry);
-//    } else if (geometry instanceof MultiLineString) {
-//      buffer = serializeMultiLineString((MultiLineString) geometry);
-//    } else if (geometry instanceof Polygon) {
-//      buffer = serializePolygon((Polygon) geometry);
-//    } else if (geometry instanceof MultiPolygon) {
-//      buffer = serializeMultiPolygon((MultiPolygon) geometry);
-//    } else if (geometry instanceof GeometryCollection) {
-//      buffer = serializeGeometryCollection((GeometryCollection) geometry);
-//    } else {
-//      throw new UnsupportedOperationException(
-//          "Geometry type is not supported: " + geometry.getClass().getSimpleName());
-//    }
-//    return buffer.toByteArray();
+//    return new WKBWriter().write(geometry);
+    GeometryBuffer buffer;
+    if (geometry instanceof Point) {
+      buffer = serializePoint((Point) geometry);
+    } else if (geometry instanceof MultiPoint) {
+      buffer = serializeMultiPoint((MultiPoint) geometry);
+    } else if (geometry instanceof LineString) {
+      buffer = serializeLineString((LineString) geometry);
+    } else if (geometry instanceof MultiLineString) {
+      buffer = serializeMultiLineString((MultiLineString) geometry);
+    } else if (geometry instanceof Polygon) {
+      buffer = serializePolygon((Polygon) geometry);
+    } else if (geometry instanceof MultiPolygon) {
+      buffer = serializeMultiPolygon((MultiPolygon) geometry);
+    } else if (geometry instanceof GeometryCollection) {
+      buffer = serializeGeometryCollection((GeometryCollection) geometry);
+    } else {
+      throw new UnsupportedOperationException(
+          "Geometry type is not supported: " + geometry.getClass().getSimpleName());
+    }
+    return buffer.toByteArray();
   }
 
   public static byte[] serializeLegacy(Geometry geometry) {
@@ -87,14 +87,14 @@ public class GeometrySerializer {
   }
 
   public static Geometry deserialize(byte[] bytes) {
-    WKBReader reader = new WKBReader();
-    try {
-      return reader.read(bytes);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Failed to deserialize geometry from bytes", e);
-    }
-//    GeometryBuffer buffer = GeometryBufferFactory.wrap(bytes);
-//    return deserialize(buffer);
+//    WKBReader reader = new WKBReader();
+//    try {
+//      return reader.read(bytes);
+//    } catch (Exception e) {
+//      throw new IllegalArgumentException("Failed to deserialize geometry from bytes", e);
+//    }
+    GeometryBuffer buffer = GeometryBufferFactory.wrap(bytes);
+    return deserialize(buffer);
   }
 
   public static Geometry deserializeLegacy(byte[] bytes) {
@@ -170,7 +170,7 @@ public class GeometrySerializer {
       buffer.mark(8);
     } else {
       int bufferSize = 8 + coordType.bytes;
-      checkBufferSize(buffer, bufferSize);
+//      checkBufferSize(buffer, bufferSize);
       CoordinateSequence coordinates = buffer.getCoordinate(8);
       point = factory.createPoint(coordinates);
       buffer.mark(bufferSize);
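For context, these hunks switch serialize/deserialize back from the temporary WKB detour to Sedona's own GeometryBuffer codec. A minimal sketch of the round-trip the restored path is expected to preserve, using only the GeometrySerializer API visible above plus JTS's WKTReader (the point value is illustrative):

    // Round-trip through the GeometryBuffer path restored above (illustrative).
    import org.apache.sedona.common.geometrySerde.GeometrySerializer
    import org.locationtech.jts.io.WKTReader

    val geom  = new WKTReader().read("POINT (30.0123 10.2131)")
    val bytes = GeometrySerializer.serialize(geom)    // encodes via GeometryBuffer
    val back  = GeometrySerializer.deserialize(bytes) // wraps the bytes and decodes
    assert(back.equalsExact(geom))                    // round-trip should be lossless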
diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala b/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala
index 246286c608..02f7f3157d 100644
--- a/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/GeometrySerializer.scala
@@ -36,8 +36,6 @@ object GeometrySerializer {
    */
   def serialize(geometry: Geometry): Array[Byte] = {
     val serialized = geometrySerde.GeometrySerializer.serialize(geometry)
-
-    println(serialized.map(el => if (el < 0) el + 256 else el ).mkString(","))
     serialized
   }
 
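The println removed here dumped the serialized bytes as unsigned values (Scala's Byte is signed, so -91 was shifted to 165 before printing). Should that debugging view be needed again, a throwaway one-liner gives the same output; the sample bytes are illustrative:

    // Sample payload; any Array[Byte] works here.
    val serialized: Array[Byte] = Array(18, 0, 0, 0, 1).map(_.toByte)
    // b & 0xff maps signed byte -91 to unsigned 165; non-negative bytes pass through.
    println(serialized.map(b => b & 0xff).mkString(","))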
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala
index 6656d85f5c..bdc989d82b 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/api/python/SedonaPythonRunner.scala
@@ -135,7 +135,6 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
                partitionIndex: Int,
                context: TaskContext): Iterator[OUT] = {
     val startTime = System.currentTimeMillis
-    val sedonaEnv = SedonaSparkEnv.get
     val env = SparkEnv.get
 
    // Get the executor cores and pyspark memory, they are passed via the local properties when
@@ -310,7 +309,7 @@ private[spark] abstract class SedonaBasePythonRunner[IN, OUT](
                   val requestMethod = input.readInt()
                  // The BarrierTaskContext function may wait infinitely, socket shall not timeout
                   // before the function finishes.
-                  sock.setSoTimeout(0)
+                  sock.setSoTimeout(10000)
                   requestMethod match {
                     case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
                       barrierAndServe(requestMethod, sock)
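Note the setSoTimeout semantics this hunk changes: 0 disables the read timeout entirely (matching the comment about barrier functions that may wait indefinitely), while 10000 makes blocking reads fail with SocketTimeoutException after 10 seconds of inactivity. A hedged sketch of the new behavior using only the standard java.net API; the helper name is hypothetical:

    import java.io.DataInputStream
    import java.net.{Socket, SocketTimeoutException}

    // Illustrative: read one request method id under the new 10 s timeout.
    def readRequest(sock: Socket): Option[Int] = {
      sock.setSoTimeout(10000) // 0 would block forever; 10000 throws after 10 s idle
      try Some(new DataInputStream(sock.getInputStream).readInt())
      catch { case _: SocketTimeoutException => None } // barrier call outlived timeout
    }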
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 421890c700..5198008392 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -18,19 +18,157 @@
  */
 package org.apache.sedona.sql
 
+import org.apache.sedona.sql.utils.GeometrySerializer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.locationtech.jts.geom.Geometry
+import org.locationtech.jts.io.WKTReader
 import org.scalatest.BeforeAndAfterAll
 
 import java.util.Collections
 import scala.collection.JavaConverters._
 
+case class GeoDataHex(id: Int, geometry_hex: String)
+case class GeoData(id: Int, geometry: Geometry)
+
 class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation: String = resourceFolder + "geoparquet/"
  val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
 
+  import sparkSession.implicits._
+
   describe("GeoParquet Metadata tests") {
+    it("reading and writing GeoParquet files") {
+//           'POINT(30.0123 10.2131)', \
+      //                    'POINT(-20 20)', \
+      //                    'POINT(10 30)', \
+      //                    'POINT(40 -40)' \
+
+//      [0] = {u8} 18
+//[1] = {u8} 0
+//[2] = {u8} 0
+//[3] = {u8} 0
+//[4] = {u8} 1
+//[5] = {u8} 165
+//[6] = {u8} 189
+//[7] = {u8} 193
+//[8] = {u8} 23
+//[9] = {u8} 38
+//[10] = {u8} 3
+//[11] = {u8} 62
+//[12] = {u8} 64
+//[13] = {u8} 34
+//[14] = {u8} 142
+//[15] = {u8} 117
+//[16] = {u8} 113
+//[17] = {u8} 27
+//[18] = {u8} 109
+//[19] = {u8} 36
+//[20] = {u8} 64
+      val byteArray = Array[Int](
+        18, 0, 0, 0, 1,
+        165, 189, 193, 23, 38, 3, 62, 64, 34, 142, 117, 113, 27, 109, 36, 64
+      )
+        .map(_.toByte)
+
+//      [ 18, 0, 0, 0, 1, -91, -67, -63, 23, 38, 3, 62, 64, 34, -114, 117, 113, 27, 109, 36, 64 ]
+
+//      GeometrySerializer.deserialize(byteArray)
+//        [18, 0, 0, 0, 1, -91, -67, -63, 23, 38, 3, 62, 64, 34, -114, 117, 113, 27, 109, 36, 64]
+//    18 18
+//    0 0
+//    0 0
+//    0 0
+//    1 1
+//    0
+//    0
+//    0
+//    -91 -91
+//    -67 -67
+//    -63 -63
+//    23 23
+//    38 38
+//    3 3
+//    62 62
+//    64 64
+//    34 34
+//    -114 -114
+//    117 117
+//    113 113
+//    27 27
+//    109 109
+//    36 36
+//    64 64
+
+//      val wktReader = new WKTReader()
+//      val pointWKT = "POINT(30.0123 10.2131)"
+//      val point = wktReader.read(pointWKT)
+//      val serializedBytes = GeometrySerializer.serialize(point)
+//      serializedBytes.foreach(
+//        byte => println(byte)
+//      )
+//
+      def hexToBytes(hex: String): Array[Byte] =
+        hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray
+//
+      def bytesToHex(bytes: Array[Byte]): String =
+        bytes.map("%02x".format(_)).mkString
+
+//      Seq(
+//        (1, "POINT(30.0123 10.2131)"),
+////        (2, "POINT(-20 20)"),
+////        (3, "POINT(10 30)"),
+////        (4, "POINT(40 -40)")
+//      ).toDF("id", "wkt")
+//        .selectExpr("id", "ST_GeomFromWKT(wkt) AS geometry")
+//        .as[GeoData]
+//        .map(
+//          row => (row.id, bytesToHex(GeometrySerializer.serialize(row.geometry)))
+//        ).show(4, false)
+
+
+//
+//      val data = Seq(
+//        (1, "1200000001000000a5bdc11726033e40228e75711b6d2440"),
+//      )
+//        .toDF("id", "geometry_hex")
+//        .as[GeoDataHex]
+//
+//      data.map(
+//        row => GeoData(row.id, GeometrySerializer.deserialize(hexToBytes(row.geometry_hex)))
+//      ).show
+
+      val wkt = "LINESTRING (  20.9972017 52.1696936,   20.9971687 52.1696659,   20.997156 52.169644,   20.9971487 52.1696213 ) "
+      val reader = new WKTReader()
+      val geometry = reader.read(wkt)
+      val serialized = GeometrySerializer.serialize(geometry)
+
+      Seq(
+        (1, serialized)
+      ).toDF("id", "geometry_bytes")
+        .show(1, false)
+
+//      println(bytesToHex(serialized))
+
+//
+//      val binaryData = "1200000001000000bb9d61f7b6c92c40f1ba168a85"
+//      val binaryData2 = "120000000100000046b6f3fdd4083e404e62105839342440"
+//      val value = new GeometryUDT().deserialize(hexToBytes(binaryData))
+//      val value3 = new GeometryUDT().deserialize(hexToBytes(binaryData2))
+//      println(value)
+//      println(value3)
+//
+//      val reader = new WKTReader()
+//      val geometryPoint = "POINT (30.0345 10.1020)"
+//      val point = reader.read(geometryPoint)
+//      val result = new GeometryUDT().serialize(point)
+//
+//      val value2 = new GeometryUDT().deserialize(result)
+//      println(bytesToHex(result))
+//      println(value2)
+//      println("ssss")
+    }
     it("Reading GeoParquet Metadata") {
      val df = sparkSession.read.format("geoparquet.metadata").load(geoparquetdatalocation)
       val metadataArray = df.collect()
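The hexToBytes/bytesToHex helpers added in this test are exact inverses, which is what lets the hex literals in the commented-out experiments stand in for serialized geometries. A standalone sanity check, reusing a hex literal that appears in the test body above:

    def hexToBytes(hex: String): Array[Byte] =
      hex.grouped(2).map(Integer.parseInt(_, 16).toByte).toArray

    def bytesToHex(bytes: Array[Byte]): String =
      bytes.map("%02x".format(_)).mkString

    // Serialized form of POINT(30.0123 10.2131) from the commented-out code above.
    val hex = "1200000001000000a5bdc11726033e40228e75711b6d2440"
    assert(bytesToHex(hexToBytes(hex)) == hex) // the two helpers invert each other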
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
index 3396defd01..391587e586 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/StrategySuite.scala
@@ -47,14 +47,16 @@ class StrategySuite extends AnyFunSuite with Matchers {
 //    spark.sql("select 1").show()
     val df = spark.read.format("geoparquet")
       
.load("/Users/pawelkocinski/Desktop/projects/sedona-production/apache-sedona-book/data/warehouse/buildings")
-      .limit(10)
+      .selectExpr("ST_Centroid(geometry) AS geometry")
+
+      df
       .select(
 //        geometryToNonGeometryFunction(col("geometry")),
         geometryToGeometryFunction(col("geometry")),
 //        nonGeometryToGeometryFunction(expr("ST_AsText(geometry)")),
-      )
+      ).show(10, false)
 
-    df.show()
+//    df.show()
     1 shouldBe 1
 
 //    val df = Seq(
diff --git a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
index 83f87f401e..bf8134e423 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/spark/sql/udf/TestScalarPandasUDF.scala
@@ -105,12 +105,24 @@ object ScalarUDF {
           pythonExec,
           "-c",
           f"""
+             |import pyarrow as pa
+             |import shapely
+             |import geoarrow.pyarrow as ga
+             |from sedonadb import udf
              |from sedona.sql.types import GeometryType
              |from pyspark.serializers import CloudPickleSerializer
+             |
+             |@udf.arrow_udf(ga.wkb(), [udf.GEOMETRY, udf.NUMERIC])
+             |def shapely_udf(geom, distance):
+             |    geom_wkb = pa.array(geom.storage.to_array())
+             |    distance = pa.array(distance.to_array())
+             |    geom = shapely.from_wkb(geom_wkb)
+             |    result_shapely = shapely.buffer(geom, distance)
+             |
+             |    return pa.array(shapely.to_wkb(result_shapely))
+             |
              |f = open('$path', 'wb');
-             |def apply_geopandas(x):
-             |    return x.buffer(1)
-             |f.write(CloudPickleSerializer().dumps((apply_geopandas, GeometryType())))
+             |f.write(CloudPickleSerializer().dumps((shapely_udf, GeometryType())))
              |""".stripMargin),
         None,
         "PYTHONPATH" -> s"$pysparkPythonPath:$pythonPath").!!
