This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new d51f92ebb [SEDONA-627] Write user specified column to the covering metadata of GeoParquet files (#1522)
d51f92ebb is described below
commit d51f92ebbdeb33fa9f7189c354bc473f0b25dad0
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Tue Jul 16 12:30:30 2024 +0800
[SEDONA-627] Write user specified column to the covering metadata of GeoParquet files (#1522)
* Support writing covering metadata to geoparquet files
* Port covering support to Spark 3.4 and Spark 3.5
* Update docs
* Add example 1.1.0 geoparquet file
* Document how to construct covering columns using ST functions
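For quick reference, a minimal usage sketch of the option introduced here (not part of the diff below; a DataFrame `df` with a `geometry` column is assumed, and the option and function names are taken from the documentation added in docs/tutorial/sql.md):

```scala
// Sketch: build a covering struct column, then point the GeoParquet writer at it
// so it is referenced in the "covering" metadata of the "geometry" column.
import org.apache.spark.sql.functions.expr

val dfWithBbox = df.withColumn(
  "bbox",
  expr("struct(ST_XMin(geometry) AS xmin, ST_YMin(geometry) AS ymin, " +
    "ST_XMax(geometry) AS xmax, ST_YMax(geometry) AS ymax)"))

dfWithBbox.write
  .format("geoparquet")
  .option("geoparquet.covering.geometry", "bbox") // or just "geoparquet.covering" when there is a single geometry column
  .save("/path/to/saved_geoparquet.parquet")
```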
---
docs/tutorial/sql.md | 27 +++++
.../datasources/parquet/GeoParquetMetaData.scala | 88 +++++++++++++++-
.../resources/geoparquet/example-1.1.0.parquet | Bin 0 -> 29838 bytes
.../parquet/GeoParquetWriteSupport.scala | 24 ++++-
.../GeoParquetMetadataPartitionReaderFactory.scala | 8 +-
.../metadata/GeoParquetMetadataTable.scala | 3 +-
.../sedona/sql/GeoParquetMetadataTests.scala | 13 +++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 111 ++++++++++++++++++++-
.../parquet/GeoParquetWriteSupport.scala | 24 ++++-
.../GeoParquetMetadataPartitionReaderFactory.scala | 8 +-
.../metadata/GeoParquetMetadataTable.scala | 3 +-
.../sedona/sql/GeoParquetMetadataTests.scala | 13 +++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 111 ++++++++++++++++++++-
.../parquet/GeoParquetWriteSupport.scala | 23 ++++-
.../GeoParquetMetadataPartitionReaderFactory.scala | 8 +-
.../metadata/GeoParquetMetadataTable.scala | 3 +-
.../sedona/sql/GeoParquetMetadataTests.scala | 13 +++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 111 ++++++++++++++++++++-
18 files changed, 567 insertions(+), 24 deletions(-)
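The diff below also surfaces the new `covering` field through the `geoparquet.metadata` reader (a `covering` string column is added to the per-column metadata struct). A minimal read-back sketch, assuming an active `sparkSession` and a placeholder path:

```scala
// Sketch: inspect covering metadata of a GeoParquet file; the "columns" map
// now carries a "covering" JSON string per geometry column (null if absent).
val dfMeta = sparkSession.read
  .format("geoparquet.metadata")
  .load("/path/to/saved_geoparquet.parquet")
dfMeta.select("columns").show(truncate = false)
```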
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index f183a1270..0b617384b 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -1111,6 +1111,8 @@ Since v`1.3.0`, Sedona natively supports writing GeoParquet file. GeoParquet can
df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```
+### CRS Metadata
+
Since v`1.5.1`, Sedona supports writing GeoParquet files with custom GeoParquet spec version and crs.
The default GeoParquet spec version is `1.0.0` and the default crs is `null`.
You can specify the GeoParquet spec version and crs as follows:
@@ -1146,6 +1148,31 @@ Its geoparquet writer will not leverage the SRID field of a geometry so you will
Due to the same reason, Sedona geoparquet reader and writer do NOT check the axis order (lon/lat or lat/lon) and assume they are handled by the users themselves when writing / reading the files. You can always use [`ST_FlipCoordinates`](../api/sql/Function.md#st_flipcoordinates) to swap the axis order of your geometries.
+### Covering Metadata
+
+Since `v1.6.1`, Sedona supports writing the [`covering` field](https://github.com/opengeospatial/geoparquet/blob/v1.1.0/format-specs/geoparquet.md#covering) to geometry column metadata. The `covering` field specifies a bounding box column to help accelerate spatial data retrieval. The bounding box column should be a top-level struct column containing `xmin`, `ymin`, `xmax`, `ymax` columns. If the DataFrame you are writing contains such columns, you can specify `.option("geoparquet.coveri [...]
+
+```scala
+df.write.format("geoparquet")
+ .option("geoparquet.covering.geometry", "bbox")
+ .save("/path/to/saved_geoparquet.parquet")
+```
+
+If the DataFrame has only one geometry column, you can simply specify the `geoparquet.covering` option and omit the geometry column name:
+
+```scala
+df.write.format("geoparquet")
+ .option("geoparquet.covering", "bbox")
+ .save("/path/to/saved_geoparquet.parquet")
+```
+
+If the DataFrame does not have a covering column, you can construct one using Sedona's SQL functions:
+
+```scala
+val df_bbox = df.withColumn("bbox", expr("struct(ST_XMin(geometry) AS xmin,
ST_YMin(geometry) AS ymin, ST_XMax(geometry) AS xmax, ST_YMax(geometry) AS
ymax)"))
+df_bbox.write.format("geoparquet").option("geoparquet.covering.geometry",
"bbox").save("/path/to/saved_geoparquet.parquet")
+```
+
## Sort then Save GeoParquet
To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see [ST_GeoHash](../api/sql/Function.md#st_geohash)) and then save as a GeoParquet file. An example is as follows:
diff --git
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
index ab84e764d..e67f2b3c4 100644
---
a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
+++
b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala
@@ -18,6 +18,7 @@
*/
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.compactJson
import org.json4s.{DefaultFormats, Extraction, JField, JNothing, JNull,
JObject, JValue}
@@ -33,12 +34,25 @@ import org.json4s.{DefaultFormats, Extraction, JField,
JNothing, JNull, JObject,
* @param crs
* The CRS of the geometries in the file. None if crs metadata is absent, Some(JNull) if crs is
* null, Some(value) if the crs is present and not null.
+ * @param covering
+ * Object containing bounding box column names to help accelerate spatial data retrieval
*/
case class GeometryFieldMetaData(
encoding: String,
geometryTypes: Seq[String],
bbox: Seq[Double],
- crs: Option[JValue] = None)
+ crs: Option[JValue] = None,
+ covering: Option[Covering] = None)
+
+case class Covering(bbox: CoveringBBox)
+
+case class CoveringBBox(
+ xmin: Seq[String],
+ ymin: Seq[String],
+ zmin: Option[Seq[String]],
+ xmax: Seq[String],
+ ymax: Seq[String],
+ zmax: Option[Seq[String]])
/**
* A case class that holds the metadata of GeoParquet file
@@ -55,9 +69,9 @@ case class GeoParquetMetaData(
columns: Map[String, GeometryFieldMetaData])
object GeoParquetMetaData {
- // We're conforming to version 1.0.0 of the GeoParquet specification, please refer to
- // https://geoparquet.org/releases/v1.0.0/ for more details.
- val VERSION = "1.0.0"
+ // We're conforming to version 1.1.0 of the GeoParquet specification, please refer to
+ // https://geoparquet.org/releases/v1.1.0/ for more details.
+ val VERSION = "1.1.0"
/**
* Configuration key for overriding the version field in GeoParquet file metadata.
@@ -70,6 +84,14 @@ object GeoParquetMetaData {
*/
val GEOPARQUET_CRS_KEY = "geoparquet.crs"
+ /**
+ * Configuration key prefix for setting the covering columns of the geometries in GeoParquet
+ * column metadata. The configuration key for a geometry column named `x` is
+ * `geoparquet.covering.x`. If the parquet file contains only one geometry column, we can omit
+ * the column name and use `geoparquet.covering` directly.
+ */
+ val GEOPARQUET_COVERING_KEY = "geoparquet.covering"
+
def parseKeyValueMetaData(
keyValueMetaData: java.util.Map[String, String]):
Option[GeoParquetMetaData] = {
Option(keyValueMetaData.get("geo")).map { geo =>
@@ -114,4 +136,62 @@ object GeoParquetMetaData {
}
compactJson(serializedGeoObject)
}
+
+ def createCoveringColumnMetadata(coveringColumnName: String, schema: StructType): Covering = {
+ val coveringColumnIndex = schema.fieldIndex(coveringColumnName)
+ schema(coveringColumnIndex).dataType match {
+ case coveringColumnType: StructType =>
+ coveringColumnTypeToCovering(coveringColumnName, coveringColumnType)
+ case _ =>
+ throw new IllegalArgumentException(
+ s"Covering column $coveringColumnName is not a struct type")
+ }
+ }
+
+ private def coveringColumnTypeToCovering(
+ coveringColumnName: String,
+ coveringColumnType: StructType): Covering = {
+ def validateField(fieldName: String): Unit = {
+ val index = coveringColumnType.fieldIndex(fieldName)
+ val fieldType = coveringColumnType(index).dataType
+ if (fieldType != FloatType && fieldType != DoubleType) {
+ throw new IllegalArgumentException(
+ s"`$fieldName` in covering column `$coveringColumnName` is not float
or double type")
+ }
+ }
+ // We only validate the existence and types of the fields here. Although the order of the fields is required to be
+ // xmin, ymin, [zmin], xmax, ymax, [zmax] (see https://github.com/opengeospatial/geoparquet/pull/202), we don't
+ // validate it. The requirement on the user-provided covering column is quite lenient; it is up to the user to
+ // make sure the covering column strictly follows the ordering requirement.
+ validateField("xmin")
+ validateField("ymin")
+ validateField("xmax")
+ validateField("ymax")
+ coveringColumnType.find(_.name == "zmin") match {
+ case Some(_) =>
+ validateField("zmin")
+ validateField("zmax")
+ Covering(
+ CoveringBBox(
+ Seq(coveringColumnName, "xmin"),
+ Seq(coveringColumnName, "ymin"),
+ Some(Seq(coveringColumnName, "zmin")),
+ Seq(coveringColumnName, "xmax"),
+ Seq(coveringColumnName, "ymax"),
+ Some(Seq(coveringColumnName, "zmax"))))
+ case None =>
+ if (coveringColumnType.fieldNames.contains("zmax")) {
+ throw new IllegalArgumentException(
+ s"zmax should not present in covering column `$coveringColumnName`
since zmin is not present")
+ }
+ Covering(
+ CoveringBBox(
+ Seq(coveringColumnName, "xmin"),
+ Seq(coveringColumnName, "ymin"),
+ None,
+ Seq(coveringColumnName, "xmax"),
+ Seq(coveringColumnName, "ymax"),
+ None))
+ }
+ }
}
diff --git a/spark/common/src/test/resources/geoparquet/example-1.1.0.parquet
b/spark/common/src/test/resources/geoparquet/example-1.1.0.parquet
new file mode 100644
index 000000000..33e4cdcdd
Binary files /dev/null and
b/spark/common/src/test/resources/geoparquet/example-1.1.0.parquet differ
diff --git
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 015a21121..90d6d962f 100644
---
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -31,14 +31,13 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY,
GEOPARQUET_VERSION_KEY, VERSION}
+import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION,
createCoveringColumnMetadata}
import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types._
import org.json4s.{DefaultFormats, Extraction, JValue}
-import org.json4s.jackson.compactJson
import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKBWriter
@@ -111,6 +110,7 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
private var geoParquetVersion: Option[String] = None
private var defaultGeoParquetCrs: Option[JValue] = None
private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] =
mutable.Map.empty
+ private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] =
mutable.Map.empty
override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -155,6 +155,23 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
}
}
+ Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach {
coveringColumnName =>
+ if (geometryColumnInfoMap.size > 1) {
+ throw new IllegalArgumentException(
+ s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple
geometry columns." +
+ s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for
configured geometry column.")
+ }
+ val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
+ val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+ geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+ }
+ geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+ Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
+ coveringColumnName =>
+ val covering = createCoveringColumnMetadata(coveringColumnName,
schema)
+ geoParquetColumnCoveringMap.put(name, covering)
+ }
+ }
val messageType = new
SparkToParquetSchemaConverter(configuration).convert(schema)
val sparkSqlParquetRowMetadata =
GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
@@ -203,7 +220,8 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
columnInfo.bbox.maxY)
} else Seq(0.0, 0.0, 0.0, 0.0)
val crs = geoParquetColumnCrsMap.getOrElse(columnName,
defaultGeoParquetCrs)
- columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
+ val covering = geoParquetColumnCoveringMap.get(columnName)
+ columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs,
covering)
}.toMap
val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion,
primaryColumn, columns)
val geoParquetMetadataJson =
GeoParquetMetaData.toJson(geoParquetMetadata)
diff --git
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index f7e17da70..1fe2faa2e 100644
---
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.{compact, render}
case class GeoParquetMetadataPartitionReaderFactory(
@@ -74,13 +75,18 @@ object GeoParquetMetadataPartitionReaderFactory {
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName,
columnMetadata) =>
+ implicit val formats: org.json4s.Formats = DefaultFormats
+ import org.json4s.jackson.Serialization
val columnMetadataFields: Array[Any] = Array(
UTF8String.fromString(columnMetadata.encoding),
new
GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs
.map(projjson =>
UTF8String.fromString(compact(render(projjson))))
- .getOrElse(UTF8String.fromString("")))
+ .getOrElse(UTF8String.fromString("")),
+ columnMetadata.covering
+ .map(covering =>
UTF8String.fromString(Serialization.write(covering)))
+ .orNull)
val columnMetadataStruct = new
GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
diff --git
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index ad1f91502..845764fae 100644
---
a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++
b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -56,7 +56,8 @@ object GeoParquetMetadataTable {
StructField("encoding", StringType, nullable = true),
StructField("geometry_types", ArrayType(StringType), nullable = true),
StructField("bbox", ArrayType(DoubleType), nullable = true),
- StructField("crs", StringType, nullable = true)))
+ StructField("crs", StringType, nullable = true),
+ StructField("covering", StringType, nullable = true)))
private val columnsType = MapType(StringType, columnMetadataType,
valueContainsNull = false)
diff --git
a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 8411e4dde..421890c70 100644
---
a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++
b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -135,5 +135,18 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
assert(!metadata.containsKey("geom_column2"))
assert(!metadata.containsKey("geom_column_2"))
}
+
+ it("Read GeoParquet with covering metadata") {
+ val dfMeta = sparkSession.read
+ .format("geoparquet.metadata")
+ .load(geoparquetdatalocation + "/example-1.1.0.parquet")
+ val row = dfMeta.collect()(0)
+ val metadata =
row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+ val covering = metadata.getAs[String]("covering")
+ assert(covering.nonEmpty)
+ Seq("bbox", "xmin", "ymin", "xmax", "ymax").foreach { key =>
+ assert(covering contains key)
+ }
+ }
}
}
diff --git
a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 2e621af46..ccfd560c8 100644
---
a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++
b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -27,8 +27,7 @@ import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
-import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
+import org.apache.spark.sql.execution.datasources.parquet.{Covering,
GeoParquetMetaData, ParquetReadSupport}
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point,
ST_PolygonFromEnvelope}
@@ -51,6 +50,7 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
val geoparquetdatalocation2: String = resourceFolder +
"geoparquet/example2.parquet"
val geoparquetdatalocation3: String = resourceFolder +
"geoparquet/example3.parquet"
val geoparquetdatalocation4: String = resourceFolder +
"geoparquet/example-1.0.0-beta.1.parquet"
+ val geoparquetdatalocation5: String = resourceFolder +
"geoparquet/example-1.1.0.parquet"
val legacyparquetdatalocation: String =
resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
val geoparquetoutputlocation: String = resourceFolder +
"geoparquet/geoparquet_output/"
@@ -158,6 +158,21 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(!sparkSqlRowMetadata.contains("GeometryUDT"))
}
}
+ it("GEOPARQUET Test example-1.1.0.parquet") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation5)
+ val count = df.count()
+ val rows = df.collect()
+ assert(rows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+ assert(count == rows.length)
+
+ val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
+
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val newRows = df2.collect()
+ assert(rows.length == newRows.length)
+ assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+ assert(rows sameElements newRows)
+ }
it("GeoParquet with multiple geometry columns") {
val wktReader = new WKTReader()
@@ -623,6 +638,98 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
}
}
+
+ it("GeoParquet supports writing covering metadata") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "test_cov",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_with_covering_metadata.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering", "test_cov")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+ }
+
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geometry", "test_cov")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+ }
+ }
+
+ it("GeoParquet supports writing covering metadata for multiple columns") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "test_cov1",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+ .withColumn(
+ "test_cov2",
+ expr(
+ "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10
* id + 1 AS ymax)"))
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_with_covering_metadata.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geom1", "test_cov1")
+ .option("geoparquet.covering.geom2", "test_cov2")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ Seq(("geom1", "test_cov1"), ("geom2", "test_cov2")).foreach {
+ case (geomName, coveringName) =>
+ val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+ assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+ assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+ assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+ }
+ }
+
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geom2", "test_cov2")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ assert(geo \ "columns" \ "geom1" \ "covering" == org.json4s.JNothing)
+ val coveringJsValue = geo \ "columns" \ "geom2" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov2", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov2", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov2", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
+ }
+ }
}
def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue =>
Unit): Unit = {
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index e396bb658..3a6a89773 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -31,14 +31,13 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY,
GEOPARQUET_VERSION_KEY, VERSION}
+import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION,
createCoveringColumnMetadata}
import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types._
import org.json4s.{DefaultFormats, Extraction, JValue}
-import org.json4s.jackson.compactJson
import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKBWriter
@@ -111,6 +110,7 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
private var geoParquetVersion: Option[String] = None
private var defaultGeoParquetCrs: Option[JValue] = None
private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] =
mutable.Map.empty
+ private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] =
mutable.Map.empty
override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -155,6 +155,23 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
}
}
+ Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach {
coveringColumnName =>
+ if (geometryColumnInfoMap.size > 1) {
+ throw new IllegalArgumentException(
+ s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple
geometry columns." +
+ s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for
configured geometry column.")
+ }
+ val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
+ val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+ geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+ }
+ geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+ Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
+ coveringColumnName =>
+ val covering = createCoveringColumnMetadata(coveringColumnName,
schema)
+ geoParquetColumnCoveringMap.put(name, covering)
+ }
+ }
val messageType = new
SparkToParquetSchemaConverter(configuration).convert(schema)
val sparkSqlParquetRowMetadata =
GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
@@ -203,7 +220,8 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
columnInfo.bbox.maxY)
} else Seq(0.0, 0.0, 0.0, 0.0)
val crs = geoParquetColumnCrsMap.getOrElse(columnName,
defaultGeoParquetCrs)
- columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
+ val covering = geoParquetColumnCoveringMap.get(columnName)
+ columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs,
covering)
}.toMap
val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion,
primaryColumn, columns)
val geoParquetMetadataJson =
GeoParquetMetaData.toJson(geoParquetMetadata)
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index a37fa65ac..2a5e70624 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.{compact, render}
case class GeoParquetMetadataPartitionReaderFactory(
@@ -74,13 +75,18 @@ object GeoParquetMetadataPartitionReaderFactory {
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName,
columnMetadata) =>
+ implicit val formats: org.json4s.Formats = DefaultFormats
+ import org.json4s.jackson.Serialization
val columnMetadataFields: Array[Any] = Array(
UTF8String.fromString(columnMetadata.encoding),
new
GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs
.map(projjson =>
UTF8String.fromString(compact(render(projjson))))
- .getOrElse(UTF8String.fromString("")))
+ .getOrElse(UTF8String.fromString("")),
+ columnMetadata.covering
+ .map(covering =>
UTF8String.fromString(Serialization.write(covering)))
+ .orNull)
val columnMetadataStruct = new
GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
diff --git
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index ad1f91502..845764fae 100644
---
a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++
b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -56,7 +56,8 @@ object GeoParquetMetadataTable {
StructField("encoding", StringType, nullable = true),
StructField("geometry_types", ArrayType(StringType), nullable = true),
StructField("bbox", ArrayType(DoubleType), nullable = true),
- StructField("crs", StringType, nullable = true)))
+ StructField("crs", StringType, nullable = true),
+ StructField("covering", StringType, nullable = true)))
private val columnsType = MapType(StringType, columnMetadataType,
valueContainsNull = false)
diff --git
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 8411e4dde..421890c70 100644
---
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -135,5 +135,18 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
assert(!metadata.containsKey("geom_column2"))
assert(!metadata.containsKey("geom_column_2"))
}
+
+ it("Read GeoParquet with covering metadata") {
+ val dfMeta = sparkSession.read
+ .format("geoparquet.metadata")
+ .load(geoparquetdatalocation + "/example-1.1.0.parquet")
+ val row = dfMeta.collect()(0)
+ val metadata =
row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+ val covering = metadata.getAs[String]("covering")
+ assert(covering.nonEmpty)
+ Seq("bbox", "xmin", "ymin", "xmax", "ymax").foreach { key =>
+ assert(covering contains key)
+ }
+ }
}
}
diff --git
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 2e621af46..ccfd560c8 100644
---
a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++
b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -27,8 +27,7 @@ import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
-import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
+import org.apache.spark.sql.execution.datasources.parquet.{Covering,
GeoParquetMetaData, ParquetReadSupport}
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point,
ST_PolygonFromEnvelope}
@@ -51,6 +50,7 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
val geoparquetdatalocation2: String = resourceFolder +
"geoparquet/example2.parquet"
val geoparquetdatalocation3: String = resourceFolder +
"geoparquet/example3.parquet"
val geoparquetdatalocation4: String = resourceFolder +
"geoparquet/example-1.0.0-beta.1.parquet"
+ val geoparquetdatalocation5: String = resourceFolder +
"geoparquet/example-1.1.0.parquet"
val legacyparquetdatalocation: String =
resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
val geoparquetoutputlocation: String = resourceFolder +
"geoparquet/geoparquet_output/"
@@ -158,6 +158,21 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(!sparkSqlRowMetadata.contains("GeometryUDT"))
}
}
+ it("GEOPARQUET Test example-1.1.0.parquet") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation5)
+ val count = df.count()
+ val rows = df.collect()
+ assert(rows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+ assert(count == rows.length)
+
+ val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
+
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val newRows = df2.collect()
+ assert(rows.length == newRows.length)
+ assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+ assert(rows sameElements newRows)
+ }
it("GeoParquet with multiple geometry columns") {
val wktReader = new WKTReader()
@@ -623,6 +638,98 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
}
}
+
+ it("GeoParquet supports writing covering metadata") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "test_cov",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_with_covering_metadata.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering", "test_cov")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+ }
+
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geometry", "test_cov")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+ }
+ }
+
+ it("GeoParquet supports writing covering metadata for multiple columns") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "test_cov1",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+ .withColumn(
+ "test_cov2",
+ expr(
+ "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10
* id + 1 AS ymax)"))
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_with_covering_metadata.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geom1", "test_cov1")
+ .option("geoparquet.covering.geom2", "test_cov2")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ Seq(("geom1", "test_cov1"), ("geom2", "test_cov2")).foreach {
+ case (geomName, coveringName) =>
+ val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+ assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+ assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+ assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+ }
+ }
+
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geom2", "test_cov2")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ assert(geo \ "columns" \ "geom1" \ "covering" == org.json4s.JNothing)
+ val coveringJsValue = geo \ "columns" \ "geom2" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov2", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov2", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov2", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
+ }
+ }
}
def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue =>
Unit): Unit = {
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
index 335f6c74f..fb5c92163 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.DataSourceUtils
-import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY,
GEOPARQUET_VERSION_KEY, VERSION}
+import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_COVERING_KEY,
GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION,
createCoveringColumnMetadata}
import
org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
@@ -111,6 +111,7 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
private var geoParquetVersion: Option[String] = None
private var defaultGeoParquetCrs: Option[JValue] = None
private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] =
mutable.Map.empty
+ private val geoParquetColumnCoveringMap: mutable.Map[String, Covering] =
mutable.Map.empty
override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -155,6 +156,23 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
}
}
+ Option(configuration.get(GEOPARQUET_COVERING_KEY)).foreach {
coveringColumnName =>
+ if (geometryColumnInfoMap.size > 1) {
+ throw new IllegalArgumentException(
+ s"$GEOPARQUET_COVERING_KEY is ambiguous when there are multiple
geometry columns." +
+ s"Please specify $GEOPARQUET_COVERING_KEY.<columnName> for
configured geometry column.")
+ }
+ val geometryColumnName = schema(geometryColumnInfoMap.keys.head).name
+ val covering = createCoveringColumnMetadata(coveringColumnName, schema)
+ geoParquetColumnCoveringMap.put(geometryColumnName, covering)
+ }
+ geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+ Option(configuration.get(GEOPARQUET_COVERING_KEY + "." + name)).foreach {
+ coveringColumnName =>
+ val covering = createCoveringColumnMetadata(coveringColumnName,
schema)
+ geoParquetColumnCoveringMap.put(name, covering)
+ }
+ }
val messageType = new
SparkToParquetSchemaConverter(configuration).convert(schema)
val sparkSqlParquetRowMetadata =
GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
@@ -203,7 +221,8 @@ class GeoParquetWriteSupport extends
WriteSupport[InternalRow] with Logging {
columnInfo.bbox.maxY)
} else Seq(0.0, 0.0, 0.0, 0.0)
val crs = geoParquetColumnCrsMap.getOrElse(columnName,
defaultGeoParquetCrs)
- columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
+ val covering = geoParquetColumnCoveringMap.get(columnName)
+ columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs,
covering)
}.toMap
val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion,
primaryColumn, columns)
val geoParquetMetadataJson =
GeoParquetMetaData.toJson(geoParquetMetadata)
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index a37fa65ac..2a5e70624 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.SerializableConfiguration
+import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.{compact, render}
case class GeoParquetMetadataPartitionReaderFactory(
@@ -74,13 +75,18 @@ object GeoParquetMetadataPartitionReaderFactory {
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName,
columnMetadata) =>
+ implicit val formats: org.json4s.Formats = DefaultFormats
+ import org.json4s.jackson.Serialization
val columnMetadataFields: Array[Any] = Array(
UTF8String.fromString(columnMetadata.encoding),
new
GenericArrayData(columnMetadata.geometryTypes.map(UTF8String.fromString).toArray),
new GenericArrayData(columnMetadata.bbox.toArray),
columnMetadata.crs
.map(projjson =>
UTF8String.fromString(compact(render(projjson))))
- .getOrElse(UTF8String.fromString("")))
+ .getOrElse(UTF8String.fromString("")),
+ columnMetadata.covering
+ .map(covering =>
UTF8String.fromString(Serialization.write(covering)))
+ .orNull)
val columnMetadataStruct = new
GenericInternalRow(columnMetadataFields)
UTF8String.fromString(columnName) -> columnMetadataStruct
}
diff --git
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
index ad1f91502..845764fae 100644
---
a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
+++
b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataTable.scala
@@ -56,7 +56,8 @@ object GeoParquetMetadataTable {
StructField("encoding", StringType, nullable = true),
StructField("geometry_types", ArrayType(StringType), nullable = true),
StructField("bbox", ArrayType(DoubleType), nullable = true),
- StructField("crs", StringType, nullable = true)))
+ StructField("crs", StringType, nullable = true),
+ StructField("covering", StringType, nullable = true)))
private val columnsType = MapType(StringType, columnMetadataType,
valueContainsNull = false)
diff --git
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
index 8411e4dde..421890c70 100644
---
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
+++
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/GeoParquetMetadataTests.scala
@@ -135,5 +135,18 @@ class GeoParquetMetadataTests extends TestBaseScala with
BeforeAndAfterAll {
assert(!metadata.containsKey("geom_column2"))
assert(!metadata.containsKey("geom_column_2"))
}
+
+ it("Read GeoParquet with covering metadata") {
+ val dfMeta = sparkSession.read
+ .format("geoparquet.metadata")
+ .load(geoparquetdatalocation + "/example-1.1.0.parquet")
+ val row = dfMeta.collect()(0)
+ val metadata =
row.getJavaMap(row.fieldIndex("columns")).get("geometry").asInstanceOf[Row]
+ val covering = metadata.getAs[String]("covering")
+ assert(covering.nonEmpty)
+ Seq("bbox", "xmin", "ymin", "xmax", "ymax").foreach { key =>
+ assert(covering contains key)
+ }
+ }
}
}
diff --git
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 2e621af46..ccfd560c8 100644
---
a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++
b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -27,8 +27,7 @@ import org.apache.spark.SparkException
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
-import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData
-import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
+import org.apache.spark.sql.execution.datasources.parquet.{Covering,
GeoParquetMetaData, ParquetReadSupport}
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point,
ST_PolygonFromEnvelope}
@@ -51,6 +50,7 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
val geoparquetdatalocation2: String = resourceFolder +
"geoparquet/example2.parquet"
val geoparquetdatalocation3: String = resourceFolder +
"geoparquet/example3.parquet"
val geoparquetdatalocation4: String = resourceFolder +
"geoparquet/example-1.0.0-beta.1.parquet"
+ val geoparquetdatalocation5: String = resourceFolder +
"geoparquet/example-1.1.0.parquet"
val legacyparquetdatalocation: String =
resourceFolder + "parquet/legacy-parquet-nested-columns.snappy.parquet"
val geoparquetoutputlocation: String = resourceFolder +
"geoparquet/geoparquet_output/"
@@ -158,6 +158,21 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(!sparkSqlRowMetadata.contains("GeometryUDT"))
}
}
+ it("GEOPARQUET Test example-1.1.0.parquet") {
+ val df =
sparkSession.read.format("geoparquet").load(geoparquetdatalocation5)
+ val count = df.count()
+ val rows = df.collect()
+ assert(rows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+ assert(count == rows.length)
+
+ val geoParquetSavePath = geoparquetoutputlocation + "/gp_sample5.parquet"
+
df.write.format("geoparquet").mode(SaveMode.Overwrite).save(geoParquetSavePath)
+ val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+ val newRows = df2.collect()
+ assert(rows.length == newRows.length)
+ assert(newRows(0).getAs[AnyRef]("geometry").isInstanceOf[Geometry])
+ assert(rows sameElements newRows)
+ }
it("GeoParquet with multiple geometry columns") {
val wktReader = new WKTReader()
@@ -623,6 +638,98 @@ class geoparquetIOTests extends TestBaseScala with
BeforeAndAfterAll {
assert(structGeom.getAs[AnyRef]("g1").isInstanceOf[Geometry])
}
}
+
+ it("GeoParquet supports writing covering metadata") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geometry", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "test_cov",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_with_covering_metadata.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering", "test_cov")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+ }
+
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geometry", "test_cov")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ val coveringJsValue = geo \ "columns" \ "geometry" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov", "ymax"))
+ }
+ }
+
+ it("GeoParquet supports writing covering metadata for multiple columns") {
+ val df = sparkSession
+ .range(0, 100)
+ .toDF("id")
+ .withColumn("id", expr("CAST(id AS DOUBLE)"))
+ .withColumn("geom1", expr("ST_Point(id, id + 1)"))
+ .withColumn(
+ "test_cov1",
+ expr("struct(id AS xmin, id + 1 AS ymin, id AS xmax, id + 1 AS
ymax)"))
+ .withColumn("geom2", expr("ST_Point(10 * id, 10 * id + 1)"))
+ .withColumn(
+ "test_cov2",
+ expr(
+ "struct(10 * id AS xmin, 10 * id + 1 AS ymin, 10 * id AS xmax, 10
* id + 1 AS ymax)"))
+ val geoParquetSavePath = geoparquetoutputlocation +
"/gp_with_covering_metadata.parquet"
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geom1", "test_cov1")
+ .option("geoparquet.covering.geom2", "test_cov2")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ Seq(("geom1", "test_cov1"), ("geom2", "test_cov2")).foreach {
+ case (geomName, coveringName) =>
+ val coveringJsValue = geo \ "columns" \ geomName \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq(coveringName, "xmin"))
+ assert(covering.bbox.ymin == Seq(coveringName, "ymin"))
+ assert(covering.bbox.xmax == Seq(coveringName, "xmax"))
+ assert(covering.bbox.ymax == Seq(coveringName, "ymax"))
+ }
+ }
+
+ df.write
+ .format("geoparquet")
+ .option("geoparquet.covering.geom2", "test_cov2")
+ .mode("overwrite")
+ .save(geoParquetSavePath)
+ validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+ implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+ assert(geo \ "columns" \ "geom1" \ "covering" == org.json4s.JNothing)
+ val coveringJsValue = geo \ "columns" \ "geom2" \ "covering"
+ val covering = coveringJsValue.extract[Covering]
+ assert(covering.bbox.xmin == Seq("test_cov2", "xmin"))
+ assert(covering.bbox.ymin == Seq("test_cov2", "ymin"))
+ assert(covering.bbox.xmax == Seq("test_cov2", "xmax"))
+ assert(covering.bbox.ymax == Seq("test_cov2", "ymax"))
+ }
+ }
}
def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue =>
Unit): Unit = {