This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d5da832625 [VL] Fallback Parquet scan if legacy timezone is found in 
file metadata (#11117)
d5da832625 is described below

commit d5da832625c97e96595e54f3f5a665bdae9469fa
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 26 09:03:42 2025 +0000

    [VL] Fallback Parquet scan if legacy timezone is found in file metadata 
(#11117)
---
 .../sql/delta/DeltaInsertIntoTableSuite.scala      |  3 +-
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 30 +++----
 .../apache/gluten/utils/ParquetMetadataUtils.scala | 91 ++++++++++++++++------
 docs/Configuration.md                              |  3 +-
 .../org/apache/gluten/config/GlutenConfig.scala    | 29 +++++--
 5 files changed, 112 insertions(+), 44 deletions(-)

diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
index c18e98f7f3..8485957b4a 100644
--- 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -757,8 +757,7 @@ class DeltaColumnDefaultsInsertSuite extends 
InsertIntoSQLOnlyTests with DeltaSQ
       |  'delta.columnMapping.mode' = 'name'
       |)""".stripMargin
 
-  // Ignored in Gluten: Results mismatch.
-  ignore("Column DEFAULT value support with Delta Lake, positive tests") {
+  test("Column DEFAULT value support with Delta Lake, positive tests") {
     Seq(
       PartitionOverwriteMode.STATIC.toString,
       PartitionOverwriteMode.DYNAMIC.toString
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index e97747b842..ee300372f6 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -183,21 +183,23 @@ object VeloxBackendSettings extends BackendSettingsApi {
       }
     }
 
-    def validateEncryption(): Option[String] = {
-
-      val encryptionValidationEnabled = 
GlutenConfig.get.parquetEncryptionValidationEnabled
-      if (!encryptionValidationEnabled) {
+    def validateMetadata(): Option[String] = {
+      if (format != ParquetReadFormat || rootPaths.isEmpty) {
+        // Only Parquet is needed for metadata validation so far.
         return None
       }
-
-      val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
-      val encryptionResult =
-        ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, 
fileLimit)
-      if (encryptionResult.ok()) {
-        None
-      } else {
-        Some(s"Detected encrypted parquet files: ${encryptionResult.reason()}")
-      }
+      val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
+        .max(GlutenConfig.get.parquetEncryptionValidationFileLimit)
+      val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), 
SQLConf.get)
+      val parquetMetadataValidationResult =
+        ParquetMetadataUtils.validateMetadata(
+          format,
+          rootPaths,
+          hadoopConf,
+          parquetOptions,
+          fileLimit)
+      parquetMetadataValidationResult.map(
+        reason => s"Detected unsupported metadata in parquet files: $reason")
     }
 
     def validateDataSchema(): Option[String] = {
@@ -220,7 +222,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     val validationChecks = Seq(
       validateScheme(),
       validateFormat(),
-      validateEncryption(),
+      validateMetadata(),
       validateDataSchema()
     )
 
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index e1c88435b7..b533c029e4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -16,18 +16,21 @@
  */
 package org.apache.gluten.utils
 
-import org.apache.gluten.execution.ValidationResult
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import 
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
+
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import 
org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, 
ParquetOptions}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, 
RemoteIterator}
+import 
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 
 object ParquetMetadataUtils {
 
   /**
-   * Validates whether Parquet encryption is enabled for the given paths.
+   * Validates whether Parquet metadata is unsupported for the given paths.
    *
    *   - If the file format is not Parquet, skip this check and return success.
    *   - If there is at least one Parquet file with encryption enabled, fail 
the validation.
@@ -39,37 +42,39 @@ object ParquetMetadataUtils {
    * @param hadoopConf
    *   Hadoop configuration
    * @return
-   *   [[ValidationResult]] validation success or failure
+   *   [[Option[String]]] Empty if the Parquet metadata is supported. Fallback 
reason otherwise.
    */
-  def validateEncryption(
+  def validateMetadata(
       format: ReadFileFormat,
       rootPaths: Seq[String],
       hadoopConf: Configuration,
+      parquetOptions: ParquetOptions,
       fileLimit: Int
-  ): ValidationResult = {
-    if (format != ParquetReadFormat || rootPaths.isEmpty) {
-      return ValidationResult.succeeded
-    }
-
+  ): Option[String] = {
     rootPaths.foreach {
       rootPath =>
         val fs = new Path(rootPath).getFileSystem(hadoopConf)
         try {
-          val encryptionDetected =
-            checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf, 
fileLimit = fileLimit)
-          if (encryptionDetected) {
-            return ValidationResult.failed("Encrypted Parquet file detected.")
+          val reason =
+            checkForUnexpectedMetadataWithLimit(
+              fs,
+              new Path(rootPath),
+              hadoopConf,
+              parquetOptions,
+              fileLimit = fileLimit)
+          if (reason.nonEmpty) {
+            return reason
           }
         } catch {
           case e: Exception =>
         }
     }
-    ValidationResult.succeeded
+    None
   }
 
   /**
-   * Check any Parquet file under the given path is encrypted using a 
recursive iterator. Only the
-   * first `fileLimit` files are processed for efficiency.
+   * Check any Parquet file under the given path is with unexpected metadata 
using a recursive
+   * iterator. Only the first `fileLimit` files are processed for efficiency.
    *
    * @param fs
    *   FileSystem to use
@@ -82,21 +87,63 @@ object ParquetMetadataUtils {
    * @return
    *   True if an encrypted file is detected, false otherwise
    */
-  private def checkForEncryptionWithLimit(
+  private def checkForUnexpectedMetadataWithLimit(
       fs: FileSystem,
       path: Path,
       conf: Configuration,
+      parquetOptions: ParquetOptions,
       fileLimit: Int
-  ): Boolean = {
-
+  ): Option[String] = {
+    val isEncryptionValidationEnabled = 
GlutenConfig.get.parquetEncryptionValidationEnabled
     val filesIterator: RemoteIterator[LocatedFileStatus] = fs.listFiles(path, 
true)
     var checkedFileCount = 0
     while (filesIterator.hasNext && checkedFileCount < fileLimit) {
       val fileStatus = filesIterator.next()
       checkedFileCount += 1
-      if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, 
conf)) {
-        return true
+      if (
+        isEncryptionValidationEnabled && 
SparkShimLoader.getSparkShims.isParquetFileEncrypted(
+          fileStatus,
+          conf)
+      ) {
+        return Some("Encrypted Parquet file detected.")
       }
+      if (isTimezoneFoundInMetadata(fileStatus, conf, parquetOptions)) {
+        return Some("Legacy timezone found.")
+      }
+    }
+    None
+  }
+
+  /**
+   * Checks whether there are timezones set with Spark key 
SPARK_TIMEZONE_METADATA_KEY in the
+   * Parquet metadata. In this case, the Parquet scan should fall back to 
vanilla Spark since Velox
+   * doesn't yet support Spark legacy datetime.
+   */
+  private def isTimezoneFoundInMetadata(
+      fileStatus: LocatedFileStatus,
+      conf: Configuration,
+      parquetOptions: ParquetOptions): Boolean = {
+    val footerFileMetaData =
+      try {
+        ParquetFooterReader.readFooter(conf, fileStatus, 
SKIP_ROW_GROUPS).getFileMetaData
+      } catch {
+        case _: RuntimeException =>
+          // Ignored as it's could be a "Not a Parquet file" exception.
+          return false
+      }
+    val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+    val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+    val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      datetimeRebaseModeInRead)
+    val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+      footerFileMetaData.getKeyValueMetaData.get,
+      int96RebaseModeInRead)
+    if (datetimeRebaseSpec.originTimeZone.nonEmpty) {
+      return true
+    }
+    if (int96RebaseSpec.originTimeZone.nonEmpty) {
+      return true
     }
     false
   }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index ec36344365..da718718a0 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -116,8 +116,9 @@ nav_order: 15
 | spark.gluten.sql.countDistinctWithoutExpand                        | false   
          | Convert Count Distinct to a UDAF called count_distinct to prevent 
SparkPlanner converting it to Expand+Count. WARNING: When enabled, count 
distinct queries will fail to fallback!!!                                       
                                                                                
                                                                    |
 | spark.gluten.sql.extendedColumnPruning.enabled                     | true    
          | Do extended nested column pruning for cases ignored by vanilla 
Spark.                                                                          
                                                                                
                                                                                
                                                                |
 | spark.gluten.sql.fallbackEncryptedParquet                          | false   
          | If enabled, gluten will not offload scan when encrypted parquet 
files are detected                                                              
                                                                                
                                                                                
                                                               |
-| spark.gluten.sql.fallbackEncryptedParquet.limit                    | 10      
          | If supplied, `limit` number of files will be checked to determine 
encryption and falling back java scan                                           
                                                                                
                                                                                
                                                             |
+| spark.gluten.sql.fallbackEncryptedParquet.limit                    | 
&lt;undefined&gt; | If supplied, `limit` number of files will be checked to 
determine encryption and falling back java scan. Defaulted to 
spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit.                       
                                                                                
                                                                                
         |
 | spark.gluten.sql.fallbackRegexpExpressions                         | false   
          | If true, fall back all regexp expressions. There are a few 
incompatible cases between RE2 (used by native engine) and java.util.regex 
(used by Spark). User should enable this property if their incompatibility is 
intolerable.                                                                    
                                                                           |
+| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit           | 10      
          | If supplied, metadata of `limit` number of Parquet files will be 
checked to determine whether to fall back java scan                             
                                                                                
                                                                                
                                                              |
 | spark.gluten.sql.injectNativePlanStringToExplain                   | false   
          | When true, Gluten will inject native plan tree to Spark's explain 
output.                                                                         
                                                                                
                                                                                
                                                             |
 | spark.gluten.sql.mergeTwoPhasesAggregate.enabled                   | true    
          | Whether to merge two phases aggregate if there are no other 
operators between them.                                                         
                                                                                
                                                                                
                                                                   |
 | spark.gluten.sql.native.arrow.reader.enabled                       | false   
          | This is config to specify whether to enable the native columnar csv 
reader                                                                          
                                                                                
                                                                                
                                                           |
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index b8a37d8478..1e530d56fa 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -368,7 +368,16 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
 
   def autoAdjustStageFallenNodeThreshold: Double =
     getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD)
-  def parquetEncryptionValidationFileLimit: Int = 
getConf(ENCRYPTED_PARQUET_FALLBACK_FILE_LIMIT)
+
+  def parquetMetadataFallbackFileLimit: Int = {
+    getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT)
+  }
+
+  def parquetEncryptionValidationFileLimit: Int = {
+    getConf(PARQUET_ENCRYPTED_FALLBACK_FILE_LIMIT).getOrElse(
+      getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT))
+  }
+
   def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
   def enableColumnarCollectLimit: Boolean = 
getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
   def enableColumnarCollectTail: Boolean = 
getConf(COLUMNAR_COLLECT_TAIL_ENABLED)
@@ -1552,14 +1561,24 @@ object GlutenConfig extends ConfigRegistry {
       .doubleConf
       .createWithDefault(0.5d)
 
-  val ENCRYPTED_PARQUET_FALLBACK_FILE_LIMIT =
-    buildConf("spark.gluten.sql.fallbackEncryptedParquet.limit")
-      .doc("If supplied, `limit` number of files will be checked to determine 
encryption " +
-        "and falling back java scan")
+  val PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT =
+    buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit")
+      .doc("If supplied, metadata of `limit` number of Parquet files will be 
checked to" +
+        " determine whether to fall back java scan")
       .intConf
       .checkValue(_ > 0, s"must be positive.")
       .createWithDefault(10)
 
+  val PARQUET_ENCRYPTED_FALLBACK_FILE_LIMIT =
+    buildConf("spark.gluten.sql.fallbackEncryptedParquet.limit")
+      .doc(
+        "If supplied, `limit` number of files will be checked to determine 
encryption " +
+          s"and falling back java scan. Defaulted to " +
+          s"${PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT.key}.")
+      .intConf
+      .checkValue(_ > 0, s"must be positive.")
+      .createOptional
+
   val COLUMNAR_RANGE_ENABLED =
     buildConf("spark.gluten.sql.columnar.range")
       .doc("Enable or disable columnar range.")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to