This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a835c967e [GLUTEN-11550][VL][UT] Enable Variant test suites (#11726)
7a835c967e is described below

commit 7a835c967eed2458bfb621f41e916b46abe1b379
Author: Chang Chen <[email protected]>
AuthorDate: Fri Mar 20 20:14:51 2026 +0800

    [GLUTEN-11550][VL][UT] Enable Variant test suites (#11726)
    
    Enable GlutenVariantEndToEndSuite, GlutenVariantShreddingSuite, and
    GlutenParquetVariantShreddingSuite for both spark40 and spark41.
    
    Changes:
    1. VeloxValidatorApi: Detect variant shredded structs produced by
       Spark's PushVariantIntoScan (checking __VARIANT_METADATA_KEY
       metadata) to trigger fallback to Spark's native Parquet reader.
    
    2. ParquetMetadataUtils: Add variant annotation check in
       isUnsupportedMetadata, gated by parquetMetadataValidationEnabled.
       Reads the same footer, no extra I/O.
    
    3. Spark41Shims: Add shouldFallbackForParquetVariantAnnotation to
       detect Parquet variant logical type annotations.
    
    4. GlutenParquetVariantShreddingSuite (spark41): Set
       parquetMetadataValidationEnabled=true to enable variant
       annotation fallback detection.
    
    5. pom.xml: Add -Dfile.encoding=UTF-8 to test JVM args. On JDK 17
       with LANG=C (CI containers centos-8/9), the default charset is
       US-ASCII causing garbled output for multi-byte characters. JDK
       18+ defaults to UTF-8 via JEP 400.
       See: 
https://github.com/apache/spark/blob/v4.0.1/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java#L508
    
    Co-authored-by: Copilot <[email protected]>
---
 .../gluten/backendsapi/velox/VeloxBackend.scala       |  7 +++----
 .../gluten/backendsapi/velox/VeloxValidatorApi.scala  |  9 +++++++++
 .../apache/gluten/utils/ParquetMetadataUtils.scala    |  6 ++++++
 .../apache/gluten/utils/velox/VeloxTestSettings.scala |  4 ++--
 .../apache/gluten/utils/velox/VeloxTestSettings.scala |  6 +++---
 .../parquet/GlutenParquetVariantShreddingSuite.scala  |  9 ++++++++-
 pom.xml                                               |  1 +
 .../org/apache/gluten/sql/shims/SparkShims.scala      |  2 ++
 .../gluten/sql/shims/spark41/Spark41Shims.scala       | 19 ++++++++++++++++++-
 9 files changed, 52 insertions(+), 11 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 6d5a2a6c2a..9694c8a3ae 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
       }
       val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
       val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), 
SQLConf.get)
-      val parquetMetadataValidationResult =
-        ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, 
parquetOptions, fileLimit)
-      parquetMetadataValidationResult.map(
-        reason => s"Detected unsupported metadata in parquet files: $reason")
+      ParquetMetadataUtils
+        .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
+        .map(reason => s"Detected unsupported metadata in parquet files: 
$reason")
     }
 
     def validateDataSchema(): Option[String] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index 2d2193a538..1676e91d17 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -121,6 +121,15 @@ object VeloxValidatorApi {
       case map: MapType =>
         validateSchema(map.keyType).orElse(validateSchema(map.valueType))
       case struct: StructType =>
+        // Detect variant shredded struct produced by Spark's 
PushVariantIntoScan.
+        // These structs have all fields annotated with __VARIANT_METADATA_KEY 
metadata.
+        // Velox cannot read the variant shredding encoding in Parquet files.
+        if (
+          struct.fields.nonEmpty &&
+          struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
+        ) {
+          return Some(s"Variant shredded struct is not supported: $struct")
+        }
         struct.foreach {
           field =>
             val reason = validateSchema(field.dataType)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index ab76cba4aa..9864f42a9f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -148,6 +148,12 @@ object ParquetMetadataUtils extends Logging {
       isTimezoneFoundInMetadata(footer, parquetOptions)
     )
 
+    // Variant annotation check: Velox native reader does not check variant 
annotations,
+    // so fallback to vanilla Spark when detected.
+    if 
(SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer))
 {
+      return Some("Variant annotation detected in Parquet file.")
+    }
+
     for (check <- validationChecks) {
       if (check.isDefined) {
         return check
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 2f2a99a1a5..64aca6fa22 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenUDTRegistrationSuite]
   enableSuite[GlutenUnsafeRowSuite]
   enableSuite[GlutenUserDefinedTypeSuite]
-  // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite]  // 3 failures
-  // TODO: 4.x enableSuite[GlutenVariantShreddingSuite]  // 8 failures
+  enableSuite[GlutenVariantEndToEndSuite]
+  enableSuite[GlutenVariantShreddingSuite]
   enableSuite[GlutenVariantSuite]
   enableSuite[GlutenVariantWriteShreddingSuite]
   enableSuite[GlutenXmlFunctionsSuite]
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index cbbe612553..4e14df0d8f 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -399,7 +399,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
     .exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
     .exclude("parquet widening conversion ShortType -> DoubleType")
-  // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite]  // 1 failure
+  enableSuite[GlutenParquetVariantShreddingSuite]
   // Generated suites for org.apache.spark.sql.execution.datasources.text
   // TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite]  // 1 failure
   // TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite]  // 1 failure
@@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenUDTRegistrationSuite]
   enableSuite[GlutenUnsafeRowSuite]
   enableSuite[GlutenUserDefinedTypeSuite]
-  // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite]  // 3 failures
-  // TODO: 4.x enableSuite[GlutenVariantShreddingSuite]  // 8 failures
+  enableSuite[GlutenVariantEndToEndSuite]
+  enableSuite[GlutenVariantShreddingSuite]
   enableSuite[GlutenVariantSuite]
   enableSuite[GlutenVariantWriteShreddingSuite]
   enableSuite[GlutenXmlFunctionsSuite]
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
index ea9d5a7418..deb92a27fd 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
@@ -16,8 +16,15 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet
 
+import org.apache.gluten.config.GlutenConfig
+
 import org.apache.spark.sql.GlutenSQLTestsTrait
 
 class GlutenParquetVariantShreddingSuite
   extends ParquetVariantShreddingSuite
-  with GlutenSQLTestsTrait {}
+  with GlutenSQLTestsTrait {
+
+  override def sparkConf: org.apache.spark.SparkConf = {
+    
super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key,
 "true")
+  }
+}
diff --git a/pom.xml b/pom.xml
index f6b10ba991..2227f9d353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,7 @@
       --add-opens=java.base/sun.util.calendar=ALL-UNNAMED
       -Djdk.reflect.useDirectMethodHandle=false
       -Dio.netty.tryReflectionSetAccessible=true
+      -Dfile.encoding=UTF-8
     </extraJavaTestArgs>
     <log4j.conf>file:src/test/resources/log4j2.properties</log4j.conf>
   </properties>
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 1f6d015393..2f5350f38a 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -237,6 +237,8 @@ trait SparkShims {
 
   def isParquetFileEncrypted(footer: ParquetMetadata): Boolean
 
+  def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata): 
Boolean = false
+
   def getOtherConstantMetadataColumnValues(file: PartitionedFile): 
JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
 
diff --git 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 0e3e752f99..5ff9d51c71 100644
--- 
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.hadoop.metadata.{CompressionCodecName, 
ParquetMetadata}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, 
MessageType}
 
 import java.time.ZoneOffset
 import java.util.{Map => JMap}
@@ -571,6 +571,23 @@ class Spark41Shims extends SparkShims {
     }
   }
 
+  override def shouldFallbackForParquetVariantAnnotation(footer: 
ParquetMetadata): Boolean = {
+    if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) {
+      false
+    } else {
+      containsVariantAnnotation(footer.getFileMetaData.getSchema)
+    }
+  }
+
+  private def containsVariantAnnotation(groupType: GroupType): Boolean = {
+    groupType.getFields.asScala.exists {
+      field =>
+        Option(field.getLogicalTypeAnnotation)
+          
.exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) ||
+        (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
+    }
+  }
+
   override def getOtherConstantMetadataColumnValues(file: PartitionedFile): 
JMap[String, Object] =
     file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String, 
Object]]
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to