This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new d51f92ebb [SEDONA-627] Write user specified column to the covering metadata of GeoParquet files (#1522)
d51f92ebb is described below

commit d51f92ebbdeb33fa9f7189c354bc473f0b25dad0
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Tue Jul 16 12:30:30 2024 +0800

    [SEDONA-627] Write user specified column to the covering metadata of GeoParquet files (#1522)
    
    * Support writing covering metadata to geoparquet files
    
    * Port covering support to Spark 3.4 and Spark 3.5
    
    * Update docs
    
    * Add example 1.1.0 geoparquet file
    
    * Document how to construct covering columns using ST functions
---
 docs/tutorial/sql.md                               |  27 +++++
 .../datasources/parquet/GeoParquetMetaData.scala   |  88 +++++++++++++++-
 .../resources/geoparquet/example-1.1.0.parquet     | Bin 0 -> 29838 bytes
 .../parquet/GeoParquetWriteSupport.scala           |  24 ++++-
 .../GeoParquetMetadataPartitionReaderFactory.scala |   8 +-
 .../metadata/GeoParquetMetadataTable.scala         |   3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       |  13 +++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 111 ++++++++++++++++++++-
 .../parquet/GeoParquetWriteSupport.scala           |  24 ++++-
 .../GeoParquetMetadataPartitionReaderFactory.scala |   8 +-
 .../metadata/GeoParquetMetadataTable.scala         |   3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       |  13 +++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 111 ++++++++++++++++++++-
 .../parquet/GeoParquetWriteSupport.scala           |  23 ++++-
 .../GeoParquetMetadataPartitionReaderFactory.scala |   8 +-
 .../metadata/GeoParquetMetadataTable.scala         |   3 +-
 .../sedona/sql/GeoParquetMetadataTests.scala       |  13 +++
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 111 ++++++++++++++++++++-
 18 files changed, 567 insertions(+), 24 deletions(-)

diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index f183a1270..0b617384b 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -1111,6 +1111,8 @@ Since v`1.3.0`, Sedona natively supports writing GeoParquet file. GeoParquet can
 df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
 ```
 
+### CRS Metadata
+
 Since v`1.5.1`, Sedona supports writing GeoParquet files with custom GeoParquet spec version and crs.
 The default GeoParquet spec version is `1.0.0` and the default crs is `null`. You can specify the GeoParquet spec version and crs as follows:
 
@@ -1146,6 +1148,31 @@ Its geoparquet writer will not leverage the SRID field of a geometry so you will
 
 Due to the same reason, Sedona geoparquet reader and writer do NOT check the axis order (lon/lat or lat/lon) and assume they are handled by the users themselves when writing / reading the files. You can always use [`ST_FlipCoordinates`](../api/sql/Function.md#st_flipcoordinates) to swap the axis order of your geometries.
 
+### Covering Metadata
+
+Since `v1.6.1`, Sedona supports writing the [`covering` field](https://github.com/opengeospatial/geoparquet/blob/v1.1.0/format-specs/geoparquet.md#covering) to geometry column metadata. The `covering` field specifies a bounding box column to help accelerate spatial data retrieval. The bounding box column should be a top-level struct column containing `xmin`, `ymin`, `xmax`, `ymax` columns. If the DataFrame you are writing contains such columns, you can specify `.option("geoparquet.covering.<geometryColumnName>", "<coveringColumnName>")` to write the covering metadata:
+
+```scala
+df.write.format("geoparquet")
+               .option("geoparquet.covering.geometry", "bbox")
+               .save("/path/to/saved_geoparquet.parquet")
+```
+
+If the DataFrame has only one geometry column, you can simply specify the `geoparquet.covering` option and omit the geometry column name:
+
+```scala
+df.write.format("geoparquet")
+               .option("geoparquet.covering", "bbox")
+               .save("/path/to/saved_geoparquet.parquet")
+```
+
+If the DataFrame does not have a covering column, you can construct one using Sedona's SQL functions:
+
+```scala
+val df_bbox = df.withColumn("bbox", expr("struct(ST_XMin(geometry) AS xmin, ST_YMin(geometry) AS ymin, ST_XMax(geometry) AS xmax, ST_YMax(geometry) AS ymax)"))
+df_bbox.write.format("geoparquet").option("geoparquet.covering.geometry", "bbox").save("/path/to/saved_geoparquet.parquet")
+```
+
 ## Sort then Save GeoParquet
 
 To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see [ST_GeoHash](../api/sql/Function.md#st_geohash)) and then save as a GeoParquet file. An example is as follows:
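
For quick verification, a minimal sketch of inspecting the covering metadata written by the options documented above, using the `geoparquet.metadata` reader that this commit extends with a `covering` column. It assumes a Sedona-enabled SparkSession named `sparkSession`; the file path is hypothetical:

```scala
// Hedged sketch: read back the per-column GeoParquet metadata of a file
// written with .option("geoparquet.covering.geometry", "bbox").
val dfMeta = sparkSession.read
  .format("geoparquet.metadata")
  .load("/path/to/saved_geoparquet.parquet")
// `columns` maps each geometry column name to its metadata struct; the new
// `covering` field holds the covering JSON as a string (null when absent).
dfMeta.selectExpr("path", "columns['geometry'].covering").show(truncate = false)
```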
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index ab84e764d..e67f2b3c4 100644
--- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
 import org.json4s.jackson.JsonMethods.parse
 import org.json4s.jackson.compactJson
 import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull, JObject, JValue}
@@ -33,12 +34,25 @@ import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull, JObject,
  * @param crs
  *   The CRS of the geometries in the file. None if crs metadata is absent, Some(JNull) if crs is
  *   null, Some(value) if the crs is present and not null.
+ * @param covering
+ *   Object containing bounding box column names to help accelerate spatial data retrieval
  */
 case class GeometryFieldMetaData(
     encoding: String,
     geometryTypes: Seq[String],
     bbox: Seq[Double],
-    crs: Option[JValue] = None)
+    crs: Option[JValue] = None,
+    covering: Option[Covering] = None)
+
+case class Covering(bbox: CoveringBBox)
+
+case class CoveringBBox(
+    xmin: Seq[String],
+    ymin: Seq[String],
+    zmin: Option[Seq[String]],
+    xmax: Seq[String],
+    ymax: Seq[String],
+    zmax: Option[Seq[String]])
 
 /**
  * A case class that holds the metadata of GeoParquet file
@@ -55,9 +69,9 @@ case class GeoParquetMetaData(
     columns: Map[String, GeometryFieldMetaData])
 
 object GeoParquetMetaData {
-  // We're conforming to version 1.0.0 of the GeoParquet specification, please refer to
-  // https://geoparquet.org/releases/v1.0.0/ for more details.
-  val VERSION = "1.0.0"
+  // We're conforming to version 1.1.0 of the GeoParquet specification, please refer to
+  // https://geoparquet.org/releases/v1.1.0/ for more details.
+  val VERSION = "1.1.0"
 
   /**
   * Configuration key for overriding the version field in GeoParquet file metadata.
@@ -70,6 +84,14 @@ object GeoParquetMetaData {
    */
   val GEOPARQUET_CRS_KEY = "geoparquet.crs"
 
+  /**
+   * Configuration key prefix for setting the covering columns of the geometries in GeoParquet
+   * column metadata. The configuration key for a geometry column named `x` is
+   * `geoparquet.covering.x`. If the parquet file contains only one geometry column, we can omit
+   * the column name and use `geoparquet.covering` directly.
+   */
+  val GEOPARQUET_COVERING_KEY = "geoparquet.covering"
+
   def parseKeyValueMetaData(
       keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
     Option(keyValueMetaData.get("geo")).map { geo =>
@@ -114,4 +136,62 @@ object GeoParquetMetaData {
     }
     compactJson(serializedGeoObject)
   }
+
+  def createCoveringColumnMetadata(coveringColumnName: String, schema: StructType): Covering = {
+    val coveringColumnIndex = schema.fieldIndex(coveringColumnName)
+    schema(coveringColumnIndex).dataType match {
+      case coveringColumnType: StructType =>
+        coveringColumnTypeToCovering(coveringColumnName, coveringColumnType)
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Covering column $coveringColumnName is not a struct type")
+    }
+  }
+
+  private def coveringColumnTypeToCovering(
+      coveringColumnName: String,
+      coveringColumnType: StructType): Covering = {
+    def validateField(fieldName: String): Unit = {
+      val index = coveringColumnType.fieldIndex(fieldName)
+      val fieldType = coveringColumnType(index).dataType
+      if (fieldType != FloatType && fieldType != DoubleType) {
+        throw new IllegalArgumentException(
+          s"`$fieldName` in covering column `$coveringColumnName` is not float or double type")
+      }
+    }
+    // We only validate the existence and types of the fields here. Although the order of the fields
+    // is required to be xmin, ymin, [zmin], xmax, ymax, [zmax] (see https://github.com/opengeospatial/geoparquet/pull/202),
+    // we don't validate it. The requirement on the user-provided covering column is quite lenient;
+    // it is up to the user to make the covering column strictly follow the ordering requirement.
+    validateField("xmin")
+    validateField("ymin")
+    validateField("xmax")
+    validateField("ymax")
+    coveringColumnType.find(_.name == "zmin") match {
+      case Some(_) =>
+        validateField("zmin")
+        validateField("zmax")
+        Covering(
+          CoveringBBox(
+            Seq(coveringColumnName, "xmin"),
+            Seq(coveringColumnName, "ymin"),
+            Some(Seq(coveringColumnName, "zmin")),
+            Seq(coveringColumnName, "xmax"),
+            Seq(coveringColumnName, "ymax"),
+            Some(Seq(coveringColumnName, "zmax"))))
+      case None =>
+        if (coveringColumnType.fieldNames.contains("zmax")) {
+          throw new IllegalArgumentException(
+            s"zmax should not be present in covering column `$coveringColumnName` since zmin is not present")
+        }
+        Covering(
+          CoveringBBox(
+            Seq(coveringColumnName, "xmin"),
+            Seq(coveringColumnName, "ymin"),
+            None,
+            Seq(coveringColumnName, "xmax"),
+            Seq(coveringColumnName, "ymax"),
+            None))
+    }
+  }
 }
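
For illustration, a minimal sketch of how the new `createCoveringColumnMetadata` helper maps a top-level bbox struct column to the field paths stored in the `covering` metadata; the schema below is hypothetical:

```scala
import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical schema containing a 2D covering struct column named "bbox".
val schema = StructType(Seq(
  StructField("bbox", StructType(Seq(
    StructField("xmin", DoubleType),
    StructField("ymin", DoubleType),
    StructField("xmax", DoubleType),
    StructField("ymax", DoubleType))))))

val covering = GeoParquetMetaData.createCoveringColumnMetadata("bbox", schema)
// Each component is the path to the corresponding bbox field, e.g.:
// covering.bbox.xmin == Seq("bbox", "xmin")
// zmin/zmax are None because the struct has no z fields.
```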
diff --git a/spark/common/src/test/resources/geoparquet/example-1.1.0.parquet b/spark/common/src/test/resources/geoparquet/example-1.1.0.parquet
new file mode 100644
index 000000000..33e4cdcdd
Binary files /dev/null and b/spark/common/src/test/resources/geoparquet/example-1.1.0.parquet differ
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 015a21121..90d6d962f 100644
--- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -31,14 +31,13 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION}
+import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata}
 import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types._
 import org.json4s.{DefaultFormats, Extraction, JValue}
-import org.json4s.jackson.compactJson
 import org.json4s.jackson.JsonMethods.parse
 import org.locationtech.jts.geom.Geometry
 import org.locationtech.jts.io.WKBWriter
@@ -111,6 +110,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private var geoParquetVersion: Option[String] = None
   private var defaultGeoParquetCrs: Option[JValue] = None
   private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty
+  private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] = mutable.Map.empty
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -155,6 +155,23 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
       }
     }
+    Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach { coveringColumnName =>
+      if (geometryColumnInfoMap.size > 1) {
+        throw new IllegalArgumentException(
+          s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple geometry columns. " +
+            s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for the configured geometry column.")
+      }
+      val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
+      val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+      geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+    }
+    geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+      Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
+        coveringColumnName =>
+          val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+          geoParquetColumnCoveringMap.put(name, covering)
+      }
+    }
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
     val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
@@ -203,7 +220,8 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
             columnInfo.bbox.maxY)
         } else Seq(0.0, 0.0, 0.0, 0.0)
         val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs)
-        columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
+        val covering = geoParquetColumnCoveringMap.get(columnName)
+        columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs, covering)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
       val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
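
To summarize the option handling added in this file (a sketch consistent with the tests later in this commit; the DataFrames and column names are hypothetical): the bare `geoparquet.covering` key only works when there is a single geometry column, otherwise the per-column key must be used:

```scala
// Single geometry column: the bare key is unambiguous.
df.write.format("geoparquet")
  .option("geoparquet.covering", "bbox")
  .save("/path/to/out.parquet")

// Multiple geometry columns: the bare key throws IllegalArgumentException;
// target each geometry column with geoparquet.covering.<columnName> instead.
dfMulti.write.format("geoparquet")
  .option("geoparquet.covering.geom1", "bbox1")
  .option("geoparquet.covering.geom2", "bbox2")
  .save("/path/to/out_multi.parquet")
```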
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index f7e17da70..1fe2faa2e 100644
--- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
+import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 case class GeoParquetMetadataPartitionReaderFactory(
@@ -74,13 +75,18 @@ object GeoParquetMetadataPartitionReaderFactory {
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
+          implicit val formats: org.json4s.Formats = DefaultFormats
+          import org.json4s.jackson.Serialization
           val columnMetadataFields: Array[Any] = Array(
             UTF8String.fromString(columnMetadata.encoding),
             new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
             new GenericArrayData(columnMetadata.bbox.toArray),
             columnMetadata.crs
               .map(projjson => UTF8String.fromString(compact(render(projjson))))
-              .getOrElse(UTF8String.fromString("")))
+              .getOrElse(UTF8String.fromString("")),
+            columnMetadata.covering
+              .map(covering => UTF8String.fromString(Serialization.write(covering)))
+              .orNull)
           val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
           UTF8String.fromString(columnName) -> columnMetadataStruct
         }
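
The `covering` value surfaced by the reader above is serialized with json4s as a plain JSON string (null when absent). A hedged sketch of deserializing it back into the `Covering` case class, mirroring the `Serialization.write` call in this factory; the sample JSON is hypothetical:

```scala
import org.apache.spark.sql.execution.datasources.parquet.Covering
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

implicit val formats: org.json4s.Formats = DefaultFormats
// e.g. obtained via row.getAs[String]("covering") from the metadata reader.
val coveringJson =
  """{"bbox":{"xmin":["bbox","xmin"],"ymin":["bbox","ymin"],"xmax":["bbox","xmax"],"ymax":["bbox","ymax"]}}"""
val covering = Serialization.read[Covering](coveringJson)
assert(covering.bbox.xmin == Seq("bbox", "xmin"))
```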
diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index ad1f91502..845764fae 100644
--- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -56,7 +56,8 @@ object GeoParquetMetadataTable {
       StructField("encoding", StringType, nullable = true),
       StructField("geometry_types", ArrayType(StringType), nullable = true),
       StructField("bbox", ArrayType(DoubleType), nullable = true),
-      StructField("crs", StringType, nullable = true)))
+      StructField("crs", StringType, nullable = true),
+      StructField("covering", StringType, nullable = true)))
 
   private val columnsType = MapType(StringType, columnMetadataType, valueContainsNull = false)
 
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 8411e4dde..421890c70 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -135,5 +135,18 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(!metadata.containsKey("geom_column2"))
       assert(!metadata.containsKey("geom_column_2"))
     }
+
+    it("Read GeoParquet with covering metadata") {
+      val dfMeta = sparkSession.read
+        .format("geoparquet.metadata")
+        .load(geoparquetdatalocation + "/example-1.1.0.parquet")
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      val covering = metadata.getAs[String]("covering")
+      assert(covering.nonEmpty)
+      Seq("bbox", "xmin", "ymin", "xmax", "ymax").foreach { key =>
+        assert(covering contains key)
+      }
+    }
   }
 }
diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 2e621af46..ccfd560c8 100644
--- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -27,8 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
-import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
+import org.apache.spark.sql.execution.datasources.parquet.{Covering, GeoParquetMetaData, ParquetReadSupport}
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point, ST_PolygonFromEnvelope}
@@ -51,6 +50,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation2: String = resourceFolder + "geoparquet/example2.parquet"
   val geoparquetdatalocation3: String = resourceFolder + "geoparquet/example3.parquet"
   val geoparquetdatalocation4: String = resourceFolder + "geoparquet/example-1.0.0-beta.1.parquet"
+  val geoparquetdatalocation5: String = resourceFolder + "geoparquet/example-1.1.0.parquet"
   val legacyparquetdatalocation: String =
     resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
   val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
@@ -158,6 +158,21 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         assert(!sparkSqlRowMetadata.contains("GeometryUDT"))
       }
     }
+    it("GEOPARQUET Test example-1.1.0.parquet") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation5)
+      val count = df.count()
+      val rows = df.collect()
+      assert(rows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+      assert(count == rows.length)
+
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
+      df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val newRows = df2.collect()
+      assert(rows.length == newRows.length)
+      assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+      assert(rows sameElements newRows)
+    }
 
     it("GeoParquet with multiple geometry columns") {
       val wktReader = new WKTReader()
@@ -623,6 +638,98 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
       }
     }
+
+    it("GeoParquet supports writing covering metadata") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "test_cov",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)"))
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_covering_metadata.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering", "test_cov")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+      }
+
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geometry", "test_cov")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+      }
+    }
+
+    it("GeoParquet supports writing covering metadata for multiple columns") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "test_cov1",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)"))
+        .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+        .withColumn(
+          "test_cov2",
+          expr(
+            "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10 * id + 1 AS ymax)"))
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_covering_metadata.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geom1", "test_cov1")
+        .option("geoparquet.covering.geom2", "test_cov2")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        Seq(("geom1", "test_cov1"), ("geom2", "test_cov2")).foreach {
+          case (geomName, coveringName) =>
+            val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+            val covering = coveringJsValue.extract[Covering]
+            assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+            assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+            assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+            assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+        }
+      }
+
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geom2", "test_cov2")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        assert(geo \ "columns" \ "geom1" \ "covering" == org.json4s.JNothing)
+        val coveringJsValue = geo \ "columns" \ "geom2" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov2", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov2", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov2", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
+      }
+    }
   }
 
   def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index e396bb658..3a6a89773 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -31,14 +31,13 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION}
+import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata}
 import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.types._
 import org.json4s.{DefaultFormats, Extraction, JValue}
-import org.json4s.jackson.compactJson
 import org.json4s.jackson.JsonMethods.parse
 import org.locationtech.jts.geom.Geometry
 import org.locationtech.jts.io.WKBWriter
@@ -111,6 +110,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private var geoParquetVersion: Option[String] = None
   private var defaultGeoParquetCrs: Option[JValue] = None
   private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty
+  private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] = mutable.Map.empty
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -155,6 +155,23 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
       }
     }
+    Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach { coveringColumnName =>
+      if (geometryColumnInfoMap.size > 1) {
+        throw new IllegalArgumentException(
+          s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple geometry columns. " +
+            s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for the configured geometry column.")
+      }
+      val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
+      val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+      geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+    }
+    geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+      Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
+        coveringColumnName =>
+          val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+          geoParquetColumnCoveringMap.put(name, covering)
+      }
+    }
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
     val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
@@ -203,7 +220,8 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
             columnInfo.bbox.maxY)
         } else Seq(0.0, 0.0, 0.0, 0.0)
         val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs)
-        columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
+        val covering = geoParquetColumnCoveringMap.get(columnName)
+        columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs, covering)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
       val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index a37fa65ac..2a5e70624 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
+import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 case class GeoParquetMetadataPartitionReaderFactory(
@@ -74,13 +75,18 @@ object GeoParquetMetadataPartitionReaderFactory {
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
+          implicit val formats: org.json4s.Formats = DefaultFormats
+          import org.json4s.jackson.Serialization
           val columnMetadataFields: Array[Any] = Array(
             UTF8String.fromString(columnMetadata.encoding),
             new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
             new GenericArrayData(columnMetadata.bbox.toArray),
             columnMetadata.crs
               .map(projjson => UTF8String.fromString(compact(render(projjson))))
-              .getOrElse(UTF8String.fromString("")))
+              .getOrElse(UTF8String.fromString("")),
+            columnMetadata.covering
+              .map(covering => UTF8String.fromString(Serialization.write(covering)))
+              .orNull)
           val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
           UTF8String.fromString(columnName) -> columnMetadataStruct
         }
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index ad1f91502..845764fae 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -56,7 +56,8 @@ object GeoParquetMetadataTable {
       StructField("encoding", StringType, nullable = true),
       StructField("geometry_types", ArrayType(StringType), nullable = true),
       StructField("bbox", ArrayType(DoubleType), nullable = true),
-      StructField("crs", StringType, nullable = true)))
+      StructField("crs", StringType, nullable = true),
+      StructField("covering", StringType, nullable = true)))
 
   private val columnsType = MapType(StringType, columnMetadataType, valueContainsNull = false)
 
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 8411e4dde..421890c70 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -135,5 +135,18 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(!metadata.containsKey("geom_column2"))
       assert(!metadata.containsKey("geom_column_2"))
     }
+
+    it("Read GeoParquet with covering metadata") {
+      val dfMeta = sparkSession.read
+        .format("geoparquet.metadata")
+        .load(geoparquetdatalocation + "/example-1.1.0.parquet")
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      val covering = metadata.getAs[String]("covering")
+      assert(covering.nonEmpty)
+      Seq("bbox", "xmin", "ymin", "xmax", "ymax").foreach { key =>
+        assert(covering contains key)
+      }
+    }
   }
 }
diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 2e621af46..ccfd560c8 100644
--- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -27,8 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
-import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
+import org.apache.spark.sql.execution.datasources.parquet.{Covering, GeoParquetMetaData, ParquetReadSupport}
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point, ST_PolygonFromEnvelope}
@@ -51,6 +50,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation2: String = resourceFolder + "geoparquet/example2.parquet"
   val geoparquetdatalocation3: String = resourceFolder + "geoparquet/example3.parquet"
   val geoparquetdatalocation4: String = resourceFolder + "geoparquet/example-1.0.0-beta.1.parquet"
+  val geoparquetdatalocation5: String = resourceFolder + "geoparquet/example-1.1.0.parquet"
   val legacyparquetdatalocation: String =
     resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
   val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
@@ -158,6 +158,21 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         assert(!sparkSqlRowMetadata.contains("GeometryUDT"))
       }
     }
+    it("GEOPARQUET Test example-1.1.0.parquet") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation5)
+      val count = df.count()
+      val rows = df.collect()
+      assert(rows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+      assert(count == rows.length)
+
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
+      df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val newRows = df2.collect()
+      assert(rows.length == newRows.length)
+      assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+      assert(rows sameElements newRows)
+    }
 
     it("GeoParquet with multiple geometry columns") {
       val wktReader = new WKTReader()
@@ -623,6 +638,98 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
       }
     }
+
+    it("GeoParquet supports writing covering metadata") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "test_cov",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)"))
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_covering_metadata.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering", "test_cov")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+      }
+
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geometry", "test_cov")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+      }
+    }
+
+    it("GeoParquet supports writing covering metadata for multiple columns") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "test_cov1",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)"))
+        .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+        .withColumn(
+          "test_cov2",
+          expr(
+            "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10 * id + 1 AS ymax)"))
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_covering_metadata.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geom1", "test_cov1")
+        .option("geoparquet.covering.geom2", "test_cov2")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        Seq(("geom1", "test_cov1"), ("geom2", "test_cov2")).foreach {
+          case (geomName, coveringName) =>
+            val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+            val covering = coveringJsValue.extract[Covering]
+            assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+            assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+            assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+            assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+        }
+      }
+
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geom2", "test_cov2")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        assert(geo \ "columns" \ "geom1" \ "covering" == org.json4s.JNothing)
+        val coveringJsValue = geo \ "columns" \ "geom2" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov2", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov2", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov2", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
+      }
+    }
   }
 
   def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 335f6c74f..fb5c92163 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION}
+import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, createCoveringColumnMetadata}
 import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
@@ -111,6 +111,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   private var geoParquetVersion: Option[String] = None
   private var defaultGeoParquetCrs: Option[JValue] = None
   private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty
+  private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] = mutable.Map.empty
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -155,6 +156,23 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
       }
     }
+    Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach { coveringColumnName =>
+      if (geometryColumnInfoMap.size > 1) {
+        throw new IllegalArgumentException(
+          s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple geometry columns. " +
+            s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for the configured geometry column.")
+      }
+      val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
+      val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+      geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+    }
+    geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+      Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
+        coveringColumnName =>
+          val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+          geoParquetColumnCoveringMap.put(name, covering)
+      }
+    }
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
     val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
@@ -203,7 +221,8 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
             columnInfo.bbox.maxY)
         } else Seq(0.0, 0.0, 0.0, 0.0)
         val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs)
-        columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
+        val covering = geoParquetColumnCoveringMap.get(columnName)
+        columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs, covering)
       }.toMap
       val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
       val geoParquetMetadataJson = GeoParquetMetaData.toJson(geoParquetMetadata)
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index a37fa65ac..2a5e70624 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
+import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 case class GeoParquetMetadataPartitionReaderFactory(
@@ -74,13 +75,18 @@ object GeoParquetMetadataPartitionReaderFactory {
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
+          implicit val formats: org.json4s.Formats = DefaultFormats
+          import org.json4s.jackson.Serialization
           val columnMetadataFields: Array[Any] = Array(
             UTF8String.fromString(columnMetadata.encoding),
             new GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
             new GenericArrayData(columnMetadata.bbox.toArray),
             columnMetadata.crs
               .map(projjson => UTF8String.fromString(compact(render(projjson))))
-              .getOrElse(UTF8String.fromString("")))
+              .getOrElse(UTF8String.fromString("")),
+            columnMetadata.covering
+              .map(covering => UTF8String.fromString(Serialization.write(covering)))
+              .orNull)
           val columnMetadataStruct = new GenericInternalRow(columnMetadataFields)
           UTF8String.fromString(columnName) -> columnMetadataStruct
         }
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index ad1f91502..845764fae 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -56,7 +56,8 @@ object GeoParquetMetadataTable {
       StructField("encoding", StringType, nullable = true),
       StructField("geometry_types", ArrayType(StringType), nullable = true),
       StructField("bbox", ArrayType(DoubleType), nullable = true),
-      StructField("crs", StringType, nullable = true)))
+      StructField("crs", StringType, nullable = true),
+      StructField("covering", StringType, nullable = true)))
 
   private val columnsType = MapType(StringType, columnMetadataType, valueContainsNull = false)
 
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 8411e4dde..421890c70 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -135,5 +135,18 @@ class GeoParquetMetadataTests extends TestBaseScala with BeforeAndAfterAll {
       assert(!metadata.containsKey("geom_column2"))
       assert(!metadata.containsKey("geom_column_2"))
     }
+
+    it("Read GeoParquet with covering metadata") {
+      val dfMeta = sparkSession.read
+        .format("geoparquet.metadata")
+        .load(geoparquetdatalocation + "/example-1.1.0.parquet")
+      val row = dfMeta.collect()(0)
+      val metadata = row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+      val covering = metadata.getAs[String]("covering")
+      assert(covering.nonEmpty)
+      Seq("bbox", "xmin", "ymin", "xmax", "ymax").foreach { key =>
+        assert(covering contains key)
+      }
+    }
   }
 }
diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 2e621af46..ccfd560c8 100644
--- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -27,8 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
-import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
+import org.apache.spark.sql.execution.datasources.parquet.{Covering, GeoParquetMetaData, ParquetReadSupport}
 import org.apache.spark.sql.functions.{col, expr}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point, ST_PolygonFromEnvelope}
@@ -51,6 +50,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
   val geoparquetdatalocation2: String = resourceFolder + "geoparquet/example2.parquet"
   val geoparquetdatalocation3: String = resourceFolder + "geoparquet/example3.parquet"
   val geoparquetdatalocation4: String = resourceFolder + "geoparquet/example-1.0.0-beta.1.parquet"
+  val geoparquetdatalocation5: String = resourceFolder + "geoparquet/example-1.1.0.parquet"
   val legacyparquetdatalocation: String =
     resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
   val geoparquetoutputlocation: String = resourceFolder + "geoparquet/geoparquet_output/"
@@ -158,6 +158,21 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         assert(!sparkSqlRowMetadata.contains("GeometryUDT"))
       }
     }
+    it("GEOPARQUET Test example-1.1.0.parquet") {
+      val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation5)
+      val count = df.count()
+      val rows = df.collect()
+      assert(rows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+      assert(count == rows.length)
+
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
+      df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
+      val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+      val newRows = df2.collect()
+      assert(rows.length == newRows.length)
+      assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+      assert(rows sameElements newRows)
+    }
 
     it("GeoParquet with multiple geometry columns") {
       val wktReader = new WKTReader()
@@ -623,6 +638,98 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
         assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
       }
     }
+
+    it("GeoParquet supports writing covering metadata") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "test_cov",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)"))
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_covering_metadata.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering", "test_cov")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+      }
+
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geometry", "test_cov")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+      }
+    }
+
+    it("GeoParquet supports writing covering metadata for multiple columns") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "test_cov1",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS ymax)"))
+        .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+        .withColumn(
+          "test_cov2",
+          expr(
+            "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10 * id + 1 AS ymax)"))
+      val geoParquetSavePath = geoparquetoutputlocation + "/gp_with_covering_metadata.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geom1", "test_cov1")
+        .option("geoparquet.covering.geom2", "test_cov2")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        Seq(("geom1", "test_cov1"), ("geom2", "test_cov2")).foreach {
+          case (geomName, coveringName) =>
+            val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+            val covering = coveringJsValue.extract[Covering]
+            assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+            assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+            assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+            assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+        }
+      }
+
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.geom2", "test_cov2")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        assert(geo \ "columns" \ "geom1" \ "covering" == org.json4s.JNothing)
+        val coveringJsValue = geo \ "columns" \ "geom2" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("test_cov2", "xmin"))
+        assert(covering.bbox.ymin == Seq("test_cov2", "ymin"))
+        assert(covering.bbox.xmax == Seq("test_cov2", "xmax"))
+        assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
+      }
+    }
   }
 
   def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {
