This is an automated email from the ASF dual-hosted git repository. jiayu pushed a commit to branch fix/geoparquet-preserve-crs-srid-2376 in repository https://gitbox.apache.org/repos/asf/sedona.git
commit dc198250de4de92fbac1398ea53e81181c2a6050 Author: Jia Yu <[email protected]> AuthorDate: Wed Feb 18 21:51:30 2026 -0800 [GH-2376] Preserve CRS SRID from GeoParquet metadata when reading geometries GeoParquet reader now extracts the SRID from CRS metadata (PROJJSON) and sets it on deserialized geometries. Previously all geometries were given SRID=0 regardless of the CRS declared in the file metadata. Uses proj4sedona's Proj.toAuthority() to parse PROJJSON and extract the authority/code pair. Bumps proj4sedona dependency from 0.0.4 to 0.0.5. --- pom.xml | 2 +- .../geoparquet/GeoParquetMetaData.scala | 52 +++++ .../geoparquet/GeoParquetRowConverter.scala | 4 + .../geoparquet/GeoParquetSchemaConverter.scala | 15 ++ .../org/apache/sedona/sql/geoparquetIOTests.scala | 241 +++++++++++++++++++++ 5 files changed, 313 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b8025399e6..4e8ec472e8 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ <scala-collection-compat.version>2.5.0</scala-collection-compat.version> <geoglib.version>1.52</geoglib.version> <caffeine.version>2.9.2</caffeine.version> - <proj4sedona.version>0.0.4</proj4sedona.version> + <proj4sedona.version>0.0.5</proj4sedona.version> <geotools.scope>provided</geotools.scope> <!-- Because it's not in Maven central, make it provided by default --> diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala index 982c0523d4..f0423c967a 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.geoparquet import org.apache.spark.sql.types.{DoubleType, FloatType, StructType} +import 
org.datasyslab.proj4sedona.core.Proj import org.json4s.jackson.JsonMethods.parse import org.json4s.jackson.compactJson import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull, JObject, JValue} @@ -137,6 +138,57 @@ object GeoParquetMetaData { compactJson(serializedGeoObject) } + /** + * Default SRID for GeoParquet files where the CRS field is omitted. Per the GeoParquet spec, + * omitting the CRS implies OGC:CRS84, which is treated here as EPSG:4326 (the two differ only in axis order). + */ + val DEFAULT_SRID: Int = 4326 + + /** + * Extract SRID from a GeoParquet CRS metadata value. + * + * Per the GeoParquet specification: + * - If the CRS field is absent (None), the CRS is OGC:CRS84 (EPSG:4326). + * - If the CRS field is explicitly null, the CRS is unknown (SRID 0). + * - If the CRS field is a PROJJSON object with an "id" containing "authority" and "code", the + * EPSG code is used as the SRID. + * + * @param crs + * The CRS field from GeoParquet column metadata. + * @return + * The SRID corresponding to the CRS. Returns 4326 for omitted CRS, 0 for null or unrecognized + * CRS, and the EPSG code for PROJJSON with an EPSG identifier; PROJJSON identifying OGC:CRS84 also yields 4326. 
+ */ + def extractSridFromCrs(crs: Option[JValue]): Int = { + crs match { + case None => + // CRS omitted: default to OGC:CRS84 (EPSG:4326) per GeoParquet spec + DEFAULT_SRID + case Some(JNull) => + // CRS explicitly null: unknown CRS + 0 + case Some(projjson) => + // Use proj4sedona to extract authority and code from PROJJSON + try { + val jsonStr = compactJson(projjson) + val result = new Proj(jsonStr).toAuthority() + if (result != null && result.length == 2) { + result(0) match { + case "EPSG" => + try { result(1).toInt } + catch { case _: NumberFormatException => 0 } + case "OGC" if result(1) == "CRS84" => 4326 + case _ => 0 + } + } else { + 0 + } + } catch { + case _: Exception => 0 + } + } + } + def createCoveringColumnMetadata(coveringColumnName: String, schema: StructType): Covering = { val coveringColumnIndex = schema.fieldIndex(coveringColumnName) schema(coveringColumnIndex).dataType match { diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala index 489899f44a..e197ee0c86 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetRowConverter.scala @@ -220,6 +220,8 @@ private[geoparquet] class GeoParquetRowConverter( override def addBinary(value: Binary): Unit = { val wkbReader = new WKBReader() val geom = wkbReader.read(value.getBytes) + val srid = schemaConverter.getSrid(parquetType.getName) + geom.setSRID(srid) this.updater.set(GeometryUDT.serialize(geom)) } } @@ -233,6 +235,8 @@ private[geoparquet] class GeoParquetRowConverter( val wkbReader = new WKBReader() val byteArray = currentArray.map(_.asInstanceOf[Byte]).toArray val geom = wkbReader.read(byteArray) + val srid = 
schemaConverter.getSrid(parquetType.getName) + geom.setSRID(srid) this.updater.set(GeometryUDT.serialize(geom)) } } diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala index 34b9f44ec5..dccf7a1c01 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetSchemaConverter.scala @@ -78,6 +78,21 @@ class GeoParquetToSparkSchemaConverter( conf.get(PortableSQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key).toBoolean, parameters = parameters) + /** + * Returns the SRID for a geometry column based on the CRS metadata in the GeoParquet file. + * @param columnName + * name of the geometry column + * @return + * the SRID extracted from the CRS metadata. Returns 4326 if the CRS is omitted or identifies + * OGC:CRS84, 0 if the CRS is null or unrecognized, or the EPSG code if the PROJJSON contains an EPSG identifier. + */ + def getSrid(columnName: String): Int = { + geoParquetMetaData.columns.get(columnName) match { + case Some(fieldMetadata) => GeoParquetMetaData.extractSridFromCrs(fieldMetadata.crs) + case None => 0 + } + } + /** * Returns true if TIMESTAMP_NTZ type is enabled in this ParquetToSparkSchemaConverter. 
*/ diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 7628c9a6da..63cd87971e 100644 --- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -572,6 +572,194 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } + it("GeoParquet read should set SRID from PROJJSON CRS with EPSG identifier") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + val projjson = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + val geoParquetSavePath = geoparquetoutputlocation + "/gp_srid_epsg.parquet" + df.write + .format("geoparquet") + .option("geoparquet.crs", projjson) + .mode("overwrite") + .save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0)) + assert(geoms.nonEmpty) + geoms.foreach { geom => + assert(geom.getSRID == 6318, s"Expected SRID 6318, got ${geom.getSRID}") + } + } + + it("GeoParquet read should set SRID 4326 when CRS is omitted") { + val df = 
sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + val geoParquetSavePath = geoparquetoutputlocation + "/gp_srid_omit.parquet" + df.write + .format("geoparquet") + .option("geoparquet.crs", "") + .mode("overwrite") + .save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0)) + assert(geoms.nonEmpty) + geoms.foreach { geom => + assert(geom.getSRID == 4326, s"Expected SRID 4326 for omitted CRS, got ${geom.getSRID}") + } + } + + it("GeoParquet read should set SRID 0 when CRS is null") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + val geoParquetSavePath = geoparquetoutputlocation + "/gp_srid_null.parquet" + df.write + .format("geoparquet") + .option("geoparquet.crs", "null") + .mode("overwrite") + .save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0)) + assert(geoms.nonEmpty) + geoms.foreach { geom => + assert(geom.getSRID == 0, s"Expected SRID 0 for null CRS, got ${geom.getSRID}") + } + } + + it("GeoParquet read should set SRID 0 for CRS without EPSG identifier") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + // A PROJJSON without the "id" field + val projjsonNoId = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "Unknown CRS", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "Unknown datum", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | 
"abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | } + |} + |""".stripMargin + val geoParquetSavePath = geoparquetoutputlocation + "/gp_srid_no_id.parquet" + df.write + .format("geoparquet") + .option("geoparquet.crs", projjsonNoId) + .mode("overwrite") + .save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + val geoms = df2.select("geometry").collect().map(_.getAs[Geometry](0)) + assert(geoms.nonEmpty) + geoms.foreach { geom => + assert(geom.getSRID == 0, s"Expected SRID 0 for CRS without id, got ${geom.getSRID}") + } + } + + it("GeoParquet read should set per-column SRID from different CRSes") { + val wktReader = new WKTReader() + val testData = Seq( + Row( + 1, + wktReader.read("POINT (1 2)"), + wktReader.read("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))"))) + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("g0", GeometryUDT(), nullable = false), + StructField("g1", GeometryUDT(), nullable = false))) + val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1) + + // g0: EPSG:4326, g1: EPSG:32632 + val projjson4326 = + """{"type":"GeographicCRS","name":"WGS 84","id":{"authority":"EPSG","code":4326}}""" + val projjson32632 = + """{"type":"ProjectedCRS","name":"WGS 84 / UTM zone 32N",""" + + """"base_crs":{"name":"WGS 84","datum":{"type":"GeodeticReferenceFrame",""" + + """"name":"World Geodetic System 1984",""" + + """"ellipsoid":{"name":"WGS 84","semi_major_axis":6378137,"inverse_flattening":298.257223563}}},""" + + """"conversion":{"name":"UTM zone 32N","method":{"name":"Transverse Mercator"},""" + + """"parameters":[{"name":"Latitude of natural origin","value":0},""" + + """{"name":"Longitude of natural origin","value":9},""" + + """{"name":"Scale factor at natural origin","value":0.9996},""" + + """{"name":"False easting","value":500000},""" + + """{"name":"False northing","value":0}]},""" + + 
""""coordinate_system":{"subtype":"Cartesian","axis":[""" + + """{"name":"Easting","direction":"east","unit":"metre"},""" + + """{"name":"Northing","direction":"north","unit":"metre"}]},""" + + """"id":{"authority":"EPSG","code":32632}}""" + + val geoParquetSavePath = + geoparquetoutputlocation + "/gp_srid_multi_column.parquet" + df.write + .format("geoparquet") + .option("geoparquet.crs", projjson4326) + .option("geoparquet.crs.g1", projjson32632) + .mode("overwrite") + .save(geoParquetSavePath) + + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + val rows = df2.collect() + assert(rows.length == 1) + val g0 = rows(0).getAs[Geometry](rows(0).fieldIndex("g0")) + val g1 = rows(0).getAs[Geometry](rows(0).fieldIndex("g1")) + assert(g0.getSRID == 4326, s"Expected g0 SRID 4326, got ${g0.getSRID}") + assert(g1.getSRID == 32632, s"Expected g1 SRID 32632, got ${g1.getSRID}") + } + it("GeoParquet load should raise exception when loading plain parquet files") { val e = intercept[SparkException] { sparkSession.read.format("geoparquet").load(resourceFolder + "geoparquet/plain.parquet") @@ -885,6 +1073,59 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } + describe("GeoParquetMetaData.extractSridFromCrs") { + it("should return 4326 when CRS is omitted (None)") { + assert(GeoParquetMetaData.extractSridFromCrs(None) == 4326) + } + + it("should return 0 when CRS is null") { + assert(GeoParquetMetaData.extractSridFromCrs(Some(org.json4s.JNull)) == 0) + } + + it("should extract EPSG code from PROJJSON") { + val projjson = + parseJson("""{"type":"GeographicCRS","id":{"authority":"EPSG","code":4326}}""") + assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 4326) + } + + it("should extract non-4326 EPSG code") { + // Use a complete ProjectedCRS PROJJSON so proj4sedona can parse it + val projjson = parseJson( + """{"type":"ProjectedCRS","name":"WGS 84 / UTM zone 32N",""" + + """"base_crs":{"name":"WGS 
84","datum":{"type":"GeodeticReferenceFrame",""" + + """"name":"World Geodetic System 1984",""" + + """"ellipsoid":{"name":"WGS 84","semi_major_axis":6378137,"inverse_flattening":298.257223563}}},""" + + """"conversion":{"name":"UTM zone 32N","method":{"name":"Transverse Mercator"},""" + + """"parameters":[{"name":"Latitude of natural origin","value":0},""" + + """{"name":"Longitude of natural origin","value":9},""" + + """{"name":"Scale factor at natural origin","value":0.9996},""" + + """{"name":"False easting","value":500000},""" + + """{"name":"False northing","value":0}]},""" + + """"coordinate_system":{"subtype":"Cartesian","axis":[""" + + """{"name":"Easting","direction":"east","unit":"metre"},""" + + """{"name":"Northing","direction":"north","unit":"metre"}]},""" + + """"id":{"authority":"EPSG","code":32632}}""") + assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 32632) + } + + it("should return 4326 for OGC:CRS84") { + val projjson = + parseJson("""{"type":"GeographicCRS","id":{"authority":"OGC","code":"CRS84"}}""") + assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 4326) + } + + it("should return 0 for CRS without id field") { + val projjson = parseJson("""{"type":"GeographicCRS","name":"Unknown"}""") + assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 0) + } + + it("should return 0 for CRS with non-EPSG authority") { + val projjson = + parseJson("""{"type":"GeographicCRS","id":{"authority":"IAU","code":49900}}""") + assert(GeoParquetMetaData.extractSridFromCrs(Some(projjson)) == 0) + } + } + def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = { val parquetFiles = new File(path).listFiles().filter(_.getName.endsWith(".parquet")) parquetFiles.foreach { filePath =>
