This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 7a835c967e [GLUTEN-11550][VL][UT] Enable Variant test suites (#11726)
7a835c967e is described below
commit 7a835c967eed2458bfb621f41e916b46abe1b379
Author: Chang Chen <[email protected]>
AuthorDate: Fri Mar 20 20:14:51 2026 +0800
[GLUTEN-11550][VL][UT] Enable Variant test suites (#11726)
Enable GlutenVariantEndToEndSuite, GlutenVariantShreddingSuite, and
GlutenParquetVariantShreddingSuite for both spark40 and spark41.
Changes:
1. VeloxValidatorApi: Detect variant shredded structs produced by
Spark's PushVariantIntoScan (by checking for the __VARIANT_METADATA_KEY
field metadata) and trigger a fallback to Spark's native Parquet reader.
2. ParquetMetadataUtils: Add a variant annotation check in
isUnsupportedMetadata, gated by parquetMetadataValidationEnabled.
It inspects the footer that is already being read, so it incurs no extra I/O.
3. Spark41Shims: Add shouldFallbackForParquetVariantAnnotation to
detect Parquet variant logical type annotations.
4. GlutenParquetVariantShreddingSuite (spark41): Set
parquetMetadataValidationEnabled=true to enable variant
annotation fallback detection.
5. pom.xml: Add -Dfile.encoding=UTF-8 to test JVM args. On JDK 17
with LANG=C (CI containers centos-8/9), the default charset is
US-ASCII, which causes garbled output for multi-byte characters. JDK
18+ defaults to UTF-8 via JEP 400.
See:
https://github.com/apache/spark/blob/v4.0.1/common/variant/src/main/java/org/apache/spark/types/variant/VariantUtil.java#L508
Co-authored-by: Copilot <[email protected]>
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 7 +++----
.../gluten/backendsapi/velox/VeloxValidatorApi.scala | 9 +++++++++
.../apache/gluten/utils/ParquetMetadataUtils.scala | 6 ++++++
.../apache/gluten/utils/velox/VeloxTestSettings.scala | 4 ++--
.../apache/gluten/utils/velox/VeloxTestSettings.scala | 6 +++---
.../parquet/GlutenParquetVariantShreddingSuite.scala | 9 ++++++++-
pom.xml | 1 +
.../org/apache/gluten/sql/shims/SparkShims.scala | 2 ++
.../gluten/sql/shims/spark41/Spark41Shims.scala | 19 ++++++++++++++++++-
9 files changed, 52 insertions(+), 11 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 6d5a2a6c2a..9694c8a3ae 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -209,10 +209,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties),
SQLConf.get)
- val parquetMetadataValidationResult =
- ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf,
parquetOptions, fileLimit)
- parquetMetadataValidationResult.map(
- reason => s"Detected unsupported metadata in parquet files: $reason")
+ ParquetMetadataUtils
+ .validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
+ .map(reason => s"Detected unsupported metadata in parquet files:
$reason")
}
def validateDataSchema(): Option[String] = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index 2d2193a538..1676e91d17 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -121,6 +121,15 @@ object VeloxValidatorApi {
case map: MapType =>
validateSchema(map.keyType).orElse(validateSchema(map.valueType))
case struct: StructType =>
+ // Detect variant shredded struct produced by Spark's
PushVariantIntoScan.
+ // These structs have all fields annotated with __VARIANT_METADATA_KEY
metadata.
+ // Velox cannot read the variant shredding encoding in Parquet files.
+ if (
+ struct.fields.nonEmpty &&
+ struct.fields.forall(_.metadata.contains("__VARIANT_METADATA_KEY"))
+ ) {
+ return Some(s"Variant shredded struct is not supported: $struct")
+ }
struct.foreach {
field =>
val reason = validateSchema(field.dataType)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index ab76cba4aa..9864f42a9f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -148,6 +148,12 @@ object ParquetMetadataUtils extends Logging {
isTimezoneFoundInMetadata(footer, parquetOptions)
)
+ // Variant annotation check: Velox native reader does not check variant
annotations,
+ // so fallback to vanilla Spark when detected.
+ if
(SparkShimLoader.getSparkShims.shouldFallbackForParquetVariantAnnotation(footer))
{
+ return Some("Variant annotation detected in Parquet file.")
+ }
+
for (check <- validationChecks) {
if (check.isDefined) {
return check
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 2f2a99a1a5..64aca6fa22 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -857,8 +857,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
enableSuite[GlutenUserDefinedTypeSuite]
- // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
- // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
+ enableSuite[GlutenVariantEndToEndSuite]
+ enableSuite[GlutenVariantShreddingSuite]
enableSuite[GlutenVariantSuite]
enableSuite[GlutenVariantWriteShreddingSuite]
enableSuite[GlutenXmlFunctionsSuite]
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index cbbe612553..4e14df0d8f 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -399,7 +399,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("parquet widening conversion ShortType -> DecimalType(20,0)")
.exclude("parquet widening conversion ShortType -> DecimalType(38,0)")
.exclude("parquet widening conversion ShortType -> DoubleType")
- // TODO: 4.x enableSuite[GlutenParquetVariantShreddingSuite] // 1 failure
+ enableSuite[GlutenParquetVariantShreddingSuite]
// Generated suites for org.apache.spark.sql.execution.datasources.text
// TODO: 4.x enableSuite[GlutenWholeTextFileV1Suite] // 1 failure
// TODO: 4.x enableSuite[GlutenWholeTextFileV2Suite] // 1 failure
@@ -822,8 +822,8 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenUDTRegistrationSuite]
enableSuite[GlutenUnsafeRowSuite]
enableSuite[GlutenUserDefinedTypeSuite]
- // TODO: 4.x enableSuite[GlutenVariantEndToEndSuite] // 3 failures
- // TODO: 4.x enableSuite[GlutenVariantShreddingSuite] // 8 failures
+ enableSuite[GlutenVariantEndToEndSuite]
+ enableSuite[GlutenVariantShreddingSuite]
enableSuite[GlutenVariantSuite]
enableSuite[GlutenVariantWriteShreddingSuite]
enableSuite[GlutenXmlFunctionsSuite]
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
index ea9d5a7418..deb92a27fd 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetVariantShreddingSuite.scala
@@ -16,8 +16,15 @@
*/
package org.apache.spark.sql.execution.datasources.parquet
+import org.apache.gluten.config.GlutenConfig
+
import org.apache.spark.sql.GlutenSQLTestsTrait
class GlutenParquetVariantShreddingSuite
extends ParquetVariantShreddingSuite
- with GlutenSQLTestsTrait {}
+ with GlutenSQLTestsTrait {
+
+ override def sparkConf: org.apache.spark.SparkConf = {
+
super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key,
"true")
+ }
+}
diff --git a/pom.xml b/pom.xml
index f6b10ba991..2227f9d353 100644
--- a/pom.xml
+++ b/pom.xml
@@ -167,6 +167,7 @@
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED
-Djdk.reflect.useDirectMethodHandle=false
-Dio.netty.tryReflectionSetAccessible=true
+ -Dfile.encoding=UTF-8
</extraJavaTestArgs>
<log4j.conf>file:src/test/resources/log4j2.properties</log4j.conf>
</properties>
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 1f6d015393..2f5350f38a 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -237,6 +237,8 @@ trait SparkShims {
def isParquetFileEncrypted(footer: ParquetMetadata): Boolean
+ def shouldFallbackForParquetVariantAnnotation(footer: ParquetMetadata):
Boolean = false
+
def getOtherConstantMetadataColumnValues(file: PartitionedFile):
JMap[String, Object] =
Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
diff --git
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index 0e3e752f99..5ff9d51c71 100644
---
a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++
b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.types._
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.metadata.{CompressionCodecName,
ParquetMetadata}
import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation,
MessageType}
import java.time.ZoneOffset
import java.util.{Map => JMap}
@@ -571,6 +571,23 @@ class Spark41Shims extends SparkShims {
}
}
+ override def shouldFallbackForParquetVariantAnnotation(footer:
ParquetMetadata): Boolean = {
+ if (SQLConf.get.getConf(SQLConf.PARQUET_IGNORE_VARIANT_ANNOTATION)) {
+ false
+ } else {
+ containsVariantAnnotation(footer.getFileMetaData.getSchema)
+ }
+ }
+
+ private def containsVariantAnnotation(groupType: GroupType): Boolean = {
+ groupType.getFields.asScala.exists {
+ field =>
+ Option(field.getLogicalTypeAnnotation)
+
.exists(_.isInstanceOf[LogicalTypeAnnotation.VariantLogicalTypeAnnotation]) ||
+ (!field.isPrimitive && containsVariantAnnotation(field.asGroupType()))
+ }
+ }
+
override def getOtherConstantMetadataColumnValues(file: PartitionedFile):
JMap[String, Object] =
file.otherConstantMetadataColumnValues.asJava.asInstanceOf[JMap[String,
Object]]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]