This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 77921229cd74 [SPARK-55260][GEO][SQL] Implement Parquet write support for Geo types
77921229cd74 is described below

commit 77921229cd74bddc902a8e8c00a71112c64899f0
Author: Uros Bojanic <[email protected]>
AuthorDate: Mon Feb 2 20:10:39 2026 +0800

    [SPARK-55260][GEO][SQL] Implement Parquet write support for Geo types
    
    ### What changes were proposed in this pull request?
    Enable writing Geometry and Geography data to Parquet files.
    
    ### Why are the changes needed?
    Allowing users to persist geospatial data in Parquet format.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, geo data can now be written to Parquet.
    
    ### How was this patch tested?
    Added tests for writing GEOMETRY and GEOGRAPHY to Parquet.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #54039 from uros-db/geo-parquet-write.
    
    Authored-by: Uros Bojanic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  4 ++
 .../datasources/parquet/ParquetWriteSupport.scala  | 16 ++++++-
 .../datasources/v2/parquet/ParquetTable.scala      |  4 ++
 .../org/apache/spark/sql/STExpressionsSuite.scala  | 54 ++++++++++++++++++++++
 4 files changed, 77 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index 08e545cb8c20..850bfb4a8985 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -408,6 +408,10 @@ class ParquetFileFormat
   }
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
+    // GeoSpatial data types in Parquet are limited only to types with supported SRIDs.
+    case g: GeometryType => GeometryType.isSridSupported(g.srid)
+    case g: GeographyType => GeographyType.isSridSupported(g.srid)
+
     case _: AtomicType | _: NullType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
index dcaf88fa8dfd..b84148992e32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala
@@ -34,7 +34,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SPARK_LEGACY_DATETIME_METADATA_KEY, SPARK_LEGACY_INT96_METADATA_KEY, SPARK_TIMEZONE_METADATA_KEY, SPARK_VERSION_METADATA_KEY}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateTimeUtils, STUtils}
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types._
@@ -276,6 +276,20 @@ class ParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
         (row: SpecializedGetters, ordinal: Int) =>
           recordConsumer.addBinary(Binary.fromReusedByteArray(row.getBinary(ordinal)))
 
+      case _: GeometryType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          // Data is written to Parquet using the WKB format, as per spec:
+          // https://parquet.apache.org/docs/file-format/types/geospatial/.
+          val wkb = STUtils.stAsBinary(row.getGeometry(ordinal))
+          recordConsumer.addBinary(Binary.fromReusedByteArray(wkb))
+
+      case _: GeographyType =>
+        (row: SpecializedGetters, ordinal: Int) =>
+          // Data is written to Parquet using the WKB format, as per spec:
+          // https://parquet.apache.org/docs/file-format/types/geospatial/.
+          val wkb = STUtils.stAsBinary(row.getGeography(ordinal))
+          recordConsumer.addBinary(Binary.fromReusedByteArray(wkb))
+
       case DecimalType.Fixed(precision, scale) =>
         makeDecimalWriter(precision, scale)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index 28c5a62f91ec..67052c201a9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -51,6 +51,10 @@ case class ParquetTable(
   }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
+    // GeoSpatial data types in Parquet are limited only to types with supported SRIDs.
+    case g: GeometryType => GeometryType.isSridSupported(g.srid)
+    case g: GeographyType => GeographyType.isSridSupported(g.srid)
+
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
index 323826ca3820..d7f1ab49e89c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/STExpressionsSuite.scala
@@ -42,6 +42,60 @@ class STExpressionsSuite
     assert(sql(query).schema.fields.head.dataType.sameType(expectedDataType))
   }
 
+  // Test data: WKB representations of POINT(1 2) and POINT(3 4).
+  private final val wkbString1 = "0101000000000000000000F03F0000000000000040"
+  private final val wkbString2 = "010100000000000000000008400000000000001040"
+
+  /** Geospatial type storage. */
+
+  test("Parquet tables - unsupported geospatial types") {
+    val tableName = "tst_tbl"
+    // Test both v1 and v2 data sources.
+    Seq(true, false).foreach { useV1 =>
+      val useV1List = if (useV1) "parquet" else ""
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+        Seq("GEOMETRY(ANY)", "GEOGRAPHY(ANY)").foreach { unsupportedType =>
+          withTable(tableName) {
+            checkError(
+              exception = intercept[AnalysisException] {
+                sql(s"CREATE TABLE $tableName (g $unsupportedType) USING PARQUET")
+              },
+              condition = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
+              parameters = Map(
+                "columnName" -> "`g`",
+                "columnType" -> s""""$unsupportedType"""",
+                "format" -> "Parquet"))
+          }
+        }
+      }
+    }
+  }
+
+  test("Parquet write support for geometry and geography types") {
+    val tableName = "tst_tbl"
+    // Test both v1 and v2 data sources.
+    Seq(true, false).foreach { useV1 =>
+      val useV1List = if (useV1) "parquet" else ""
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
+        withTable(tableName) {
+          sql(s"CREATE TABLE $tableName (geom GEOMETRY(0), geog GEOGRAPHY(4326)) USING PARQUET")
+
+          val geomNull = "ST_GeomFromWKB(NULL)"
+          val geomNotNull = s"ST_GeomFromWKB(X'$wkbString1')"
+          val geogNull = "ST_GeogFromWKB(NULL)"
+          val geogNotNull = s"ST_GeogFromWKB(X'$wkbString2')"
+
+          sql(s"INSERT INTO $tableName VALUES ($geomNull, $geogNull)")
+          sql(s"INSERT INTO $tableName VALUES ($geomNotNull, $geogNull)")
+          sql(s"INSERT INTO $tableName VALUES ($geomNull, $geogNotNull)")
+          sql(s"INSERT INTO $tableName VALUES ($geomNotNull, $geogNotNull)")
+
+          checkAnswer(sql(s"SELECT COUNT(*) FROM $tableName"), Seq(Row(4)))
+        }
+      }
+    }
+  }
+
   /** Geospatial type casting. */
 
   test("Cast GEOGRAPHY(srid) to GEOGRAPHY(ANY)") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to