This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5c320f4854d9 [SPARK-55259][GEO][SQL] Implement Parquet schema conversion for Geo types
5c320f4854d9 is described below
commit 5c320f4854d9e3312c19c228fdcc5a5a86345f11
Author: Uros Bojanic <[email protected]>
AuthorDate: Thu Jan 29 22:50:42 2026 +0800
[SPARK-55259][GEO][SQL] Implement Parquet schema conversion for Geo types
### What changes were proposed in this pull request?
Add bidirectional schema conversion support between Spark and Parquet for
geospatial types (`GeometryType` and `GeographyType`).
### Why are the changes needed?
Parquet schema conversion will serve as the foundation for read/write
support for geospatial (GEOMETRY/GEOGRAPHY) columns.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests for Parquet-to-Catalyst, Catalyst-to-Parquet, and round-trip
schema conversion of GEOMETRY and GEOGRAPHY, including geospatial fields
nested in arrays, maps, and structs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54037 from uros-db/geo-parquet-schema.
Authored-by: Uros Bojanic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../parquet/ParquetSchemaConverter.scala | 24 ++
.../datasources/parquet/ParquetSchemaSuite.scala | 241 +++++++++++++++++++++
2 files changed, 265 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index 9e6f4447ca79..183a45f1cf23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
import java.util.Locale
import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.column.schema.EdgeInterpolationAlgorithm
import org.apache.parquet.io.{ColumnIO, ColumnIOFactory, GroupColumnIO,
PrimitiveColumnIO}
import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation._
@@ -31,6 +32,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.VariantMetadata
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.{EdgeInterpolationAlgorithm =>
SparkEdgeInterpolationAlgorithm}
/**
* This converter class is used to convert Parquet [[MessageType]] to Spark
SQL [[StructType]]
@@ -336,6 +338,17 @@ class ParquetToSparkSchemaConverter(
case null => BinaryType
case _: BsonLogicalTypeAnnotation => BinaryType
case _: DecimalLogicalTypeAnnotation => makeDecimalType()
+ case geom: GeometryLogicalTypeAnnotation =>
+
GeometryType(Option(geom.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS))
+ case geog: GeographyLogicalTypeAnnotation =>
+ val crs =
Option(geog.getCrs).getOrElse(LogicalTypeAnnotation.DEFAULT_CRS)
+ val sparkAlgorithm = if (geog.getAlgorithm != null) {
+
SparkEdgeInterpolationAlgorithm.fromString(geog.getAlgorithm.toString)
+ .getOrElse(SparkEdgeInterpolationAlgorithm.SPHERICAL)
+ } else {
+ SparkEdgeInterpolationAlgorithm.SPHERICAL
+ }
+ GeographyType(crs, sparkAlgorithm)
case _ => illegalType()
}
@@ -653,6 +666,17 @@ class SparkToParquetSchemaConverter(
Types.primitive(BINARY, repetition)
.as(LogicalTypeAnnotation.stringType()).named(field.name)
+ case geom: GeometryType =>
+ Types.primitive(BINARY, repetition)
+ .as(LogicalTypeAnnotation.geometryType(geom.crs)).named(field.name)
+
+ case geog: GeographyType =>
+ val logicalType = LogicalTypeAnnotation.geographyType(
+ geog.crs,
+ EdgeInterpolationAlgorithm.valueOf(geog.algorithm.toString))
+ Types.primitive(BINARY, repetition)
+ .as(logicalType).named(field.name)
+
case DateType =>
Types.primitive(INT32, repetition)
.as(LogicalTypeAnnotation.dateType()).named(field.name)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 56076175d60e..c8d866d8b2d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -2204,6 +2204,247 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
Seq("f1", "key_value", "value", "g2", "list",
"element", "h2"))))))))
))))))
+ // ==============================================================
+ // Tests for BINARY geospatial logical types (Geometry/Geography)
+ // ==============================================================
+
+ /** Parquet to Catalyst conversion for geospatial types. */
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation",
+ StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
+ """message root {
+ | optional binary f1 (GEOMETRY);
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true)
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation (with
CRS)",
+ StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
+ """message root {
+ | optional binary f1 (GEOMETRY(OGC:CRS84));
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true)
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
+ """message root {
+ | optional binary f1 (GEOGRAPHY);
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true)
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with
CRS)",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
+ """message root {
+ | optional binary f1 (GEOGRAPHY(OGC:CRS84));
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true)
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation (with
CRS and algorithm)",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
+ """message root {
+ | optional binary f1 (GEOGRAPHY(OGC:CRS84, SPHERICAL));
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true)
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOMETRY logical type annotation with
binaryAsString",
+ StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
+ """message root {
+ | optional binary f1 (GEOMETRY);
+ |}
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true)
+
+ testParquetToCatalyst(
+ "Parquet to Catalyst - BINARY with GEOGRAPHY logical type annotation with
binaryAsString",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
+ """message root {
+ | optional binary f1 (GEOGRAPHY);
+ |}
+ """.stripMargin,
+ binaryAsString = true,
+ int96AsTimestamp = true)
+
+ /** Catalyst to Parquet conversion for geospatial types. */
+
+ testCatalystToParquet(
+ "Catalyst to Parquet - GeometryType",
+ StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
+ """message root {
+ | optional binary f1 (GEOMETRY);
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Catalyst to Parquet - GeographyType",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
+ """message root {
+ | optional binary f1 (GEOGRAPHY);
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Catalyst to Parquet - GeometryType with non-nullable field",
+ StructType(Seq(StructField("f1", GeometryType("OGC:CRS84"), nullable =
false))),
+ """message root {
+ | required binary f1 (GEOMETRY);
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Catalyst to Parquet - GeographyType with non-nullable field",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL),
nullable = false))),
+ """message root {
+ | required binary f1 (GEOGRAPHY);
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ /** Round trip conversion for geospatial types. */
+
+ testSchema(
+ "Round-trip schema conversion - GeometryType",
+ StructType(Seq(StructField("f1", GeometryType("OGC:CRS84")))),
+ """message root {
+ | optional binary f1 (GEOMETRY);
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
+
+ testSchema(
+ "Round-trip schema conversion - GeographyType",
+ StructType(Seq(StructField("f1",
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL)))),
+ """message root {
+ | optional binary f1 (GEOGRAPHY);
+ |}
+ """.stripMargin,
+ binaryAsString = false,
+ int96AsTimestamp = true,
+ writeLegacyParquetFormat = false)
+
+ /** Complex types with geospatial types. */
+
+ testCatalystToParquet(
+ "Complex types with GeometryType - array element",
+ StructType(Seq(
+ StructField("f1", ArrayType(GeometryType("OGC:CRS84"), containsNull =
true))
+ )),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | optional binary element (GEOMETRY);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Complex types with GeographyType - array element",
+ StructType(Seq(
+ StructField("f1", ArrayType(
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL),
containsNull = true))
+ )),
+ """message root {
+ | optional group f1 (LIST) {
+ | repeated group list {
+ | optional binary element (GEOGRAPHY);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Complex types with GeometryType - nested struct",
+ StructType(Seq(
+ StructField("outer", StructType(Seq(
+ StructField("geom", GeometryType("OGC:CRS84"))
+ )))
+ )),
+ """message root {
+ | optional group outer {
+ | optional binary geom (GEOMETRY);
+ | }
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Complex types with GeographyType - nested struct",
+ StructType(Seq(
+ StructField("outer", StructType(Seq(
+ StructField("geog", GeographyType("OGC:CRS84",
EdgeInterpolationAlgorithm.SPHERICAL))
+ )))
+ )),
+ """message root {
+ | optional group outer {
+ | optional binary geog (GEOGRAPHY);
+ | }
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Complex types with GeometryType - map value",
+ StructType(Seq(
+ StructField("f1", MapType(StringType, GeometryType("OGC:CRS84"),
valueContainsNull = true))
+ )),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group key_value {
+ | required binary key (UTF8);
+ | optional binary value (GEOMETRY);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
+ testCatalystToParquet(
+ "Complex types with GeographyType - map value",
+ StructType(Seq(
+ StructField("f1", MapType(StringType,
+ GeographyType("OGC:CRS84", EdgeInterpolationAlgorithm.SPHERICAL),
valueContainsNull = true))
+ )),
+ """message root {
+ | optional group f1 (MAP) {
+ | repeated group key_value {
+ | required binary key (UTF8);
+ | optional binary value (GEOGRAPHY);
+ | }
+ | }
+ |}
+ """.stripMargin,
+ writeLegacyParquetFormat = false)
+
// =================================
// Tests for conversion for decimals
// =================================
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]