This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new c5acc9ceea [GH-2646] Auto-populate covering metadata for GeoParquet 
1.1.0 writes (#2665)
c5acc9ceea is described below

commit c5acc9ceea2de655416dd5786209ece5ea954fc3
Author: Jia Yu <[email protected]>
AuthorDate: Fri Feb 20 20:00:49 2026 -0700

    [GH-2646] Auto-populate covering metadata for GeoParquet 1.1.0 writes 
(#2665)
---
 R/tests/testthat/test-data-interface.R             |   3 +-
 docs/tutorial/files/geoparquet-sedona-spark.md     |  22 ++-
 .../geoparquet/GeoParquetMetaData.scala            |  12 ++
 .../geoparquet/GeoParquetWriteSupport.scala        | 145 ++++++++++++++--
 .../org/apache/sedona/sql/geoparquetIOTests.scala  | 186 ++++++++++++++++++++-
 5 files changed, 351 insertions(+), 17 deletions(-)

diff --git a/R/tests/testthat/test-data-interface.R 
b/R/tests/testthat/test-data-interface.R
index f24ae90282..0a9098280b 100644
--- a/R/tests/testthat/test-data-interface.R
+++ b/R/tests/testthat/test-data-interface.R
@@ -671,9 +671,10 @@ test_that("spark_write_geoparquet() works as expected", {
   lifecycle::expect_deprecated({
   geoparquet_2_sdf <- spark_read_geoparquet(sc, tmp_dest)
   })
+  original_cols <- colnames(geoparquet_sdf)
   expect_equivalent(
     geoparquet_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% 
collect(),
-    geoparquet_2_sdf %>% mutate(geometry = geometry %>% st_astext()) %>% 
collect()
+    geoparquet_2_sdf %>% dplyr::select(dplyr::all_of(original_cols)) %>% 
mutate(geometry = geometry %>% st_astext()) %>% collect()
   )
 
   unlink(tmp_dest, recursive = TRUE)
diff --git a/docs/tutorial/files/geoparquet-sedona-spark.md 
b/docs/tutorial/files/geoparquet-sedona-spark.md
index bbe8273b3e..833437f577 100644
--- a/docs/tutorial/files/geoparquet-sedona-spark.md
+++ b/docs/tutorial/files/geoparquet-sedona-spark.md
@@ -173,7 +173,7 @@ The `columns` column contains bounding box information on 
each file in the GeoPa
 ## Write GeoParquet with CRS Metadata
 
 Since v`1.5.1`, Sedona supports writing GeoParquet files with custom 
GeoParquet spec version and crs.
-The default GeoParquet spec version is `1.0.0` and the default crs is `null`. 
You can specify the GeoParquet spec version and crs as follows:
+The default GeoParquet spec version is `1.1.0` (since `v1.9.0`) and the 
default crs is `null`. You can specify the GeoParquet spec version and crs as 
follows:
 
 ```scala
 val projjson = "{...}" // PROJJSON string for all geometry columns
@@ -225,6 +225,26 @@ df.write.format("geoparquet")
     .save("/path/to/saved_geoparquet.parquet")
 ```
 
+If you don't set a `geoparquet.covering` option, Sedona will automatically 
populate covering metadata for GeoParquet `1.1.0`.
+
+For each geometry column, Sedona uses `<geometryColumnName>_bbox` as the 
covering column:
+
+* If `<geometryColumnName>_bbox` already exists and is a valid covering struct 
(`xmin`, `ymin`, `xmax`, `ymax`), Sedona reuses it.
+* If `<geometryColumnName>_bbox` does not exist, Sedona generates it 
automatically while writing.
+
+Explicit `geoparquet.covering` or `geoparquet.covering.<geometryColumnName>` 
options take precedence over the default behavior.
+
+You can control this default behavior with `geoparquet.covering.mode`:
+
+* `auto` (default): enable automatic covering metadata/column generation for 
GeoParquet `1.1.0`.
+* `legacy`: disable automatic covering generation.
+
+```scala
+df.write.format("geoparquet")
+  .option("geoparquet.covering.mode", "legacy")
+  .save("/path/to/saved_geoparquet.parquet")
+```
+
 If the DataFrame does not have a covering column, you can construct one using 
Sedona's SQL functions:
 
 ```scala
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
index 1f2ad3ace7..a108e3bafa 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
@@ -95,6 +95,18 @@ object GeoParquetMetaData {
    */
   val GEOPARQUET_COVERING_KEY = "geoparquet.covering"
 
+  /**
+   * Configuration key for controlling default covering behavior.
+   *
+   * Supported values:
+   *   - `auto`: automatically generate/reuse `<geometryColumnName>_bbox` 
covering columns for
+   *     GeoParquet 1.1.0 when explicit covering options are not provided.
+   *   - `legacy`: disable automatic covering generation and keep legacy 
behavior.
+   */
+  val GEOPARQUET_COVERING_MODE_KEY = "geoparquet.covering.mode"
+  val GEOPARQUET_COVERING_MODE_AUTO = "auto"
+  val GEOPARQUET_COVERING_MODE_LEGACY = "legacy"
+
   def parseKeyValueMetaData(
       keyValueMetaData: java.util.Map[String, String]): 
Option[GeoParquetMetaData] = {
     Option(keyValueMetaData.get("geo")).map { geo =>
diff --git 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
index 8742dad332..48655e5977 100644
--- 
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
+++ 
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import 
org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
 GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION, 
createCoveringColumnMetadata}
+import 
org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
 GEOPARQUET_COVERING_MODE_AUTO, GEOPARQUET_COVERING_MODE_KEY, 
GEOPARQUET_COVERING_MODE_LEGACY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, 
VERSION, createCoveringColumnMetadata}
 import 
org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetWriteSupport.GeometryColumnInfo
 import 
org.apache.spark.sql.execution.datasources.geoparquet.internal.{DataSourceUtils,
 LegacyBehaviorPolicy, PortableSQLConf}
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
@@ -110,6 +110,8 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
   private var defaultGeoParquetCrs: Option[JValue] = None
   private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = 
mutable.Map.empty
   private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] = 
mutable.Map.empty
+  private val generatedCoveringColumnOrdinals: mutable.Map[Int, Int] = 
mutable.Map.empty
+  private var geoParquetCoveringMode: String = GEOPARQUET_COVERING_MODE_AUTO
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = 
configuration.get(internal.ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -126,11 +128,11 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
       
PortableSQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
     }
 
-    this.rootFieldWriters = schema.zipWithIndex
-      .map { case (field, ordinal) =>
-        makeWriter(field.dataType, Some(ordinal))
+    schema.zipWithIndex.foreach { case (field, ordinal) =>
+      if (field.dataType == GeometryUDT) {
+        geometryColumnInfoMap.getOrElseUpdate(ordinal, new 
GeometryColumnInfo())
       }
-      .toArray[ValueWriter]
+    }
 
     if (geometryColumnInfoMap.isEmpty) {
       throw new RuntimeException("No geometry column found in the schema")
@@ -140,6 +142,18 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
       case null => Some(VERSION)
       case version: String => Some(version)
     }
+    geoParquetCoveringMode = 
Option(configuration.get(GEOPARQUET_COVERING_MODE_KEY))
+      .map(_.trim)
+      .filter(_.nonEmpty)
+      .getOrElse(GEOPARQUET_COVERING_MODE_AUTO)
+      .toLowerCase(java.util.Locale.ROOT)
+    if (geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_AUTO &&
+      geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_LEGACY) {
+      throw new IllegalArgumentException(
+        s"Invalid value '$geoParquetCoveringMode' for 
$GEOPARQUET_COVERING_MODE_KEY. " +
+          s"Supported values are '$GEOPARQUET_COVERING_MODE_AUTO' and " +
+          s"'$GEOPARQUET_COVERING_MODE_LEGACY'.")
+    }
     defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
       case null =>
         // If no CRS is specified, we write null to the crs metadata field. 
This is for compatibility with
@@ -165,13 +179,27 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
       geoParquetColumnCoveringMap.put(geometryColumnName, covering)
     }
     geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
-      Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
-        coveringColumnName =>
+      val perColumnKey = GEOPARQUET_COVERING_KEY + "." + name
+      // Skip keys that collide with reserved option keys (e.g. 
geoparquet.covering.mode)
+      if (perColumnKey != GEOPARQUET_COVERING_MODE_KEY) {
+        Option(configuration.get(perColumnKey)).foreach { coveringColumnName =>
           val covering = createCoveringColumnMetadata(coveringColumnName, 
schema)
           geoParquetColumnCoveringMap.put(name, covering)
+        }
       }
     }
 
+    maybeAutoGenerateCoveringColumns()
+
+    this.rootFieldWriters = schema.zipWithIndex
+      .map { case (field, ordinal) =>
+        generatedCoveringColumnOrdinals.get(ordinal) match {
+          case Some(geometryOrdinal) => 
makeGeneratedCoveringWriter(geometryOrdinal)
+          case None => makeWriter(field.dataType, Some(ordinal))
+        }
+      }
+      .toArray[ValueWriter]
+
     val messageType = new 
internal.SparkToParquetSchemaConverter(configuration).convert(schema)
     val sparkSqlParquetRowMetadata = 
GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
     val metadata = Map(
@@ -240,16 +268,109 @@ class GeoParquetWriteSupport extends 
WriteSupport[InternalRow] with Logging {
       schema: StructType,
       fieldWriters: Array[ValueWriter]): Unit = {
     var i = 0
-    while (i < row.numFields) {
-      if (!row.isNullAt(i)) {
-        consumeField(schema(i).name, i) {
-          fieldWriters(i).apply(row, i)
-        }
+    while (i < schema.length) {
+      generatedCoveringColumnOrdinals.get(i) match {
+        case Some(geometryOrdinal) =>
+          if (!row.isNullAt(geometryOrdinal)) {
+            consumeField(schema(i).name, i) {
+              fieldWriters(i).apply(row, i)
+            }
+          }
+        case None =>
+          if (i < row.numFields && !row.isNullAt(i)) {
+            consumeField(schema(i).name, i) {
+              fieldWriters(i).apply(row, i)
+            }
+          }
       }
       i += 1
     }
   }
 
+  private def maybeAutoGenerateCoveringColumns(): Unit = {
+    if (!isAutoCoveringEnabled) {
+      return
+    }
+
+    // If the user provided any explicit covering options, don't auto-generate 
for
+    // the remaining geometry columns. Explicit options signal intentional 
configuration.
+    if (geoParquetColumnCoveringMap.nonEmpty) {
+      return
+    }
+
+    val generatedCoveringFields = mutable.ArrayBuffer.empty[StructField]
+    val geometryColumns =
+      geometryColumnInfoMap.keys.toSeq.sorted.map(ordinal => ordinal -> 
schema(ordinal).name)
+
+    geometryColumns.foreach { case (geometryOrdinal, geometryColumnName) =>
+      if (!geoParquetColumnCoveringMap.contains(geometryColumnName)) {
+        val coveringColumnName = s"${geometryColumnName}_bbox"
+        if (schema.fieldNames.contains(coveringColumnName)) {
+          // Reuse an existing column if it is a valid covering struct; 
otherwise skip.
+          try {
+            val covering = createCoveringColumnMetadata(coveringColumnName, 
schema)
+            geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+          } catch {
+            case _: IllegalArgumentException =>
+              logWarning(
+                s"Existing column '$coveringColumnName' is not a valid 
covering struct " +
+                  s"(expected struct<xmin, ymin, xmax, ymax> with float/double 
fields; " +
+                  s"optional zmin/zmax fields are also supported). " +
+                  s"Skipping automatic covering for geometry column 
'$geometryColumnName'.")
+          }
+        } else {
+          val coveringStructType = StructType(
+            Seq(
+              StructField("xmin", DoubleType, nullable = false),
+              StructField("ymin", DoubleType, nullable = false),
+              StructField("xmax", DoubleType, nullable = false),
+              StructField("ymax", DoubleType, nullable = false)))
+          generatedCoveringFields +=
+            StructField(coveringColumnName, coveringStructType, nullable = 
true)
+          val generatedOrdinal = schema.length + 
generatedCoveringFields.length - 1
+          generatedCoveringColumnOrdinals.put(generatedOrdinal, 
geometryOrdinal)
+        }
+      }
+    }
+
+    if (generatedCoveringFields.nonEmpty) {
+      schema = StructType(schema.fields ++ generatedCoveringFields)
+      generatedCoveringFields.foreach { generatedField =>
+        val covering = createCoveringColumnMetadata(generatedField.name, 
schema)
+        val geometryColumnName = generatedField.name.stripSuffix("_bbox")
+        geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+      }
+    }
+  }
+
+  private def isGeoParquet11: Boolean = {
+    geoParquetVersion.contains(VERSION)
+  }
+
+  private def isAutoCoveringEnabled: Boolean = {
+    geoParquetCoveringMode == GEOPARQUET_COVERING_MODE_AUTO && isGeoParquet11
+  }
+
+  private def makeGeneratedCoveringWriter(geometryOrdinal: Int): ValueWriter = 
{
+    (row: SpecializedGetters, _: Int) =>
+      val geom = GeometryUDT.deserialize(row.getBinary(geometryOrdinal))
+      val envelope = geom.getEnvelopeInternal
+      consumeGroup {
+        consumeField("xmin", 0) {
+          recordConsumer.addDouble(envelope.getMinX)
+        }
+        consumeField("ymin", 1) {
+          recordConsumer.addDouble(envelope.getMinY)
+        }
+        consumeField("xmax", 2) {
+          recordConsumer.addDouble(envelope.getMaxX)
+        }
+        consumeField("ymax", 3) {
+          recordConsumer.addDouble(envelope.getMaxY)
+        }
+      }
+  }
+
   private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None): 
ValueWriter = {
     dataType match {
       case BooleanType =>
diff --git 
a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala 
b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 63cd87971e..9f3bc97d27 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -148,7 +148,7 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
       val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample4.parquet"
       
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
-      val newRows = df2.collect()
+      val newRows = df2.select(df.columns.map(col(_)): _*).collect()
       assert(rows.length == newRows.length)
       assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
       assert(rows sameElements newRows)
@@ -182,7 +182,7 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
       val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
       
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
       val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
-      val newRows = df2.collect()
+      val newRows = df2.select(df.columns.map(col(_)): _*).collect()
       assert(rows.length == newRows.length)
       assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
       assert(rows sameElements newRows)
@@ -882,6 +882,100 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
       }
     }
 
+    it("GeoParquet auto populates covering metadata for single geometry 
column") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "geometry_bbox",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS 
ymax)"))
+      val geoParquetSavePath =
+        geoparquetoutputlocation + 
"/gp_with_covering_metadata_auto_single.parquet"
+      df.write
+        .format("geoparquet")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin"))
+        assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin"))
+        assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax"))
+        assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax"))
+      }
+    }
+
+    it("GeoParquet auto generates covering column and metadata for GeoParquet 
1.1.0") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+      val geoParquetSavePath =
+        geoparquetoutputlocation + "/gp_with_generated_covering_column.parquet"
+      df.write
+        .format("geoparquet")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+
+      val parquetDf = sparkSession.read.parquet(geoParquetSavePath)
+      assert(parquetDf.columns.contains("geometry_bbox"))
+
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+        val covering = coveringJsValue.extract[Covering]
+        assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin"))
+        assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin"))
+        assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax"))
+        assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax"))
+      }
+    }
+
+    it("GeoParquet covering mode legacy disables auto covering generation") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+      val geoParquetSavePath =
+        geoparquetoutputlocation + 
"/gp_without_generated_covering_column_legacy_mode.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.covering.mode", "legacy")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+
+      val parquetDf = sparkSession.read.parquet(geoParquetSavePath)
+      assert(!parquetDf.columns.contains("geometry_bbox"))
+
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        assert(geo \ "columns" \ "geometry" \ "covering" == 
org.json4s.JNothing)
+      }
+    }
+
+    it("GeoParquet covering mode should reject invalid value") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+
+      val e = intercept[SparkException] {
+        df.write
+          .format("geoparquet")
+          .option("geoparquet.covering.mode", "invalid-mode")
+          .mode("overwrite")
+          .save(geoparquetoutputlocation + "/gp_invalid_covering_mode.parquet")
+      }
+      assert(e.getMessage.contains("geoparquet.covering.mode"))
+      assert(e.getMessage.contains("auto"))
+      assert(e.getMessage.contains("legacy"))
+    }
+
     it("GeoParquet supports writing covering metadata for multiple columns") {
       val df = sparkSession
         .range(0, 100)
@@ -932,6 +1026,88 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
         assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
       }
     }
+
+    it("GeoParquet auto populates covering metadata for multiple geometry 
columns") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "geom1_bbox",
+          expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS 
ymax)"))
+        .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+        .withColumn(
+          "geom2_bbox",
+          expr(
+            "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10 
* id + 1 AS ymax)"))
+      val geoParquetSavePath =
+        geoparquetoutputlocation + 
"/gp_with_covering_metadata_auto_multiple.parquet"
+      df.write
+        .format("geoparquet")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+        Seq(("geom1", "geom1_bbox"), ("geom2", "geom2_bbox")).foreach {
+          case (geomName, coveringName) =>
+            val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+            val covering = coveringJsValue.extract[Covering]
+            assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+            assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+            assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+            assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+        }
+      }
+    }
+
+    it("GeoParquet does not auto generate covering column for non-1.1.0 
version") {
+      val df = sparkSession
+        .range(0, 100)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+      val geoParquetSavePath =
+        geoparquetoutputlocation + 
"/gp_without_generated_covering_column.parquet"
+      df.write
+        .format("geoparquet")
+        .option("geoparquet.version", "1.0.0")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+
+      val parquetDf = sparkSession.read.parquet(geoParquetSavePath)
+      assert(!parquetDf.columns.contains("geometry_bbox"))
+
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        assert(geo \ "columns" \ "geometry" \ "covering" == 
org.json4s.JNothing)
+      }
+    }
+
+    it("GeoParquet auto covering skips invalid existing _bbox column 
gracefully") {
+      // Create a DataFrame with a geometry_bbox column that has wrong field 
types (String instead of Double)
+      val df = sparkSession
+        .range(0, 10)
+        .toDF("id")
+        .withColumn("id", expr("CAST(id AS DOUBLE)"))
+        .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+        .withColumn(
+          "geometry_bbox",
+          expr(
+            "struct(CAST(id AS STRING) AS xmin, CAST(id AS STRING) AS ymin, " +
+              "CAST(id AS STRING) AS xmax, CAST(id AS STRING) AS ymax)"))
+      val geoParquetSavePath =
+        geoparquetoutputlocation + "/gp_with_invalid_bbox_column.parquet"
+      // Should succeed without throwing
+      df.write
+        .format("geoparquet")
+        .mode("overwrite")
+        .save(geoParquetSavePath)
+
+      // No covering metadata should be generated for the invalid bbox column
+      validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+        assert(geo \ "columns" \ "geometry" \ "covering" == 
org.json4s.JNothing)
+      }
+    }
   }
 
   describe("Spark types tests") {
@@ -952,7 +1128,11 @@ class geoparquetIOTests extends TestBaseScala with 
BeforeAndAfterAll {
 
       // Read it back
       val df2 =
-        
sparkSession.read.format("geoparquet").load(geoparquetoutputlocation).sort(col("id"))
+        sparkSession.read
+          .format("geoparquet")
+          .load(geoparquetoutputlocation)
+          .select(df.columns.map(col(_)): _*)
+          .sort(col("id"))
       assert(df2.schema.fields(1).dataType == TimestampNTZType)
       val data1 = df.sort(col("id")).collect()
       val data2 = df2.collect()

Reply via email to