This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d5da832625 [VL] Fallback Parquet scan if legacy timezone is found in
file metadata (#11117)
d5da832625 is described below
commit d5da832625c97e96595e54f3f5a665bdae9469fa
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 26 09:03:42 2025 +0000
[VL] Fallback Parquet scan if legacy timezone is found in file metadata
(#11117)
---
.../sql/delta/DeltaInsertIntoTableSuite.scala | 3 +-
.../gluten/backendsapi/velox/VeloxBackend.scala | 30 +++----
.../apache/gluten/utils/ParquetMetadataUtils.scala | 91 ++++++++++++++++------
docs/Configuration.md | 3 +-
.../org/apache/gluten/config/GlutenConfig.scala | 29 +++++--
5 files changed, 112 insertions(+), 44 deletions(-)
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
index c18e98f7f3..8485957b4a 100644
---
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuite.scala
@@ -757,8 +757,7 @@ class DeltaColumnDefaultsInsertSuite extends
InsertIntoSQLOnlyTests with DeltaSQ
| 'delta.columnMapping.mode' = 'name'
|)""".stripMargin
- // Ignored in Gluten: Results mismatch.
- ignore("Column DEFAULT value support with Delta Lake, positive tests") {
+ test("Column DEFAULT value support with Delta Lake, positive tests") {
Seq(
PartitionOverwriteMode.STATIC.toString,
PartitionOverwriteMode.DYNAMIC.toString
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index e97747b842..ee300372f6 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -183,21 +183,23 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
}
- def validateEncryption(): Option[String] = {
-
- val encryptionValidationEnabled =
GlutenConfig.get.parquetEncryptionValidationEnabled
- if (!encryptionValidationEnabled) {
+ def validateMetadata(): Option[String] = {
+ if (format != ParquetReadFormat || rootPaths.isEmpty) {
+ // Metadata validation is only needed for the Parquet format so far.
return None
}
-
- val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
- val encryptionResult =
- ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf,
fileLimit)
- if (encryptionResult.ok()) {
- None
- } else {
- Some(s"Detected encrypted parquet files: ${encryptionResult.reason()}")
- }
+ val fileLimit = GlutenConfig.get.parquetMetadataFallbackFileLimit
+ .max(GlutenConfig.get.parquetEncryptionValidationFileLimit)
+ val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties),
SQLConf.get)
+ val parquetMetadataValidationResult =
+ ParquetMetadataUtils.validateMetadata(
+ format,
+ rootPaths,
+ hadoopConf,
+ parquetOptions,
+ fileLimit)
+ parquetMetadataValidationResult.map(
+ reason => s"Detected unsupported metadata in parquet files: $reason")
}
def validateDataSchema(): Option[String] = {
@@ -220,7 +222,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val validationChecks = Seq(
validateScheme(),
validateFormat(),
- validateEncryption(),
+ validateMetadata(),
validateDataSchema()
)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index e1c88435b7..b533c029e4 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -16,18 +16,21 @@
*/
package org.apache.gluten.utils
-import org.apache.gluten.execution.ValidationResult
+import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
-import
org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat.ParquetReadFormat
+
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
+import
org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader,
ParquetOptions}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path,
RemoteIterator}
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
object ParquetMetadataUtils {
/**
- * Validates whether Parquet encryption is enabled for the given paths.
+ * Validates whether Parquet metadata is unsupported for the given paths.
*
* - If the file format is not Parquet, skip this check and return success.
* - If there is at least one Parquet file with unsupported metadata (e.g.
encryption or a legacy timezone), fail the validation.
@@ -39,37 +42,39 @@ object ParquetMetadataUtils {
* @param hadoopConf
* Hadoop configuration
* @return
- * [[ValidationResult]] validation success or failure
+ * `Option[String]`. Empty if the Parquet metadata is supported. Fallback
reason otherwise.
*/
- def validateEncryption(
+ def validateMetadata(
format: ReadFileFormat,
rootPaths: Seq[String],
hadoopConf: Configuration,
+ parquetOptions: ParquetOptions,
fileLimit: Int
- ): ValidationResult = {
- if (format != ParquetReadFormat || rootPaths.isEmpty) {
- return ValidationResult.succeeded
- }
-
+ ): Option[String] = {
rootPaths.foreach {
rootPath =>
val fs = new Path(rootPath).getFileSystem(hadoopConf)
try {
- val encryptionDetected =
- checkForEncryptionWithLimit(fs, new Path(rootPath), hadoopConf,
fileLimit = fileLimit)
- if (encryptionDetected) {
- return ValidationResult.failed("Encrypted Parquet file detected.")
+ val reason =
+ checkForUnexpectedMetadataWithLimit(
+ fs,
+ new Path(rootPath),
+ hadoopConf,
+ parquetOptions,
+ fileLimit = fileLimit)
+ if (reason.nonEmpty) {
+ return reason
}
} catch {
case e: Exception =>
}
}
- ValidationResult.succeeded
+ None
}
/**
- * Check any Parquet file under the given path is encrypted using a
recursive iterator. Only the
- * first `fileLimit` files are processed for efficiency.
+ * Check any Parquet file under the given path is with unexpected metadata
using a recursive
+ * iterator. Only the first `fileLimit` files are processed for efficiency.
*
* @param fs
* FileSystem to use
@@ -82,21 +87,63 @@ object ParquetMetadataUtils {
* @return
* Fallback reason if a file with unsupported metadata is detected, None otherwise
*/
- private def checkForEncryptionWithLimit(
+ private def checkForUnexpectedMetadataWithLimit(
fs: FileSystem,
path: Path,
conf: Configuration,
+ parquetOptions: ParquetOptions,
fileLimit: Int
- ): Boolean = {
-
+ ): Option[String] = {
+ val isEncryptionValidationEnabled =
GlutenConfig.get.parquetEncryptionValidationEnabled
val filesIterator: RemoteIterator[LocatedFileStatus] = fs.listFiles(path,
true)
var checkedFileCount = 0
while (filesIterator.hasNext && checkedFileCount < fileLimit) {
val fileStatus = filesIterator.next()
checkedFileCount += 1
- if (SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus,
conf)) {
- return true
+ if (
+ isEncryptionValidationEnabled &&
SparkShimLoader.getSparkShims.isParquetFileEncrypted(
+ fileStatus,
+ conf)
+ ) {
+ return Some("Encrypted Parquet file detected.")
}
+ if (isTimezoneFoundInMetadata(fileStatus, conf, parquetOptions)) {
+ return Some("Legacy timezone found.")
+ }
+ }
+ None
+ }
+
+ /**
+ * Checks whether there are timezones set with Spark key
SPARK_TIMEZONE_METADATA_KEY in the
+ * Parquet metadata. In this case, the Parquet scan should fall back to
vanilla Spark since Velox
+ * doesn't yet support Spark legacy datetime.
+ */
+ private def isTimezoneFoundInMetadata(
+ fileStatus: LocatedFileStatus,
+ conf: Configuration,
+ parquetOptions: ParquetOptions): Boolean = {
+ val footerFileMetaData =
+ try {
+ ParquetFooterReader.readFooter(conf, fileStatus,
SKIP_ROW_GROUPS).getFileMetaData
+ } catch {
+ case _: RuntimeException =>
// Ignored as it could be a "Not a Parquet file" exception.
+ return false
+ }
+ val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
+ val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
+ val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ datetimeRebaseModeInRead)
+ val int96RebaseSpec = DataSourceUtils.int96RebaseSpec(
+ footerFileMetaData.getKeyValueMetaData.get,
+ int96RebaseModeInRead)
+ if (datetimeRebaseSpec.originTimeZone.nonEmpty) {
+ return true
+ }
+ if (int96RebaseSpec.originTimeZone.nonEmpty) {
+ return true
}
false
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index ec36344365..da718718a0 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -116,8 +116,9 @@ nav_order: 15
| spark.gluten.sql.countDistinctWithoutExpand | false
| Convert Count Distinct to a UDAF called count_distinct to prevent
SparkPlanner converting it to Expand+Count. WARNING: When enabled, count
distinct queries will fail to fallback!!!
|
| spark.gluten.sql.extendedColumnPruning.enabled | true
| Do extended nested column pruning for cases ignored by vanilla
Spark.
|
| spark.gluten.sql.fallbackEncryptedParquet | false
| If enabled, gluten will not offload scan when encrypted parquet
files are detected
|
-| spark.gluten.sql.fallbackEncryptedParquet.limit | 10
| If supplied, `limit` number of files will be checked to determine
encryption and falling back java scan
|
+| spark.gluten.sql.fallbackEncryptedParquet.limit |
<undefined> | If supplied, `limit` number of files will be checked to
determine encryption and falling back java scan. Defaulted to
spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit.
|
| spark.gluten.sql.fallbackRegexpExpressions | false
| If true, fall back all regexp expressions. There are a few
incompatible cases between RE2 (used by native engine) and java.util.regex
(used by Spark). User should enable this property if their incompatibility is
intolerable.
|
+| spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit | 10
| If supplied, metadata of `limit` number of Parquet files will be
checked to determine whether to fall back java scan
|
| spark.gluten.sql.injectNativePlanStringToExplain | false
| When true, Gluten will inject native plan tree to Spark's explain
output.
|
| spark.gluten.sql.mergeTwoPhasesAggregate.enabled | true
| Whether to merge two phases aggregate if there are no other
operators between them.
|
| spark.gluten.sql.native.arrow.reader.enabled | false
| This is config to specify whether to enable the native columnar csv
reader
|
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index b8a37d8478..1e530d56fa 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -368,7 +368,16 @@ class GlutenConfig(conf: SQLConf) extends
GlutenCoreConfig(conf) {
def autoAdjustStageFallenNodeThreshold: Double =
getConf(AUTO_ADJUST_STAGE_RESOURCES_FALLEN_NODE_RATIO_THRESHOLD)
- def parquetEncryptionValidationFileLimit: Int =
getConf(ENCRYPTED_PARQUET_FALLBACK_FILE_LIMIT)
+
+ def parquetMetadataFallbackFileLimit: Int = {
+ getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT)
+ }
+
+ def parquetEncryptionValidationFileLimit: Int = {
+ getConf(PARQUET_ENCRYPTED_FALLBACK_FILE_LIMIT).getOrElse(
+ getConf(PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT))
+ }
+
def enableColumnarRange: Boolean = getConf(COLUMNAR_RANGE_ENABLED)
def enableColumnarCollectLimit: Boolean =
getConf(COLUMNAR_COLLECT_LIMIT_ENABLED)
def enableColumnarCollectTail: Boolean =
getConf(COLUMNAR_COLLECT_TAIL_ENABLED)
@@ -1552,14 +1561,24 @@ object GlutenConfig extends ConfigRegistry {
.doubleConf
.createWithDefault(0.5d)
- val ENCRYPTED_PARQUET_FALLBACK_FILE_LIMIT =
- buildConf("spark.gluten.sql.fallbackEncryptedParquet.limit")
- .doc("If supplied, `limit` number of files will be checked to determine
encryption " +
- "and falling back java scan")
+ val PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT =
+ buildConf("spark.gluten.sql.fallbackUnexpectedMetadataParquet.limit")
+ .doc("If supplied, metadata of `limit` number of Parquet files will be
checked to" +
+ " determine whether to fall back java scan")
.intConf
.checkValue(_ > 0, s"must be positive.")
.createWithDefault(10)
+ val PARQUET_ENCRYPTED_FALLBACK_FILE_LIMIT =
+ buildConf("spark.gluten.sql.fallbackEncryptedParquet.limit")
+ .doc(
+ "If supplied, `limit` number of files will be checked to determine
encryption " +
+ s"and falling back java scan. Defaulted to " +
+ s"${PARQUET_UNEXPECTED_METADATA_FALLBACK_FILE_LIMIT.key}.")
+ .intConf
+ .checkValue(_ > 0, s"must be positive.")
+ .createOptional
+
val COLUMNAR_RANGE_ENABLED =
buildConf("spark.gluten.sql.columnar.range")
.doc("Enable or disable columnar range.")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]