This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 09b95b4804 [GH-2240] Fix write and read nested geometry array using vectorized parquet reader (#2359)
09b95b4804 is described below
commit 09b95b480480137716a6db12b7c05962ef0378b6
Author: Feng Zhang <[email protected]>
AuthorDate: Tue Sep 23 13:45:18 2025 -0700
[GH-2240] Fix write and read nested geometry array using vectorized parquet reader (#2359)
* Fix write and read nested geometry array using vectorized parquet reader
* remove temp test files
* remove printout in tests
* rename FixNestedUDTInParquetRule
* remove unused file
* add type check cache
* optimize regular parquet reader path
* rename hasHash method
* rename to be clearer
* add more comments
* fix spark 4.0 compilation error
* fix test
* address copilot comments
* revert changes to ParquetColumnVector.java
* add further tests on read-back dataframe
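
For context, the failure mode this commit addresses can be reproduced in a few lines. This is a minimal sketch, assuming a Sedona-enabled SparkSession (here called sparkSession) and a scratch output path; the exact assertions live in the new tests in geoparquetIOTests.scala below:

    // Sketch only: mirrors the regression test added in this commit.
    // Assumes SedonaContext.create(...) has registered ST_Point and friends.
    val df = sparkSession.sql(
      "SELECT ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)) AS nested_geom_array")
    df.write.mode("overwrite").format("parquet").save("/tmp/nested_geom.parquet")

    // Before this fix, collecting the read-back DataFrame failed in the
    // vectorized Parquet reader because of the nested GeometryUDT (SPARK-48942).
    sparkSession.read.format("parquet").load("/tmp/nested_geom.parquet").collect()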
---
.../org/apache/sedona/spark/SedonaContext.scala | 2 +
.../sedona_sql/UDT/TransformNestedUDTParquet.scala | 145 +++++++++++++++++++++
.../org/apache/sedona/sql/geoparquetIOTests.scala | 101 ++++++++++++++
3 files changed, 248 insertions(+)
diff --git a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
index 9619837691..b0e46cf6e9 100644
--- a/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/spark/SedonaContext.scala
@@ -27,6 +27,7 @@ import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkStrategy
+import org.apache.spark.sql.sedona_sql.UDT.TransformNestedUDTParquet
import org.apache.spark.sql.sedona_sql.optimization._
import org.apache.spark.sql.sedona_sql.strategy.join.JoinQueryDetector
import org.apache.spark.sql.sedona_sql.strategy.physical.function.EvalPhysicalFunctionStrategy
@@ -43,6 +44,7 @@ object SedonaContext {
private def customOptimizationsWithSession(sparkSession: SparkSession) =
Seq(
+ new TransformNestedUDTParquet(sparkSession),
new SpatialFilterPushDownForGeoParquet(sparkSession),
new SpatialTemporalFilterPushDownForStacScan(sparkSession))
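
The rule takes effect once a Sedona session is initialized. A minimal usage sketch, assuming the standard SedonaContext entry point:

    import org.apache.sedona.spark.SedonaContext

    // Creating a Sedona session injects the custom optimizer rules listed
    // above, including the new TransformNestedUDTParquet rule.
    val config = SedonaContext.builder().master("local[*]").getOrCreate()
    val sedona = SedonaContext.create(config)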
diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/TransformNestedUDTParquet.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/TransformNestedUDTParquet.scala
new file mode 100644
index 0000000000..902a16d4b2
--- /dev/null
+++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/TransformNestedUDTParquet.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.UDT
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types._
+
+/**
+ * Catalyst rule that automatically transforms schemas with nested GeometryUDT to prevent
+ * SPARK-48942 errors in Parquet reading.
+ *
+ * This rule detects LogicalRelations that use ParquetFileFormat and have nested GeometryUDT in
+ * their schema, then transforms the schema to use BinaryType instead.
+ */
+class TransformNestedUDTParquet(spark: SparkSession) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transformUp {
+ case lr: LogicalRelation if lr.relation.isInstanceOf[HadoopFsRelation] &&
+ lr.relation
+ .asInstanceOf[HadoopFsRelation]
+ .fileFormat
+ .isInstanceOf[ParquetFileFormat] &&
+ hasNestedGeometryUDT(lr.schema) =>
+ val relation = lr.relation.asInstanceOf[HadoopFsRelation]
+
+ // Transform the schema to use BinaryType for nested GeometryUDT
+ val transformedSchema = transformSchemaForNestedUDT(lr.schema)
+
+ // Create new AttributeReferences with transformed data types
+ val transformedAttributes = transformedSchema.fields.zipWithIndex.map {
+ case (field, index) =>
+ val originalAttr = lr.output(index)
+ AttributeReference(field.name, field.dataType, field.nullable, field.metadata)(
+ originalAttr.exprId,
+ originalAttr.qualifier)
+ }
+ lr.copy(output = transformedAttributes)
+
+ case other => other
+ }
+ }
+
+ /**
+ * Checks if a schema contains nested GeometryUDT fields, meaning GeometryUDT instances that are
+ * inside arrays, maps, or structs (i.e., not at the top level of the schema). Top-level
+ * GeometryUDT fields (fields whose type is GeometryUDT directly in the StructType) are NOT
+ * considered nested and do not trigger the transformation. This distinction is important
+ * because the SPARK-48942 Parquet bug only affects nested UDTs, not top-level ones. Therefore,
+ * this method returns true only if a GeometryUDT is found inside a container type, ensuring
+ * that only affected fields are transformed.
+ */
+ private def hasNestedGeometryUDT(schema: StructType): Boolean = {
+ schema.fields.exists(field => hasNestedGeometryUDTInType(field.dataType, isTopLevel = true))
+ }
+
+ /**
+ * Recursively check if a data type contains nested GeometryUDT.
+ * @param dataType
+ * the data type to check
+ * @param isTopLevel
+ * true if this is a top-level field, false if nested inside a container
+ * @return
+ * true if nested GeometryUDT is found, false otherwise
+ */
+ private def hasNestedGeometryUDTInType(dataType: DataType, isTopLevel: Boolean): Boolean = {
+ dataType match {
+ case _: GeometryUDT => !isTopLevel // GeometryUDT is "nested" only if NOT at top level
+ case ArrayType(elementType, _) =>
+ hasNestedGeometryUDTInType(elementType, isTopLevel = false)
+ case MapType(keyType, valueType, _) =>
+ hasNestedGeometryUDTInType(keyType, isTopLevel = false) ||
+ hasNestedGeometryUDTInType(valueType, isTopLevel = false)
+ case StructType(fields) =>
+ fields.exists(field => hasNestedGeometryUDTInType(field.dataType, isTopLevel = false))
+ case _ => false
+ }
+ }
+
+ /**
+ * Transform a schema to handle nested UDT by processing each field. This preserves top-level
+ * GeometryUDT fields while transforming nested ones to BinaryType.
+ */
+ private def transformSchemaForNestedUDT(schema: StructType): StructType = {
+ StructType(schema.fields.map(field =>
+ field.copy(dataType = transformDataType(field.dataType, isTopLevel = true))))
+ }
+
+ /**
+ * Recursively transform data types based on nesting level.
+ * @param dataType
+ * the data type to transform
+ * @param isTopLevel
+ * true if this is a top-level field (preserves GeometryUDT), false if nested (converts
+ * GeometryUDT to BinaryType)
+ * @return
+ * transformed data type
+ */
+ private def transformDataType(dataType: DataType, isTopLevel: Boolean): DataType = {
+ dataType match {
+ case _: GeometryUDT =>
+ if (isTopLevel) dataType else BinaryType // Preserve at top-level, convert if nested
+
+ case ArrayType(elementType, containsNull) =>
+ ArrayType(transformDataType(elementType, isTopLevel = false), containsNull)
+
+ case MapType(keyType, valueType, valueContainsNull) =>
+ MapType(
+ transformDataType(keyType, isTopLevel = false),
+ transformDataType(valueType, isTopLevel = false),
+ valueContainsNull)
+
+ case StructType(fields) =>
+ StructType(fields.map(field =>
+ field.copy(dataType = transformDataType(field.dataType, isTopLevel = false))))
+
+ case udt: UserDefinedType[_] if !isTopLevel =>
+ transformDataType(udt.sqlType, isTopLevel = false)
+
+ case other => other
+ }
+ }
+}
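
To make the transformation concrete, here is a sketch (not part of the commit) of a schema that triggers the rule, assuming Sedona's GeometryUDT object from the same package:

    import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
    import org.apache.spark.sql.types._

    // A GeometryUDT nested inside array<struct<...>> makes hasNestedGeometryUDT
    // return true, so the rule rewrites the relation's output attributes.
    val nested = StructType(Seq(
      StructField("nested_geom_array",
        ArrayType(StructType(Seq(StructField("geometry", GeometryUDT)))))))

    // transformSchemaForNestedUDT(nested) would expose this column as
    // array<struct<geometry: binary>>, while a top-level
    // StructField("geometry", GeometryUDT) is preserved as-is.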
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
index 369feeb22a..c3ef8dd89e 100644
--- a/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala
@@ -784,6 +784,107 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
}
}
+ describe("Fix SPARK-48942 reading parquet with array of structs of UDTs
workaround") {
+ it("should handle array of struct with geometry UDT") {
+ // This test reproduces the issue described in SPARK-48942
+ // https://issues.apache.org/jira/browse/SPARK-48942
+ // where reading back nested geometry from Parquet with PySpark 3.5 fails
+ val testPath = geoparquetoutputlocation + "/spark_48942_test.parquet"
+
+ // Create DataFrame with array of struct containing geometry
+ val df = sparkSession.sql("""
+ SELECT ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)) AS nested_geom_array
+ """)
+
+ // Write to Parquet
+ df.write.mode("overwrite").format("parquet").save(testPath)
+
+ // The fix allows vectorized reading to handle UDT compatibility properly
+ val readDf = sparkSession.read.format("parquet").load(testPath)
+
+ // Verify the geometry data is correct and accessible
+ val result = readDf.collect()
+ assert(result.length == 1)
+ val nestedArray = result(0).getSeq[Any](0)
+ assert(nestedArray.length == 1)
+
+ // Verify we can perform geometric operations on the read data
+ // This tests that the nested geometry is functionally correct after the SPARK-48942 fix
+ readDf.createOrReplaceTempView("nested_geom_test")
+ val extractedGeom = sparkSession.sql("""
+ SELECT nested_geom_array[0].geometry as extracted_geometry
+ FROM nested_geom_test
+ """)
+
+ val extractedResult = extractedGeom.collect()
+ assert(extractedResult.length == 1)
+ assert(extractedResult(0).get(0) != null, "Extracted geometry should not be null")
+
+ // Test that we can use the extracted geometry in spatial functions
+ extractedGeom.createOrReplaceTempView("extracted_test")
+ val spatialTest = sparkSession.sql("""
+ SELECT ST_X(extracted_geometry) as x, ST_Y(extracted_geometry) as y
+ FROM extracted_test
+ """)
+
+ val spatialResult = spatialTest.collect()
+ assert(spatialResult.length == 1)
+ assert(spatialResult(0).getDouble(0) == 1.0, "X coordinate should be 1.0")
+ assert(spatialResult(0).getDouble(1) == 1.1, "Y coordinate should be 1.1")
+
+ // Test that we can perform more complex spatial operations
+ val spatialOperations = sparkSession.sql("""
+ SELECT
+ ST_AsText(extracted_geometry) as wkt,
+ ST_GeometryType(extracted_geometry) as geom_type
+ FROM extracted_test
+ """)
+
+ val operationResult = spatialOperations.collect()
+ assert(operationResult.length == 1)
+ assert(
+ operationResult(0).getString(0) == "POINT (1 1.1)",
+ "WKT should match original point")
+ assert(operationResult(0).getString(1) == "ST_Point", "Geometry type
should be ST_Point")
+ }
+
+ it("should reject nested geometry when using GeoParquet format") {
+ // GeoParquet specification requires "Geometry columns MUST be at the root of the schema"
+ // Therefore, nested geometry should be rejected
+ val testPath = geoparquetoutputlocation + "/spark_48942_geoparquet_test.geoparquet"
+
+ val df = sparkSession.sql("""
+ SELECT ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)) AS nested_geom_array
+ """)
+
+ // Writing nested geometry to GeoParquet should fail according to the specification
+ assertThrows[SparkException] {
+ df.write.mode("overwrite").format("geoparquet").save(testPath)
+ }
+ }
+
+ it("should handle deeply nested arrays with geometry UDT") {
+ // Test deeply nested arrays: array of array of struct with geometry
+ val testPath = geoparquetoutputlocation + "/spark_48942_deep_nested_test.parquet"
+
+ val df = sparkSession.sql("""
+ SELECT ARRAY(
+ ARRAY(STRUCT(ST_POINT(1.0, 1.1) AS geometry)),
+ ARRAY(STRUCT(ST_POINT(2.0, 2.1) AS geometry))
+ ) AS deeply_nested_geom_array
+ """)
+
+ // Write to Parquet
+ df.write.mode("overwrite").format("parquet").save(testPath)
+
+ // Read back and verify
+ val readDf = sparkSession.read.format("parquet").load(testPath)
+
+ val result = readDf.collect()
+ assert(result.length == 1)
+ }
+ }
+
def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {
val parquetFiles = new File(path).listFiles().filter(_.getName.endsWith(".parquet"))
parquetFiles.foreach { filePath =>