This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 558ab9abae [GH-2376] Preserve CRS SRID from GeoParquet metadata when
reading geometries (#2661)
558ab9abae is described below
commit 558ab9abae56afe1da1d7ef2e3a4c827004d780c
Author: Jia Yu <[email protected]>
AuthorDate: Thu Feb 19 00:19:53 2026 -0700
[GH-2376] Preserve CRS SRID from GeoParquet metadata when reading
geometries (#2661)
---
pom.xml | 2 +-
.../geoparquet/GeoParquetMetaData.scala | 54 +++++
.../geoparquet/GeoParquetRowConverter.scala | 4 +
.../geoparquet/GeoParquetSchemaConverter.scala | 23 ++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 241 +++++++++++++++++++++
5 files changed, 323 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index b8025399e6..4e8ec472e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,7 +96,7 @@
<scala-collection-compat.version>2.5.0</scala-collection-compat.version>
<geoglib.version>1.52</geoglib.version>
<caffeine.version>2.9.2</caffeine.version>
- <proj4sedona.version>0.0.4</proj4sedona.version>
+ <proj4sedona.version>0.0.5</proj4sedona.version>
<geotools.scope>provided</geotools.scope>
<!-- Because it's not in Maven central, make it provided by default -->
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
index 982c0523d4..1f2ad3ace7 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
@@ -18,7 +18,10 @@
*/
package org.apache.spark.sql.execution.datasources.geoparquet
+import scala.util.control.NonFatal
+
import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
+import org.datasyslab.proj4sedona.core.Proj
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.compactJson
import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull,
JObject, JValue}
@@ -137,6 +140,57 @@ object GeoParquetMetaData {
compactJson(serializedGeoObject)
}
+ /**
+ * Default SRID for GeoParquet files where the CRS field is omitted. Per the
GeoParquet spec,
+ * omitting the CRS implies OGC:CRS84, which this reader treats as EPSG:4326
(the axis-order difference between CRS84 and EPSG:4326 is ignored here).
+ */
+ val DEFAULT_SRID: Int = 4326
+
+ /**
+ * Extract SRID from a GeoParquet CRS metadata value.
+ *
+ * Per the GeoParquet specification:
+ * - If the CRS field is absent (None), the CRS is OGC:CRS84 (EPSG:4326).
+ * - If the CRS field is explicitly null, the CRS is unknown (SRID 0).
+ * - If the CRS field is a PROJJSON object with an "id" containing
"authority" and "code", the
+ * EPSG code is used as the SRID; an OGC:CRS84 identifier is mapped to
EPSG:4326, and any other authority yields SRID 0.
+ *
+ * @param crs
+ * The CRS field from GeoParquet column metadata.
+ * @return
+ * The SRID corresponding to the CRS. Returns 4326 for omitted CRS or
OGC:CRS84, 0 for null or unrecognized
+ * CRS, and the EPSG code for PROJJSON with an EPSG identifier.
+ */
+ def extractSridFromCrs(crs: Option[JValue]): Int = {
+ crs match {
+ case None =>
+ // CRS omitted: default to OGC:CRS84 (EPSG:4326) per GeoParquet spec
+ DEFAULT_SRID
+ case Some(JNull) =>
+ // CRS explicitly null: unknown CRS
+ 0
+ case Some(projjson) =>
+ // Use proj4sedona to extract authority and code from PROJJSON
+ try {
+ val jsonStr = compactJson(projjson)
+ val result = new Proj(jsonStr).toAuthority()
+ if (result != null && result.length == 2) {
+ result(0) match {
+ case "EPSG" =>
+ try { result(1).toInt }
+ catch { case _: NumberFormatException => 0 }
+ case "OGC" if result(1) == "CRS84" => DEFAULT_SRID
+ case _ => 0
+ }
+ } else {
+ 0
+ }
+ } catch {
+ case NonFatal(_) => 0
+ }
+ }
+ }
+
def createCoveringColumnMetadata(coveringColumnName: String, schema:
StructType): Covering = {
val coveringColumnIndex = schema.fieldIndex(coveringColumnName)
schema(coveringColumnIndex).dataType match {
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
index 489899f44a..67e9669dd4 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala
@@ -215,11 +215,14 @@ private[geoparquet] class GeoParquetRowConverter(
new ParquetPrimitiveConverter(updater)
case GeometryUDT =>
+ // Compute SRID once per column converter, not per row
+ val srid = schemaConverter.getSrid(parquetType.getName)
if (parquetType.isPrimitive) {
new ParquetPrimitiveConverter(updater) {
override def addBinary(value: Binary): Unit = {
val wkbReader = new WKBReader()
val geom = wkbReader.read(value.getBytes)
+ geom.setSRID(srid)
this.updater.set(GeometryUDT.serialize(geom))
}
}
@@ -233,6 +236,7 @@ private[geoparquet] class GeoParquetRowConverter(
val wkbReader = new WKBReader()
val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray
val geom = wkbReader.read(byteArray)
+ geom.setSRID(srid)
this.updater.set(GeometryUDT.serialize(geom))
}
}
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
index 34b9f44ec5..f61d2b2cc4 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala
@@ -57,6 +57,14 @@ class GeoParquetToSparkSchemaConverter(
private val geoParquetMetaData: GeoParquetMetaData =
GeoParquetUtils.parseGeoParquetMetaData(keyValueMetaData, parameters)
+ /**
+ * Cached SRID values per geometry column, computed once from CRS metadata
to avoid repeated
+ * PROJJSON parsing during row conversion.
+ */
+ private val sridCache: Map[String, Int] = geoParquetMetaData.columns.map {
+ case (name, fieldMetadata) => name ->
GeoParquetMetaData.extractSridFromCrs(fieldMetadata.crs)
+ }
+
def this(
keyValueMetaData: java.util.Map[String, String],
conf: PortableSQLConf,
@@ -78,6 +86,21 @@ class GeoParquetToSparkSchemaConverter(
conf.get(PortableSQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean,
parameters = parameters)
+ /**
+ * Returns the SRID for a geometry column based on the CRS metadata in the
GeoParquet file. The
+ * result is served from a pre-computed cache to avoid repeated PROJJSON
parsing.
+ *
+ * @param columnName
+ * name of the geometry column
+ * @return
+ * the SRID extracted from the CRS metadata. Returns 4326 if the CRS is
omitted or identifies OGC:CRS84, 0 if the CRS
+ * is null or unrecognized, or the EPSG code if the PROJJSON contains an
EPSG identifier. If
+ * the column name is not found in the GeoParquet metadata, returns 0.
+ */
+ def getSrid(columnName: String): Int = {
+ sridCache.getOrElse(columnName, 0)
+ }
+
/**
* Returns true if TIMESTAMP_NTZ type is enabled in this
ParquetToSparkSchemaConverter.
*/
diff --git
a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 7628c9a6da..63cd87971e 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -572,6 +572,194 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
+ it("GeoParquet read should set SRID from PROJJSON CRS with EPSG
identifier") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+ val projjson =
+ """
+ |{
+ | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json",
+ | "type": "GeographicCRS",
+ | "name": "NAD83(2011)",
+ | "datum": {
+ | "type": "GeodeticReferenceFrame",
+ | "name": "NAD83 (National Spatial Reference System 2011)",
+ | "ellipsoid": {
+ | "name": "GRS 1980",
+ | "semi_major_axis": 6378137,
+ | "inverse_flattening": 298.257222101
+ | }
+ | },
+ | "coordinate_system": {
+ | "subtype": "ellipsoidal",
+ | "axis": [
+ | {
+ | "name": "Geodetic latitude",
+ | "abbreviation": "Lat",
+ | "direction": "north",
+ | "unit": "degree"
+ | },
+ | {
+ | "name": "Geodetic longitude",
+ | "abbreviation": "Lon",
+ | "direction": "east",
+ | "unit": "degree"
+ | }
+ | ]
+ | },
+ | "id": {
+ | "authority": "EPSG",
+ | "code": 6318
+ | }
+ |}
+ |""".stripMargin
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_srid_epsg.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.crs", projjson)
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+ assert(geoms.nonEmpty)
+ geoms.foreach { geom =>
+ assert(geom.getSRID == 6318, s"Expected SRID 6318, got
${geom.getSRID}")
+ }
+ }
+
+ it("GeoParquet read should set SRID 4326 when CRS is omitted") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_srid_omit.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.crs", "")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+ assert(geoms.nonEmpty)
+ geoms.foreach { geom =>
+ assert(geom.getSRID == 4326, s"Expected SRID 4326 for omitted CRS, got
${geom.getSRID}")
+ }
+ }
+
+ it("GeoParquet read should set SRID 0 when CRS is null") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_srid_null.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.crs", "null")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+ assert(geoms.nonEmpty)
+ geoms.foreach { geom =>
+ assert(geom.getSRID == 0, s"Expected SRID 0 for null CRS, got
${geom.getSRID}")
+ }
+ }
+
+ it("GeoParquet read should set SRID 0 for CRS without EPSG identifier") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4)
+ // A PROJJSON without the "id" field
+ val projjsonNoId =
+ """
+ |{
+ | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json",
+ | "type": "GeographicCRS",
+ | "name": "Unknown CRS",
+ | "datum": {
+ | "type": "GeodeticReferenceFrame",
+ | "name": "Unknown datum",
+ | "ellipsoid": {
+ | "name": "GRS 1980",
+ | "semi_major_axis": 6378137,
+ | "inverse_flattening": 298.257222101
+ | }
+ | },
+ | "coordinate_system": {
+ | "subtype": "ellipsoidal",
+ | "axis": [
+ | {
+ | "name": "Geodetic latitude",
+ | "abbreviation": "Lat",
+ | "direction": "north",
+ | "unit": "degree"
+ | },
+ | {
+ | "name": "Geodetic longitude",
+ | "abbreviation": "Lon",
+ | "direction": "east",
+ | "unit": "degree"
+ | }
+ | ]
+ | }
+ |}
+ |""".stripMargin
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_srid_no_id.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.crs", projjsonNoId)
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0))
+ assert(geoms.nonEmpty)
+ geoms.foreach { geom =>
+ assert(geom.getSRID == 0, s"Expected SRID 0 for CRS without id, got
${geom.getSRID}")
+ }
+ }
+
+ it("GeoParquet read should set per-column SRID from different CRSes") {
+ val wktReader = new WKTReader()
+ val testData = Seq(
+ Row(
+ 1,
+ wktReader.read("POINT (1 2)"),
+ wktReader.read("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))")))
+ val schema = StructType(
+ Seq(
+ StructField("id", IntegerType, nullable = false),
+ StructField("g0", GeometryUDT(), nullable = false),
+ StructField("g1", GeometryUDT(), nullable = false)))
+ val df = sparkSession.createDataFrame(testData.asJava,
schema).repartition(1)
+
+ // g0: EPSG:4326, g1: EPSG:32632
+ val projjson4326 =
+ """{"type":"GeographicCRS","name":"WGS
84","id":{"authority":"EPSG","code":4326}}"""
+ val projjson32632 =
+ """{"type":"ProjectedCRS","name":"WGS 84 / UTM zone 32N",""" +
+ """"base_crs":{"name":"WGS
84","datum":{"type":"GeodeticReferenceFrame",""" +
+ """"name":"World Geodetic System 1984",""" +
+ """"ellipsoid":{"name":"WGS
84","semi_major_axis":6378137,"inverse_flattening":298.257223563}}},""" +
+ """"conversion":{"name":"UTM zone 32N","method":{"name":"Transverse
Mercator"},""" +
+ """"parameters":[{"name":"Latitude of natural origin","value":0},"""
+
+ """{"name":"Longitude of natural origin","value":9},""" +
+ """{"name":"Scale factor at natural origin","value":0.9996},""" +
+ """{"name":"False easting","value":500000},""" +
+ """{"name":"False northing","value":0}]},""" +
+ """"coordinate_system":{"subtype":"Cartesian","axis":[""" +
+ """{"name":"Easting","direction":"east","unit":"metre"},""" +
+ """{"name":"Northing","direction":"north","unit":"metre"}]},""" +
+ """"id":{"authority":"EPSG","code":32632}}"""
+
+ val geoParquetSavePath =
+ geoparquetoutputlocation + "/gp_srid_multi_column.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.crs", projjson4326)
+ .option("geoparquet.crs.g1", projjson32632)
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val rows = df2.collect()
+ assert(rows.length == 1)
+ val g0 = rows(0).getAs[Geometry](rows(0).fieldIndex("g0"))
+ val g1 = rows(0).getAs[Geometry](rows(0).fieldIndex("g1"))
+ assert(g0.getSRID == 4326, s"Expected g0 SRID 4326, got ${g0.getSRID}")
+ assert(g1.getSRID == 32632, s"Expected g1 SRID 32632, got ${g1.getSRID}")
+ }
+
it("GeoParquet load should raise exception when loading plain parquet
files") {
val e = intercept[SparkException] {
sparkSession.read.format("geoparquet").load(resourceFolder +
"geoparquet/plain.parquet")
@@ -885,6 +1073,59 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
+ describe("GeoParquetMetaData.extractSridFromCrs") {
+ it("should return 4326 when CRS is omitted (None)") {
+ assert(GeoParquetMetaData.extractSridFromCrs(None) == 4326)
+ }
+
+ it("should return 0 when CRS is null") {
+ assert(GeoParquetMetaData.extractSridFromCrs(Some(org.json4s.JNull)) ==
0)
+ }
+
+ it("should extract EPSG code from PROJJSON") {
+ val projjson =
+
parseJson("""{"type":"GeographicCRS","id":{"authority":"EPSG","code":4326}}""")
+ assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 4326)
+ }
+
+ it("should extract non-4326 EPSG code") {
+ // Use a complete ProjectedCRS PROJJSON so proj4sedona can parse it
+ val projjson = parseJson(
+ """{"type":"ProjectedCRS","name":"WGS 84 / UTM zone 32N",""" +
+ """"base_crs":{"name":"WGS
84","datum":{"type":"GeodeticReferenceFrame",""" +
+ """"name":"World Geodetic System 1984",""" +
+ """"ellipsoid":{"name":"WGS
84","semi_major_axis":6378137,"inverse_flattening":298.257223563}}},""" +
+ """"conversion":{"name":"UTM zone 32N","method":{"name":"Transverse
Mercator"},""" +
+ """"parameters":[{"name":"Latitude of natural origin","value":0},"""
+
+ """{"name":"Longitude of natural origin","value":9},""" +
+ """{"name":"Scale factor at natural origin","value":0.9996},""" +
+ """{"name":"False easting","value":500000},""" +
+ """{"name":"False northing","value":0}]},""" +
+ """"coordinate_system":{"subtype":"Cartesian","axis":[""" +
+ """{"name":"Easting","direction":"east","unit":"metre"},""" +
+ """{"name":"Northing","direction":"north","unit":"metre"}]},""" +
+ """"id":{"authority":"EPSG","code":32632}}""")
+ assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 32632)
+ }
+
+ it("should return 4326 for OGC:CRS84") {
+ val projjson =
+
parseJson("""{"type":"GeographicCRS","id":{"authority":"OGC","code":"CRS84"}}""")
+ assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 4326)
+ }
+
+ it("should return 0 for CRS without id field") {
+ val projjson = parseJson("""{"type":"GeographicCRS","name":"Unknown"}""")
+ assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 0)
+ }
+
+ it("should return 0 for CRS with non-EPSG authority") {
+ val projjson =
+
parseJson("""{"type":"GeographicCRS","id":{"authority":"IAU","code":49900}}""")
+ assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 0)
+ }
+ }
+
def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue =>
Unit): Unit = {
val parquetFiles = new
File(path).listFiles().filter(_.getName.endsWith(".parquet"))
parquetFiles.foreach { filePath =>