This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 558ab9abae [GH-2376] Preserve CRS SRID from GeoParquet metadata when 
reading geometries (#2661)
558ab9abae is described below

commit 558ab9abae56afe1da1d7ef2e3a4c827004d780c
Author: Jia Yu <[email protected]>
AuthorDate: Thu Feb 19 00:19:53 2026 -0700

    [GH-2376] Preserve CRS SRID from GeoParquet metadata when reading 
geometries (#2661)
---
 pom.xml                                            |   2 +-
 .../geoparquet/GeoParquetMetaData.scala            |  54 +++++
 .../geoparquet/GeoParquetRowConverter.scala        |   4 +
 .../geoparquet/GeoParquetSchemaConverter.scala     |  23 ++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 241 +++++++++++++++++++++
 5 files changed, 323 insertions(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index b8025399e6..4e8ec472e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
         
<scala-collection-compat.version>2.5.0</scala-collection-compat.version>
         <geoglib.version>1.52</geoglib.version>
         <caffeine.version>2.9.2</caffeine.version>
-        <proj4sedona.version>0.0.4</proj4sedona.version>
+        <proj4sedona.version>0.0.5</proj4sedona.version>
 
         <geotools.scope>provided</geotools.scope>
         <!-- Because it's not in Maven central, make it provided by default -->
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
index 982c0523d4..1f2ad3ace7 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
@@ -18,7 +18,10 @@
  */
 package org.apache.spark.sql.execution.datasources.geoparquet
 
+import scala.util.control.NonFatal
+
 import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
+import org.datasyslab.proj4sedona.core.Proj
 import org.json4s.jackson.JsonMethods.parse
 import org.json4s.jackson.compactJson
 import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull, 
JObject, JValue}
@@ -137,6 +140,57 @@ object GeoParquetMetaData {
     compactJson(serializedGeoObject)
   }
 
+  /**
+   * Default SRID for GeoParquet files where the CRS field is omitted. Per the 
GeoParquet spec,
+   * omitting the CRS implies OGC:CRS84, which is equivalent to EPSG:4326.
+   */
+  val DEFAULT_SRID: Int = 4326
+
+  /**
+   * Extract SRID from a GeoParquet CRS metadata value.
+   *
+   * Per the GeoParquet specification:
+   *   - If the CRS field is absent (None), the CRS is OGC:CRS84 (EPSG:4326).
+   *   - If the CRS field is explicitly null, the CRS is unknown (SRID 0).
+   *   - If the CRS field is a PROJJSON object with an "id" containing 
"authority" and "code", the
+   *     EPSG code is used as the SRID.
+   *
+   * @param crs
+   *   The CRS field from GeoParquet column metadata.
+   * @return
+   *   The SRID corresponding to the CRS. Returns 4326 for omitted CRS, 0 for 
null or unrecognized
+   *   CRS, and the EPSG code for PROJJSON with an EPSG identifier.
+   */
+  def extractSridFromCrs(crs: Option[JValue]): Int = {
+    crs match {
+      case None =>
+        // CRS omitted: default to OGC:CRS84 (EPSG:4326) per GeoParquet spec
+        DEFAULT_SRID
+      case Some(JNull) =>
+        // CRS explicitly null: unknown CRS
+        0
+      case Some(projjson) =>
+        // Use proj4sedona to extract authority and code from PROJJSON
+        try {
+          val jsonStr = compactJson(projjson)
+          val result = new Proj(jsonStr).toAuthority()
+          if (result != null && result.length == 2) {
+            result(0) match {
+              case "EPSG" =>
+                try { result(1).toInt }
+                catch { case _: NumberFormatException => 0 }
+              case "OGC" if result(1) == "CRS84" => DEFAULT_SRID
+              case _ => 0
+            }
+          } else {
+            0
+          }
+        } catch {
+          case NonFatal(_) => 0
+        }
+    }
+  }
+
   def createCoveringColumnMetadata(coveringColumnName: String, schema: 
StructType): Covering = {
     val coveringColumnIndex = schema.fieldIndex(coveringColumnName)
     schema(coveringColumnIndex).dataType match {
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
index 489899f44a..67e9669dd4 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
@@ -215,11 +215,14 @@ private[geoparquet] class GeoParquetRowConverter(
         new ParquetPrimitiveConverter(updater)
 
       case GeometryUDT =>
+        // Compute SRID once per column converter, not per row
+        val srid = schemaConverter.getSrid(parquetType.getName)
         if (parquetType.isPrimitive) {
           new ParquetPrimitiveConverter(updater) {
             override def addBinary(value: Binary): Unit = {
               val wkbReader = new WKBReader()
               val geom = wkbReader.read(value.getBytes)
+              geom.setSRID(srid)
               this.updater.set(GeometryUDT.serialize(geom))
             }
           }
@@ -233,6 +236,7 @@ private[geoparquet] class GeoParquetRowConverter(
                 val wkbReader = new WKBReader()
                 val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray
                 val geom = wkbReader.read(byteArray)
+                geom.setSRID(srid)
                 this.updater.set(GeometryUDT.serialize(geom))
               }
             }
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
index 34b9f44ec5..f61d2b2cc4 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
@@ -57,6 +57,14 @@ class GeoParquetToSparkSchemaConverter(
   private val geoParquetMetaData: GeoParquetMetaData =
     GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters)
 
+  /**
+   * Cached SRID values per geometry column, computed once from CRS metadata 
to avoid repeated
+   * PROJJSON parsing during row conversion.
+   */
+  private val sridCache: Map[String, Int] = geoParquetMetaData.columns.map {
+    case (name, fieldMetadata) => name -> 
GeoParquetMetaData.extractSridFromCrs(fieldMetadata.crs)
+  }
+
   def this(
       keyValueMetaData: java.util.Map[String, String],
       conf: PortableSQLConf,
@@ -78,6 +86,21 @@ class GeoParquetToSparkSchemaConverter(
       
conf.get(PortableSQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
     parameters = parameters)
 
+  /**
+   * Returns the SRID for a geometry column based on the CRS metadata in the 
GeoParquet file. The
+   * result is served from a pre-computed cache to avoid repeated PROJJSON 
parsing.
+   *
+   * @param columnName
+   *   name of the geometry column
+   * @return
+   *   the SRID extracted from the CRS metadata. Returns 4326 if the CRS is 
omitted, 0 if the CRS
+   *   is null or unrecognized, or the EPSG code if the PROJJSON contains an 
EPSG identifier. If
+   *   the column name is not found in the GeoParquet metadata, returns 0.
+   */
+  def getSrid(columnName: String): Int = {
+    sridCache.getOrElse(columnName, 0)
+  }
+
   /**
    * Returns true if TIMESTAMP_NTZ type is enabled in this 
ParquetToSparkSchemaConverter.
    */
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 7628c9a6da..63cd87971e 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -572,6 +572,194 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet read should set SRID from PROJJSON CRS with EPSG 
identifier") {
+      val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+      val projjson =
+        """
+          |{
+          |  "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json",
+          |  "type": "GeographicCRS",
+          |  "name": "NAD83(2011)",
+          |  "datum": {
+          |    "type": "GeodeticReferenceFrame",
+          |    "name": "NAD83 (National Spatial Reference System 2011)",
+          |    "ellipsoid": {
+          |      "name": "GRS 1980",
+          |      "semi_major_axis": 6378137,
+          |      "inverse_flattening": 298.257222101
+          |    }
+          |  },
+          |  "coordinate_system": {
+          |    "subtype": "ellipsoidal",
+          |    "axis": [
+          |      {
+          |        "name": "Geodetic latitude",
+          |        "abbreviation": "Lat",
+          |        "direction": "north",
+          |        "unit": "degree"
+          |      },
+          |      {
+          |        "name": "Geodetic longitude",
+          |        "abbreviation": "Lon",
+          |        "direction": "east",
+          |        "unit": "degree"
+          |      }
+          |    ]
+          |  },
+          |  "id": {
+          |    "authority": "EPSG",
+          |    "code": 6318
+          |  }
+          |}
+          |""".stripMargin
+      val geoParquetSavePath = geoparquetoutputlocation + 
"/gp_srid_epsg.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.crs", projjson)
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+      assert(geoms.nonEmpty)
+      geoms.foreach { geom =>
+        assert(geom.getSRID == 6318, s"Expected SRID 6318, got 
${geom.getSRID}")
+      }
+    }
+
+    it("GeoParquet read should set SRID 4326 when CRS is omitted") {
+      val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+      val geoParquetSavePath = geoparquetoutputlocation + 
"/gp_srid_omit.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.crs", "")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+      assert(geoms.nonEmpty)
+      geoms.foreach { geom =>
+        assert(geom.getSRID == 4326, s"Expected SRID 4326 for omitted CRS, got 
${geom.getSRID}")
+      }
+    }
+
+    it("GeoParquet read should set SRID 0 when CRS is null") {
+      val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+      val geoParquetSavePath = geoparquetoutputlocation + 
"/gp_srid_null.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.crs", "null")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+      assert(geoms.nonEmpty)
+      geoms.foreach { geom =>
+        assert(geom.getSRID == 0, s"Expected SRID 0 for null CRS, got 
${geom.getSRID}")
+      }
+    }
+
+    it("GeoParquet read should set SRID 0 for CRS without EPSG identifier") {
+      val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+      // A PROJJSON without the "id" field
+      val projjsonNoId =
+        """
+          |{
+          |  "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json",
+          |  "type": "GeographicCRS",
+          |  "name": "Unknown CRS",
+          |  "datum": {
+          |    "type": "GeodeticReferenceFrame",
+          |    "name": "Unknown datum",
+          |    "ellipsoid": {
+          |      "name": "GRS 1980",
+          |      "semi_major_axis": 6378137,
+          |      "inverse_flattening": 298.257222101
+          |    }
+          |  },
+          |  "coordinate_system": {
+          |    "subtype": "ellipsoidal",
+          |    "axis": [
+          |      {
+          |        "name": "Geodetic latitude",
+          |        "abbreviation": "Lat",
+          |        "direction": "north",
+          |        "unit": "degree"
+          |      },
+          |      {
+          |        "name": "Geodetic longitude",
+          |        "abbreviation": "Lon",
+          |        "direction": "east",
+          |        "unit": "degree"
+          |      }
+          |    ]
+          |  }
+          |}
+          |""".stripMargin
+      val geoParquetSavePath = geoparquetoutputlocation + 
"/gp_srid_no_id.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.crs", projjsonNoId)
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+      assert(geoms.nonEmpty)
+      geoms.foreach { geom =>
+        assert(geom.getSRID == 0, s"Expected SRID 0 for CRS without id, got 
${geom.getSRID}")
+      }
+    }
+
+    it("GeoParquet read should set per-column SRID from different CRSes") {
+      val wktReader = new WKTReader()
+      val testData = Seq(
+        Row(
+          1,
+          wktReader.read("POINT (1 2)"),
+          wktReader.read("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))")))
+      val schema = StructType(
+        Seq(
+          StructField("id", IntegerType, nullable = false),
+          StructField("g0", GeometryUDT(), nullable = false),
+          StructField("g1", GeometryUDT(), nullable = false)))
+      val df = sparkSession.createDataFrame(testData.asJava, 
schema).repartition(1)
+
+      // g0: EPSG:4326, g1: EPSG:32632
+      val projjson4326 =
+        """{"type":"GeographicCRS","name":"WGS 
84","id":{"authority":"EPSG","code":4326}}"""
+      val projjson32632 =
+        """{"type":"ProjectedCRS","name":"WGS 84 / UTM zone 32N",""" +
+          """"base_crs":{"name":"WGS 
84","datum":{"type":"GeodeticReferenceFrame",""" +
+          """"name":"World Geodetic System 1984",""" +
+          """"ellipsoid":{"name":"WGS 
84","semi_major_axis":6378137,"inverse_flattening":298.257223563}}},""" +
+          """"conversion":{"name":"UTM zone 32N","method":{"name":"Transverse 
Mercator"},""" +
+          """"parameters":[{"name":"Latitude of natural origin","value":0},""" 
+
+          """{"name":"Longitude of natural origin","value":9},""" +
+          """{"name":"Scale factor at natural origin","value":0.9996},""" +
+          """{"name":"False easting","value":500000},""" +
+          """{"name":"False northing","value":0}]},""" +
+          """"coordinate_system":{"subtype":"Cartesian","axis":[""" +
+          """{"name":"Easting","direction":"east","unit":"metre"},""" +
+          """{"name":"Northing","direction":"north","unit":"metre"}]},""" +
+          """"id":{"authority":"EPSG","code":32632}}"""
+
+      val geoParquetSavePath =
+        geoparquetoutputlocation + "/gp_srid_multi_column.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.crs", projjson4326)
+        .option("geoparquet.crs.g1", projjson32632)
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val rows = df2.collect()
+      assert(rows.length == 1)
+      val g0 = rows(0).getAs[Geometry](rows(0).fieldIndex("g0"))
+      val g1 = rows(0).getAs[Geometry](rows(0).fieldIndex("g1"))
+      assert(g0.getSRID == 4326, s"Expected g0 SRID 4326, got ${g0.getSRID}")
+      assert(g1.getSRID == 32632, s"Expected g1 SRID 32632, got ${g1.getSRID}")
+    }
+
     it("GeoParquet load should raise exception when loading plain parquet 
files") {
       val e = intercept[SparkException] {
         sparkSession.read.format("geoparquet").load(resourceFolder + 
"geoparquet/plain.parquet")
@@ -885,6 +1073,59 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
     }
   }
 
+  describe("GeoParquetMetaData.extractSridFromCrs") {
+    it("should return 4326 when CRS is omitted (None)") {
+      assert(GeoParquetMetaData.extractSridFromCrs(None) == 4326)
+    }
+
+    it("should return 0 when CRS is null") {
+      assert(GeoParquetMetaData.extractSridFromCrs(Some(org.json4s.JNull)) == 
0)
+    }
+
+    it("should extract EPSG code from PROJJSON") {
+      val projjson =
+        
parseJson("""{"type":"GeographicCRS","id":{"authority":"EPSG","code":4326}}""")
+      assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 4326)
+    }
+
+    it("should extract non-4326 EPSG code") {
+      // Use a complete ProjectedCRS PROJJSON so proj4sedona can parse it
+      val projjson = parseJson(
+        """{"type":"ProjectedCRS","name":"WGS 84 / UTM zone 32N",""" +
+          """"base_crs":{"name":"WGS 
84","datum":{"type":"GeodeticReferenceFrame",""" +
+          """"name":"World Geodetic System 1984",""" +
+          """"ellipsoid":{"name":"WGS 
84","semi_major_axis":6378137,"inverse_flattening":298.257223563}}},""" +
+          """"conversion":{"name":"UTM zone 32N","method":{"name":"Transverse 
Mercator"},""" +
+          """"parameters":[{"name":"Latitude of natural origin","value":0},""" 
+
+          """{"name":"Longitude of natural origin","value":9},""" +
+          """{"name":"Scale factor at natural origin","value":0.9996},""" +
+          """{"name":"False easting","value":500000},""" +
+          """{"name":"False northing","value":0}]},""" +
+          """"coordinate_system":{"subtype":"Cartesian","axis":[""" +
+          """{"name":"Easting","direction":"east","unit":"metre"},""" +
+          """{"name":"Northing","direction":"north","unit":"metre"}]},""" +
+          """"id":{"authority":"EPSG","code":32632}}""")
+      assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 32632)
+    }
+
+    it("should return 4326 for OGC:CRS84") {
+      val projjson =
+        
parseJson("""{"type":"GeographicCRS","id":{"authority":"OGC","code":"CRS84"}}""")
+      assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 4326)
+    }
+
+    it("should return 0 for CRS without id field") {
+      val projjson = parseJson("""{"type":"GeographicCRS","name":"Unknown"}""")
+      assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 0)
+    }
+
+    it("should return 0 for CRS with non-EPSG authority") {
+      val projjson =
+        
parseJson("""{"type":"GeographicCRS","id":{"authority":"IAU","code":49900}}""")
+      assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 0)
+    }
+  }
+
   def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => 
Unit): Unit = {
     val parquetFiles = new 
File(path).listFiles().filter(_.getName.endsWith(".parquet"))
     parquetFiles.foreach { filePath =>

Reply via email to