This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new c5acc9ceea [GH-2646] Auto-populate covering metadata for GeoParquet
1.1.0 writes (#2665)
c5acc9ceea is described below
commit c5acc9ceea2de655416dd5786209ece5ea954fc3
Author: Jia Yu <[email protected]>
AuthorDate: Fri Feb 20 20:00:49 2026 -0700
[GH-2646] Auto-populate covering metadata for GeoParquet 1.1.0 writes
(#2665)
---
R/tests/testthat/test-data-interface.R | 3 +-
docs/tutorial/files/geoparquet-sedona-spark.md | 22 ++-
.../geoparquet/GeoParquetMetaData.scala | 12 ++
.../geoparquet/GeoParquetWriteSupport.scala | 145 ++++++++++++++--
.../org/apache/sedona/sql/geoparquetIOTests.scala | 186 ++++++++++++++++++++-
5 files changed, 351 insertions(+), 17 deletions(-)
diff --git a/R/tests/testthat/test-data-interface.R
b/R/tests/testthat/test-data-interface.R
index f24ae90282..0a9098280b 100644
--- a/R/tests/testthat/test-data-interface.R
+++ b/R/tests/testthat/test-data-interface.R
@@ -671,9 +671,10 @@ test_that("spark_write_geoparquet() works as expected", {
lifecycle::expect_deprecated({
geoparquet_2_sdf <- spark_read_geoparquet(sc, tmp_dest)
})
+ original_cols <- colnames(geoparquet_sdf)
expect_equivalent(
geoparquet_sdf %>% mutate(geometry = geometry %>% st_astext()) %>%
collect(),
- geoparquet_2_sdf %>% mutate(geometry = geometry %>% st_astext()) %>%
collect()
+ geoparquet_2_sdf %>% dplyr::select(dplyr::all_of(original_cols)) %>%
mutate(geometry = geometry %>% st_astext()) %>% collect()
)
unlink(tmp_dest, recursive = TRUE)
diff --git a/docs/tutorial/files/geoparquet-sedona-spark.md
b/docs/tutorial/files/geoparquet-sedona-spark.md
index bbe8273b3e..833437f577 100644
--- a/docs/tutorial/files/geoparquet-sedona-spark.md
+++ b/docs/tutorial/files/geoparquet-sedona-spark.md
@@ -173,7 +173,7 @@ The `columns` column contains bounding box information on
each file in the GeoPa
## Write GeoParquet with CRS Metadata
Since v`1.5.1`, Sedona supports writing GeoParquet files with custom
GeoParquet spec version and crs.
-The default GeoParquet spec version is `1.0.0` and the default crs is `null`.
You can specify the GeoParquet spec version and crs as follows:
+The default GeoParquet spec version is `1.1.0` (since `v1.9.0`) and the
default crs is `null`. You can specify the GeoParquet spec version and crs as
follows:
```scala
val projjson = "{...}" // PROJJSON string for all geometry columns
@@ -225,6 +225,26 @@ df.write.format("geoparquet")
.save("/path/to/saved_geoparquet.parquet")
```
+If you don't set a `geoparquet.covering` option, Sedona will automatically
populate covering metadata for GeoParquet `1.1.0`.
+
+For each geometry column, Sedona uses `<geometryColumnName>_bbox` as the
covering column:
+
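+* If `<geometryColumnName>_bbox` already exists and is a valid covering struct
(`xmin`, `ymin`, `xmax`, `ymax`), Sedona reuses it. If it exists but is not a
valid covering struct, Sedona logs a warning and writes no covering metadata
for that geometry column.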
+* If `<geometryColumnName>_bbox` does not exist, Sedona generates it
automatically while writing.
+
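+Explicit `geoparquet.covering` or `geoparquet.covering.<geometryColumnName>`
options take precedence over the default behavior: if any of them is set,
Sedona does not auto-generate or reuse `_bbox` covering columns for the
remaining geometry columns.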
+
+You can control this default behavior with `geoparquet.covering.mode`:
+
+* `auto` (default): enable automatic covering metadata/column generation for
GeoParquet `1.1.0`.
+* `legacy`: disable automatic covering generation.
+
+```scala
+df.write.format("geoparquet")
+ .option("geoparquet.covering.mode", "legacy")
+ .save("/path/to/saved_geoparquet.parquet")
+```
+
If the DataFrame does not have a covering column, you can construct one using
Sedona's SQL functions:
```scala
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
index 1f2ad3ace7..a108e3bafa 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetMetaData.scala
@@ -95,6 +95,18 @@ object GeoParquetMetaData {
*/
val GEOPARQUET_COVERING_KEY = "geoparquet.covering"
+ /**
+ * Configuration key for controlling default covering behavior.
+ *
+ * Supported values:
+ * - `auto`: automatically generate/reuse `<geometryColumnName>_bbox`
covering columns for
+ * GeoParquet 1.1.0 when explicit covering options are not provided.
+ * - `legacy`: disable automatic covering generation and keep legacy
behavior.
+ */
+ val GEOPARQUET_COVERING_MODE_KEY = "geoparquet.covering.mode"
+ val GEOPARQUET_COVERING_MODE_AUTO = "auto"
+ val GEOPARQUET_COVERING_MODE_LEGACY = "legacy"
+
def parseKeyValueMetaData(
keyValueMetaData: java.util.Map[String, String]):
Option[GeoParquetMetaData] = {
Option(keyValueMetaData.get("geo")).map { geo =>
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
index 8742dad332..48655e5977 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetWriteSupport.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import
org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION,
createCoveringColumnMetadata}
+import
org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
GEOPARQUET_COVERING_MODE_AUTO, GEOPARQUET_COVERING_MODE_KEY,
GEOPARQUET_COVERING_MODE_LEGACY, GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY,
VERSION, createCoveringColumnMetadata}
import
org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetWriteSupport.GeometryColumnInfo
import
org.apache.spark.sql.execution.datasources.geoparquet.internal.{DataSourceUtils,
LegacyBehaviorPolicy, PortableSQLConf}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
@@ -110,6 +110,8 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
private var defaultGeoParquetCrs: Option[JValue] = None
private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] =
mutable.Map.empty
private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] =
mutable.Map.empty
+ private val generatedCoveringColumnOrdinals: mutable.Map[Int, Int] =
mutable.Map.empty
+ private var geoParquetCoveringMode: String = GEOPARQUET_COVERING_MODE_AUTO
override def init(configuration: Configuration): WriteContext = {
val schemaString =
configuration.get(internal.ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -126,11 +128,11 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
PortableSQLConf.ParquetOutputTimestampType.withName(configuration.get(key))
}
- this.rootFieldWriters = schema.zipWithIndex
- .map { case (field, ordinal) =>
- makeWriter(field.dataType, Some(ordinal))
+ schema.zipWithIndex.foreach { case (field, ordinal) =>
+ if (field.dataType == GeometryUDT) {
+ geometryColumnInfoMap.getOrElseUpdate(ordinal, new
GeometryColumnInfo())
}
- .toArray[ValueWriter]
+ }
if (geometryColumnInfoMap.isEmpty) {
throw new RuntimeException("No geometry column found in the schema")
@@ -140,6 +142,18 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
case null => Some(VERSION)
case version: String => Some(version)
}
+ geoParquetCoveringMode =
Option(configuration.get(GEOPARQUET_COVERING_MODE_KEY))
+ .map(_.trim)
+ .filter(_.nonEmpty)
+ .getOrElse(GEOPARQUET_COVERING_MODE_AUTO)
+ .toLowerCase(java.util.Locale.ROOT)
+ if (geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_AUTO &&
+ geoParquetCoveringMode != GEOPARQUET_COVERING_MODE_LEGACY) {
+ throw new IllegalArgumentException(
+ s"Invalid value '$geoParquetCoveringMode' for
$GEOPARQUET_COVERING_MODE_KEY. " +
+ s"Supported values are '$GEOPARQUET_COVERING_MODE_AUTO' and " +
+ s"'$GEOPARQUET_COVERING_MODE_LEGACY'.")
+ }
defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
case null =>
// If no CRS is specified, we write null to the crs metadata field.
This is for compatibility with
@@ -165,13 +179,27 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
geoParquetColumnCoveringMap.put(geometryColumnName, covering)
}
geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
- Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
- coveringColumnName =>
+ val perColumnKey = GEOPARQUET_COVERING_KEY + "." + name
+ // Skip keys that collide with reserved option keys (e.g.
geoparquet.covering.mode)
+ if (perColumnKey != GEOPARQUET_COVERING_MODE_KEY) {
+ Option(configuration.get(perColumnKey)).foreach { coveringColumnName =>
val covering = createCoveringColumnMetadata(coveringColumnName,
schema)
geoParquetColumnCoveringMap.put(name, covering)
+ }
}
}
+ maybeAutoGenerateCoveringColumns()
+
+ this.rootFieldWriters = schema.zipWithIndex
+ .map { case (field, ordinal) =>
+ generatedCoveringColumnOrdinals.get(ordinal) match {
+ case Some(geometryOrdinal) =>
makeGeneratedCoveringWriter(geometryOrdinal)
+ case None => makeWriter(field.dataType, Some(ordinal))
+ }
+ }
+ .toArray[ValueWriter]
+
val messageType = new
internal.SparkToParquetSchemaConverter(configuration).convert(schema)
val sparkSqlParquetRowMetadata =
GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
val metadata = Map(
@@ -240,16 +268,109 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
schema: StructType,
fieldWriters: Array[ValueWriter]): Unit = {
var i = 0
- while (i < row.numFields) {
- if (!row.isNullAt(i)) {
- consumeField(schema(i).name, i) {
- fieldWriters(i).apply(row, i)
- }
+ while (i < schema.length) {
+ generatedCoveringColumnOrdinals.get(i) match {
+ case Some(geometryOrdinal) =>
+ if (!row.isNullAt(geometryOrdinal)) {
+ consumeField(schema(i).name, i) {
+ fieldWriters(i).apply(row, i)
+ }
+ }
+ case None =>
+ if (i < row.numFields && !row.isNullAt(i)) {
+ consumeField(schema(i).name, i) {
+ fieldWriters(i).apply(row, i)
+ }
+ }
}
i += 1
}
}
+ private def maybeAutoGenerateCoveringColumns(): Unit = {
+ if (!isAutoCoveringEnabled) {
+ return
+ }
+
+ // If the user provided any explicit covering options, don't auto-generate
for
+ // the remaining geometry columns. Explicit options signal intentional
configuration.
+ if (geoParquetColumnCoveringMap.nonEmpty) {
+ return
+ }
+
+ val generatedCoveringFields = mutable.ArrayBuffer.empty[StructField]
+ val geometryColumns =
+ geometryColumnInfoMap.keys.toSeq.sorted.map(ordinal => ordinal ->
schema(ordinal).name)
+
+ geometryColumns.foreach { case (geometryOrdinal, geometryColumnName) =>
+ if (!geoParquetColumnCoveringMap.contains(geometryColumnName)) {
+ val coveringColumnName = s"${geometryColumnName}_bbox"
+ if (schema.fieldNames.contains(coveringColumnName)) {
+ // Reuse an existing column if it is a valid covering struct;
otherwise skip.
+ try {
+ val covering = createCoveringColumnMetadata(coveringColumnName,
schema)
+ geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+ } catch {
+ case _: IllegalArgumentException =>
+ logWarning(
+ s"Existing column '$coveringColumnName' is not a valid
covering struct " +
+ s"(expected struct<xmin, ymin, xmax, ymax> with float/double
fields; " +
+ s"optional zmin/zmax fields are also supported). " +
+ s"Skipping automatic covering for geometry column
'$geometryColumnName'.")
+ }
+ } else {
+ val coveringStructType = StructType(
+ Seq(
+ StructField("xmin", DoubleType, nullable = false),
+ StructField("ymin", DoubleType, nullable = false),
+ StructField("xmax", DoubleType, nullable = false),
+ StructField("ymax", DoubleType, nullable = false)))
+ generatedCoveringFields +=
+ StructField(coveringColumnName, coveringStructType, nullable =
true)
+ val generatedOrdinal = schema.length +
generatedCoveringFields.length - 1
+ generatedCoveringColumnOrdinals.put(generatedOrdinal,
geometryOrdinal)
+ }
+ }
+ }
+
+ if (generatedCoveringFields.nonEmpty) {
+ schema = StructType(schema.fields ++ generatedCoveringFields)
+ generatedCoveringFields.foreach { generatedField =>
+ val covering = createCoveringColumnMetadata(generatedField.name,
schema)
+ val geometryColumnName = generatedField.name.stripSuffix("_bbox")
+ geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+ }
+ }
+ }
+
+ private def isGeoParquet11: Boolean = {
+ geoParquetVersion.contains(VERSION)
+ }
+
+ private def isAutoCoveringEnabled: Boolean = {
+ geoParquetCoveringMode == GEOPARQUET_COVERING_MODE_AUTO && isGeoParquet11
+ }
+
+ private def makeGeneratedCoveringWriter(geometryOrdinal: Int): ValueWriter =
{
+ (row: SpecializedGetters, _: Int) =>
+ val geom = GeometryUDT.deserialize(row.getBinary(geometryOrdinal))
+ val envelope = geom.getEnvelopeInternal
+ consumeGroup {
+ consumeField("xmin", 0) {
+ recordConsumer.addDouble(envelope.getMinX)
+ }
+ consumeField("ymin", 1) {
+ recordConsumer.addDouble(envelope.getMinY)
+ }
+ consumeField("xmax", 2) {
+ recordConsumer.addDouble(envelope.getMaxX)
+ }
+ consumeField("ymax", 3) {
+ recordConsumer.addDouble(envelope.getMaxY)
+ }
+ }
+ }
+
private def makeWriter(dataType: DataType, rootOrdinal: Option[Int] = None):
ValueWriter = {
dataType match {
case BooleanType =>
diff --git
a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 63cd87971e..9f3bc97d27 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -148,7 +148,7 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample4.parquet"
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
- val newRows = df2.collect()
+ val newRows = df2.select(df.columns.map(col(_)): _*).collect()
assert(rows.length == newRows.length)
assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
assert(rows sameElements newRows)
@@ -182,7 +182,7 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
- val newRows = df2.collect()
+ val newRows = df2.select(df.columns.map(col(_)): _*).collect()
assert(rows.length == newRows.length)
assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
assert(rows sameElements newRows)
@@ -882,6 +882,100 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
}
}
+ it("GeoParquet auto populates covering metadata for single geometry
column") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "geometry_bbox",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ val geoParquetSavePath =
+ geoparquetoutputlocation +
"/gp_with_covering_metadata_auto_single.parquet"
+ df.write
+ .format("geoparquet")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin"))
+ assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin"))
+ assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax"))
+ assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax"))
+ }
+ }
+
+ it("GeoParquet auto generates covering column and metadata for GeoParquet
1.1.0") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ val geoParquetSavePath =
+ geoparquetoutputlocation + "/gp_with_generated_covering_column.parquet"
+ df.write
+ .format("geoparquet")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+
+ val parquetDf = sparkSession.read.parquet(geoParquetSavePath)
+ assert(parquetDf.columns.contains("geometry_bbox"))
+
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("geometry_bbox", "xmin"))
+ assert(covering.bbox.ymin == Seq("geometry_bbox", "ymin"))
+ assert(covering.bbox.xmax == Seq("geometry_bbox", "xmax"))
+ assert(covering.bbox.ymax == Seq("geometry_bbox", "ymax"))
+ }
+ }
+
+ it("GeoParquet covering mode legacy disables auto covering generation") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ val geoParquetSavePath =
+ geoparquetoutputlocation +
"/gp_without_generated_covering_column_legacy_mode.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.mode", "legacy")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+
+ val parquetDf = sparkSession.read.parquet(geoParquetSavePath)
+ assert(!parquetDf.columns.contains("geometry_bbox"))
+
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ assert(geo \ "columns" \ "geometry" \ "covering" ==
org.json4s.JNothing)
+ }
+ }
+
+ it("GeoParquet covering mode should reject invalid value") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+
+ val e = intercept[SparkException] {
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.mode", "invalid-mode")
+ .mode("overwrite")
+ .save(geoparquetoutputlocation + "/gp_invalid_covering_mode.parquet")
+ }
+ assert(e.getMessage.contains("geoparquet.covering.mode"))
+ assert(e.getMessage.contains("auto"))
+ assert(e.getMessage.contains("legacy"))
+ }
+
it("GeoParquet supports writing covering metadata for multiple columns") {
val df = sparkSession
.range(0, 100)
@@ -932,6 +1026,88 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
}
}
+
+ it("GeoParquet auto populates covering metadata for multiple geometry
columns") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "geom1_bbox",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+ .withColumn(
+ "geom2_bbox",
+ expr(
+ "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10
* id + 1 AS ymax)"))
+ val geoParquetSavePath =
+ geoparquetoutputlocation +
"/gp_with_covering_metadata_auto_multiple.parquet"
+ df.write
+ .format("geoparquet")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ Seq(("geom1", "geom1_bbox"), ("geom2", "geom2_bbox")).foreach {
+ case (geomName, coveringName) =>
+ val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+ assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+ assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+ assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+ }
+ }
+ }
+
+ it("GeoParquet does not auto generate covering column for non-1.1.0
version") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ val geoParquetSavePath =
+ geoparquetoutputlocation +
"/gp_without_generated_covering_column.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.version", "1.0.0")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+
+ val parquetDf = sparkSession.read.parquet(geoParquetSavePath)
+ assert(!parquetDf.columns.contains("geometry_bbox"))
+
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ assert(geo \ "columns" \ "geometry" \ "covering" ==
org.json4s.JNothing)
+ }
+ }
+
+ it("GeoParquet auto covering skips invalid existing _bbox column
gracefully") {
+ // Create a DataFrame with a geometry_bbox column that has wrong field
types (String instead of Double)
+ val df = sparkSession
+ .range(0, 10)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "geometry_bbox",
+ expr(
+ "struct(CAST(id AS STRING) AS xmin, CAST(id AS STRING) AS ymin, " +
+ "CAST(id AS STRING) AS xmax, CAST(id AS STRING) AS ymax)"))
+ val geoParquetSavePath =
+ geoparquetoutputlocation + "/gp_with_invalid_bbox_column.parquet"
+ // Should succeed without throwing
+ df.write
+ .format("geoparquet")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+
+ // No covering metadata should be generated for the invalid bbox column
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ assert(geo \ "columns" \ "geometry" \ "covering" ==
org.json4s.JNothing)
+ }
+ }
}
describe("Spark types tests") {
@@ -952,7 +1128,11 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
// Read it back
val df2 =
-
sparkSession.read.format("geoparquet").load(geoparquetoutputlocation).sort(col("id"))
+ sparkSession.read
+ .format("geoparquet")
+ .load(geoparquetoutputlocation)
+ .select(df.columns.map(col(_)): _*)
+ .sort(col("id"))
assert(df2.schema.fields(1).dataType == TimestampNTZType)
val data1 = df.sort(col("id")).collect()
val data2 = df2.collect()