This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 30f7a2709c [GLUTEN-11224][VL] FileSourceScan check codec (#11225)
30f7a2709c is described below

commit 30f7a2709c5b8afa03b5d71e955b4abf4b1475bd
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Dec 2 13:02:37 2025 +0000

    [GLUTEN-11224][VL] FileSourceScan check codec (#11225)
    
    Check the compression codec when reading Parquet files.
    Enable some tests in spark-ut.
    File issues for tests that still fail, but no longer for the previously documented reason.
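
    For context, a minimal standalone sketch of the footer-based codec check this
    patch adds (hypothetical helper names; assumes parquet-hadoop on the classpath,
    and mirrors validateCodec, which inspects only the first column chunk):

        import org.apache.hadoop.conf.Configuration
        import org.apache.hadoop.fs.Path
        import org.apache.parquet.format.converter.ParquetMetadataConverter
        import org.apache.parquet.hadoop.ParquetFileReader
        import org.apache.parquet.hadoop.metadata.CompressionCodecName

        object CodecCheckSketch {
          // Codecs Velox cannot decode; the Spark 4.0 shim additionally rejects LZ4_RAW.
          private val unsupported = Set(CompressionCodecName.LZO, CompressionCodecName.BROTLI)

          def firstColumnCodecSupported(path: String, conf: Configuration): Boolean = {
            // NO_FILTER keeps row-group metadata, which carries the per-column codecs.
            val footer =
              ParquetFileReader.readFooter(conf, new Path(path), ParquetMetadataConverter.NO_FILTER)
            val blocks = footer.getBlocks
            // A file without row groups has no column chunks, so there is nothing to reject.
            blocks.isEmpty || !unsupported.contains(blocks.get(0).getColumns.get(0).getCodec)
          }
        }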
---
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 11 +--
 .../apache/gluten/utils/ParquetMetadataUtils.scala | 96 +++++++++++++++-------
 .../utils/ParquetEncryptionDetectionSuite.scala    | 43 +++++-----
 .../gluten/utils/velox/VeloxTestSettings.scala     | 27 +-----
 .../datasources/GlutenFileSourceCodecSuite.scala   |  8 +-
 .../parquet/GlutenParquetColumnIndexSuite.scala    | 48 +----------
 .../org/apache/gluten/sql/shims/SparkShims.scala   | 10 ++-
 .../gluten/sql/shims/spark32/Spark32Shims.scala    | 11 +--
 .../gluten/sql/shims/spark33/Spark33Shims.scala    | 11 +--
 .../gluten/sql/shims/spark34/Spark34Shims.scala    | 11 +--
 .../gluten/sql/shims/spark35/Spark35Shims.scala    | 42 +++-------
 .../gluten/sql/shims/spark40/Spark40Shims.scala    | 59 ++++---------
 12 files changed, 151 insertions(+), 226 deletions(-)

diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index aa1d8a6559..e5d19de082 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -184,7 +184,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
     }
 
     def validateMetadata(): Option[String] = {
-      if (format != ParquetReadFormat || rootPaths.isEmpty) {
+      if (format != ParquetReadFormat || rootPaths.isEmpty || dataSchema.isEmpty) {
         // Only Parquet is needed for metadata validation so far.
         return None
       }
@@ -192,12 +192,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
         .max(GlutenConfig.get.parquetEncryptionValidationFileLimit)
       val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
       val parquetMetadataValidationResult =
-        ParquetMetadataUtils.validateMetadata(
-          format,
-          rootPaths,
-          hadoopConf,
-          parquetOptions,
-          fileLimit)
+        ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf, parquetOptions, fileLimit)
       parquetMetadataValidationResult.map(
         reason => s"Detected unsupported metadata in parquet files: $reason")
     }
@@ -218,7 +213,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
         None
       }
     }
-
     val validationChecks = Seq(
       validateScheme(),
       validateFormat(),
@@ -298,7 +292,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
     }
 
     def validateCompressionCodec(): Option[String] = {
-      // Velox doesn't support brotli and lzo.
       val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
       val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
       if (unSupportedCompressions.contains(compressionCodec)) {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index d35a1cdb74..1085630dd5 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -18,25 +18,23 @@ package org.apache.gluten.utils
 
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader, ParquetOptions}
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
-import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
 
 object ParquetMetadataUtils {
 
   /**
    * Validates whether Parquet metadata is unsupported for the given paths.
    *
-   *   - If the file format is not Parquet, skip this check and return success.
    *   - If there is at least one Parquet file with encryption enabled, fail the validation.
    *
-   * @param format
-   *   File format, e.g., `ParquetReadFormat`
    * @param rootPaths
    *   List of file paths to scan
    * @param hadoopConf
@@ -45,7 +43,6 @@ object ParquetMetadataUtils {
    *   [[Option[String]]] Empty if the Parquet metadata is supported. Fallback reason otherwise.
    */
   def validateMetadata(
-      format: ReadFileFormat,
       rootPaths: Seq[String],
       hadoopConf: Configuration,
       parquetOptions: ParquetOptions,
@@ -72,6 +69,19 @@ object ParquetMetadataUtils {
     None
   }
 
+  def validateCodec(footer: ParquetMetadata): Option[String] = {
+    val blocks = footer.getBlocks
+    if (blocks.isEmpty) {
+      return None
+    }
+    val codec = blocks.get(0).getColumns.get(0).getCodec
+    val unsupportedCodec = SparkShimLoader.getSparkShims.unsupportedCodec
+    if (unsupportedCodec.contains(codec)) {
+      return Some(s"Unsupported codec ${codec.name()}.")
+    }
+    None
+  }
+
   /**
    * Check whether any Parquet file under the given path has unexpected metadata, using a recursive
    * iterator. Only the first `fileLimit` files are processed for efficiency.
@@ -94,24 +104,14 @@ object ParquetMetadataUtils {
       parquetOptions: ParquetOptions,
       fileLimit: Int
   ): Option[String] = {
-    val isEncryptionValidationEnabled = GlutenConfig.get.parquetEncryptionValidationEnabled
-    val isMetadataValidationEnabled = GlutenConfig.get.parquetMetadataValidationEnabled
-    val filesIterator: RemoteIterator[LocatedFileStatus] = fs.listFiles(path, true)
+    val filesIterator = fs.listFiles(path, true)
     var checkedFileCount = 0
     while (filesIterator.hasNext && checkedFileCount < fileLimit) {
       val fileStatus = filesIterator.next()
       checkedFileCount += 1
-      if (
-        isEncryptionValidationEnabled && SparkShimLoader.getSparkShims.isParquetFileEncrypted(
-          fileStatus,
-          conf)
-      ) {
-        return Some("Encrypted Parquet file detected.")
-      }
-      if (
-        isMetadataValidationEnabled && isTimezoneFoundInMetadata(fileStatus, conf, parquetOptions)
-      ) {
-        return Some("Legacy timezone found.")
+      val metadataUnsupported = isUnsupportedMetadata(fileStatus, conf, parquetOptions)
+      if (metadataUnsupported.isDefined) {
+        return metadataUnsupported
       }
     }
     None
@@ -122,18 +122,55 @@ object ParquetMetadataUtils {
   * Parquet metadata. In this case, the Parquet scan should fall back to vanilla Spark since Velox
   * doesn't yet support Spark legacy datetime.
    */
-  private def isTimezoneFoundInMetadata(
+  private def isUnsupportedMetadata(
       fileStatus: LocatedFileStatus,
       conf: Configuration,
-      parquetOptions: ParquetOptions): Boolean = {
-    val footerFileMetaData =
+      parquetOptions: ParquetOptions): Option[String] = {
+    val isEncryptionValidationEnabled = GlutenConfig.get.parquetEncryptionValidationEnabled
+    val isMetadataValidationEnabled = GlutenConfig.get.parquetMetadataValidationEnabled
+    if (!isMetadataValidationEnabled && !isEncryptionValidationEnabled) {
+      return None
+    }
+    val footer =
       try {
-        ParquetFooterReader.readFooter(conf, fileStatus, SKIP_ROW_GROUPS).getFileMetaData
+        ParquetFooterReader.readFooter(conf, fileStatus, ParquetMetadataConverter.NO_FILTER)
       } catch {
+        case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
+          if (!isEncryptionValidationEnabled) {
+            return None
+          }
+          return Some("Encrypted Parquet footer detected.")
         case _: RuntimeException =>
           // Ignored as it could be a "Not a Parquet file" exception.
-          return false
+          return None
       }
+    val validationChecks = Seq(
+      validateCodec(footer),
+      isTimezoneFoundInMetadata(footer, parquetOptions)
+    )
+
+    if (isMetadataValidationEnabled) {
+      for (check <- validationChecks) {
+        if (check.isDefined) {
+          return check
+        }
+      }
+    }
+
+    // The shims for Spark 3.4 and earlier use toString to detect encrypted data,
+    // so run this check last.
+    if (
+      isEncryptionValidationEnabled && SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer)
+    ) {
+      return Some("Encrypted Parquet file detected.")
+    }
+    None
+  }
+
+  private def isTimezoneFoundInMetadata(
+      footer: ParquetMetadata,
+      parquetOptions: ParquetOptions): Option[String] = {
+    val footerFileMetaData = footer.getFileMetaData
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
     val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
@@ -143,11 +180,12 @@ object ParquetMetadataUtils {
       footerFileMetaData.getKeyValueMetaData.get,
       int96RebaseModeInRead)
     if (datetimeRebaseSpec.originTimeZone.nonEmpty) {
-      return true
+      return Some("Legacy timezone found.")
     }
     if (int96RebaseSpec.originTimeZone.nonEmpty) {
-      return true
+      return Some("Legacy timezone found.")
     }
-    false
+    None
   }
+
 }
diff --git a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
index a844f69113..6003caaa5a 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
@@ -16,12 +16,16 @@
  */
 package org.apache.gluten.utils
 
-import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.config.GlutenConfig
 
-import org.apache.spark.sql.{GlutenQueryTest, SparkSession}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.parquet.crypto.{ColumnEncryptionProperties, FileEncryptionProperties}
 import org.apache.parquet.example.data.simple.SimpleGroup
 import org.apache.parquet.hadoop.example.ExampleParquetWriter
@@ -50,7 +54,7 @@ import scala.collection.JavaConverters._
 *   - Ensures the file is still detected as encrypted despite the plaintext footer.
  */
 
-class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
+class ParquetEncryptionDetectionSuite extends SharedSparkSession {
 
   private val masterKey =
     Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
@@ -67,9 +71,11 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
         .named("name"))
     .named("TestSchema")
 
-  private var _spark: SparkSession = _
-
-  override protected def spark: SparkSession = _spark
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(GlutenConfig.ENCRYPTED_PARQUET_FALLBACK_ENABLED.key, "true")
+      .set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true")
+  }
 
   private def writeParquet(
       path: String,
@@ -104,10 +110,11 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
     }
   }
 
-  private def getLocatedFileStatus(path: String): LocatedFileStatus = {
-    val conf = new Configuration()
-    val fs = FileSystem.get(conf)
-    fs.listFiles(new Path(path), false).next()
+  def isFileEncrypted(filePath: String): Boolean = {
+    val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()), SQLConf.get)
+    ParquetMetadataUtils
+      .validateMetadata(Seq(filePath), new Configuration(), parquetOptions, 10)
+      .isDefined
   }
 
   test("Detect encrypted Parquet with encrypted footer") {
@@ -125,10 +132,7 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
           .build()
 
         writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Alice")))
-        val fileStatus = getLocatedFileStatus(filePath)
-
-        assertTrue(
-          SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration()))
+        assertTrue(isFileEncrypted(filePath))
     }
   }
 
@@ -148,9 +152,7 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
           .build()
 
         writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1, "name" -> "Bob")))
-        val fileStatus = getLocatedFileStatus(filePath)
-        assertTrue(
-          SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration()))
+        assertTrue(isFileEncrypted(filePath))
     }
   }
 
@@ -160,10 +162,7 @@ class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
         val filePath = s"${tempDir.getAbsolutePath}/plain.parquet"
 
         writeParquet(filePath, None, Seq(Map("id" -> 1, "name" -> "Charlie")))
-        val fileStatus = getLocatedFileStatus(filePath)
-
-        assertFalse(
-          SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new Configuration()))
+        assertFalse(isFileEncrypted(filePath))
     }
   }
 }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 16d21206bd..18024f0d0e 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -315,13 +315,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("read partitioned table - partition key included in orc file")
     .exclude("read partitioned table - with nulls and partition keys are 
included in Orc file")
   enableSuite[GlutenOrcV1QuerySuite]
-    // Rewrite to disable Spark's columnar reader.
-    .exclude("Simple selection form ORC table")
-    .exclude("simple select queries")
-    .exclude("overwriting")
-    .exclude("self-join")
-    .exclude("columns only referenced by pushed down filters should remain")
-    .exclude("SPARK-5309 strings stored using dictionary compression in orc")
     // For exception test.
     .exclude("SPARK-20728 Make ORCFileFormat configurable between sql/hive and 
sql/core")
     .exclude("Read/write binary data")
@@ -398,12 +391,9 @@ class VeloxTestSettings extends BackendTestSettings {
     // For exception test.
     .exclude("SPARK-20728 Make ORCFileFormat configurable between sql/hive and 
sql/core")
   enableSuite[GlutenOrcSourceSuite]
-    // Rewrite to disable Spark's columnar reader.
+    // https://github.com/apache/incubator-gluten/issues/11218
     .exclude("SPARK-31238: compatibility with Spark 2.4 in reading dates")
     .exclude("SPARK-31238, SPARK-31423: rebasing dates in write")
-    .exclude("SPARK-31284: compatibility with Spark 2.4 in reading timestamps")
-    .exclude("SPARK-31284, SPARK-31423: rebasing timestamps in write")
-    .exclude("SPARK-34862: Support ORC vectorized reader for nested column")
     // Ignored to disable vectorized reading check.
     .exclude("SPARK-36594: ORC vectorized reader should properly check maximal 
number of fields")
     .exclude("create temporary orc table")
@@ -425,18 +415,12 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOrcV1SchemaPruningSuite]
   enableSuite[GlutenOrcV2SchemaPruningSuite]
   enableSuite[GlutenParquetColumnIndexSuite]
-    // Rewrite by just removing test timestamp.
-    .exclude("test reading unaligned pages - test all types")
-    // Rewrite by converting smaller integral value to timestamp.
-    .exclude("test reading unaligned pages - test all types (dict encode)")
   enableSuite[GlutenParquetCompressionCodecPrecedenceSuite]
   enableSuite[GlutenParquetDeltaByteArrayEncodingSuite]
   enableSuite[GlutenParquetDeltaEncodingInteger]
   enableSuite[GlutenParquetDeltaEncodingLong]
   enableSuite[GlutenParquetDeltaLengthByteArrayEncodingSuite]
   enableSuite[GlutenParquetEncodingSuite]
-    // Velox does not support rle encoding, but it can pass when native writer enabled.
-    .exclude("parquet v2 pages - rle encoding for boolean value columns")
   enableSuite[GlutenParquetFieldIdIOSuite]
   enableSuite[GlutenParquetFileFormatV1Suite]
   enableSuite[GlutenParquetFileFormatV2Suite]
@@ -541,9 +525,7 @@ class VeloxTestSettings extends BackendTestSettings {
     // error message mismatch is accepted
     .exclude("schema mismatch failure error message for parquet reader")
     .exclude("schema mismatch failure error message for parquet vectorized 
reader")
-    // [PATH_NOT_FOUND] Path does not exist:
-    // file:/opt/spark331/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
-    // May require for newer spark.test.home
+    // https://github.com/apache/incubator-gluten/issues/11220
     .excludeByPrefix("SPARK-40819")
     // TODO: fix in Spark-4.0
     .excludeByPrefix("SPARK-46056")
@@ -572,10 +554,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOrcV2AggregatePushDownSuite]
     .exclude("nested column: Max(top level column) not push down")
     .exclude("nested column: Count(nested sub-field) not push down")
-  enableSuite[GlutenParquetCodecSuite]
-    // codec not supported in native
-    .exclude("write and read - file source parquet - codec: lz4_raw")
-    .exclude("write and read - file source parquet - codec: lz4raw")
   enableSuite[GlutenOrcCodecSuite]
   enableSuite[GlutenFileSourceStrategySuite]
     // Plan comparison.
@@ -601,6 +579,7 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("read float and double together")
     .exclude("change column type from float to double")
   enableSuite[GlutenMergedParquetReadSchemaSuite]
+  enableSuite[GlutenParquetCodecSuite]
   enableSuite[GlutenV1WriteCommandSuite]
     // Rewrite to match SortExecTransformer.
     .excludeByPrefix("SPARK-41914:")
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
index 631be9c96f..8790bed64e 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
@@ -16,8 +16,14 @@
  */
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
-class GlutenParquetCodecSuite extends ParquetCodecSuite with GlutenSQLTestsBaseTrait {}
+class GlutenParquetCodecSuite extends ParquetCodecSuite with GlutenSQLTestsBaseTrait {
+  override def sparkConf: SparkConf =
+    super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true")
+}
 
 class GlutenOrcCodecSuite extends OrcCodecSuite with GlutenSQLTestsBaseTrait {}
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
index 60e1ca04a2..4bb8e96455 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
@@ -16,50 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait}
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
-class GlutenParquetColumnIndexSuite extends ParquetColumnIndexSuite with GlutenSQLTestsBaseTrait {
-  private val actions: Seq[DataFrame => DataFrame] = Seq(
-    "_1 = 500",
-    "_1 = 500 or _1 = 1500",
-    "_1 = 500 or _1 = 501 or _1 = 1500",
-    "_1 = 500 or _1 = 501 or _1 = 1000 or _1 = 1500",
-    "_1 >= 500 and _1 < 1000",
-    "(_1 >= 500 and _1 < 1000) or (_1 >= 1500 and _1 < 1600)"
-  ).map(f => (df: DataFrame) => df.filter(f))
-
-  testGluten("test reading unaligned pages - test all types") {
-    val df = spark
-      .range(0, 2000)
-      .selectExpr(
-        "id as _1",
-        "cast(id as short) as _3",
-        "cast(id as int) as _4",
-        "cast(id as float) as _5",
-        "cast(id as double) as _6",
-        "cast(id as decimal(20,0)) as _7",
-        // We changed 1618161925000 to 1618161925 to avoid reaching the limitation of Velox:
-        // Timepoint is outside of supported year range.
-        "cast(cast(1618161925 + id * 60 * 60 * 24 as timestamp) as date) as _9"
-      )
-    checkUnalignedPages(df)(actions: _*)
-  }
-
-  testGluten("test reading unaligned pages - test all types (dict encode)") {
-    val df = spark
-      .range(0, 2000)
-      .selectExpr(
-        "id as _1",
-        "cast(id % 10 as byte) as _2",
-        "cast(id % 10 as short) as _3",
-        "cast(id % 10 as int) as _4",
-        "cast(id % 10 as float) as _5",
-        "cast(id % 10 as double) as _6",
-        "cast(id % 10 as decimal(20,0)) as _7",
-        "cast(id % 2 as boolean) as _8",
-        "cast(cast(1618161925 + (id % 10) * 60 * 60 * 24 as timestamp) as 
date) as _9",
-        "cast(1618161925 + (id % 10) as timestamp) as _10"
-      )
-    checkUnalignedPages(df)(actions: _*)
-  }
-}
+class GlutenParquetColumnIndexSuite extends ParquetColumnIndexSuite with GlutenSQLTestsBaseTrait {}
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index dcfc6b2104..33f59ff20e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -51,8 +51,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 import org.apache.spark.util.SparkShimVersionUtil
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
 import org.apache.parquet.schema.MessageType
 
 import java.util.{Map => JMap}
@@ -320,7 +320,7 @@ trait SparkShims {
   /** Shim method for usages from GlutenExplainUtils.scala. */
   def unsetOperatorId(plan: QueryPlan[_]): Unit
 
-  def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf: Configuration): Boolean
+  def isParquetFileEncrypted(footer: ParquetMetadata): Boolean
 
   def getOtherConstantMetadataColumnValues(file: PartitionedFile): JMap[String, Object] =
     Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
@@ -346,4 +346,8 @@ trait SparkShims {
   def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
     None
   }
+
+  def unsupportedCodec: Seq[CompressionCodecName] = {
+    Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI)
+  }
 }
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 6c4cb7ff62..367b66f9c4 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -53,10 +53,9 @@ import org.apache.spark.sql.types.{DecimalType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
 import org.apache.parquet.schema.MessageType
 
 class Spark32Shims extends SparkShims {
@@ -284,11 +283,9 @@ class Spark32Shims extends SparkShims {
     plan.unsetTagValue(QueryPlan.OP_ID_TAG)
   }
 
-  override def isParquetFileEncrypted(
-      fileStatus: LocatedFileStatus,
-      conf: Configuration): Boolean = {
+  override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
     try {
-      ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString
+      footer.toString
       false
     } catch {
       case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index c18e7dae42..2b6affcded 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -54,10 +54,9 @@ import org.apache.spark.sql.types.{DecimalType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -381,11 +380,9 @@ class Spark33Shims extends SparkShims {
   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
     plan.unsetTagValue(QueryPlan.OP_ID_TAG)
   }
-  override def isParquetFileEncrypted(
-      fileStatus: LocatedFileStatus,
-      conf: Configuration): Boolean = {
+  override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
     try {
-      ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString
+      footer.toString
       false
     } catch {
       case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index ef7b7f21fe..39dd71a6bc 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -57,10 +57,9 @@ import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, StructFie
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -606,11 +605,9 @@ class Spark34Shims extends SparkShims {
   override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
     plan.unsetTagValue(QueryPlan.OP_ID_TAG)
   }
-  override def isParquetFileEncrypted(
-      fileStatus: LocatedFileStatus,
-      conf: Configuration): Boolean = {
+  override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
     try {
-      ParquetFileReader.readFooter(new Configuration(), fileStatus.getPath).toString
+      footer.toString
       false
     } catch {
       case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index a538b25dca..78da08190f 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.sql.shims.spark35
 import org.apache.gluten.execution.PartitionedFileUtilShim
 import org.apache.gluten.expression.{ExpressionNames, Sig}
 import org.apache.gluten.sql.shims.SparkShims
-import org.apache.gluten.utils.ExceptionUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
@@ -57,12 +56,9 @@ import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, StructFie
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -659,30 +655,18 @@ class Spark35Shims extends SparkShims {
     QueryPlan.localIdMap.get().remove(plan)
   }
 
-  override def isParquetFileEncrypted(
-      fileStatus: LocatedFileStatus,
-      conf: Configuration): Boolean = {
-    try {
-      val footer =
-        ParquetFileReader.readFooter(conf, fileStatus.getPath, ParquetMetadataConverter.NO_FILTER)
-      val fileMetaData = footer.getFileMetaData
-      fileMetaData.getEncryptionType match {
-        // UNENCRYPTED file has a plaintext footer and no file encryption,
-        // We can leverage file metadata for this check and return unencrypted.
-        case EncryptionType.UNENCRYPTED =>
-          false
-        // PLAINTEXT_FOOTER has a plaintext footer however the file is encrypted.
-        // In such cases, read the footer and use the metadata for encryption check.
-        case EncryptionType.PLAINTEXT_FOOTER =>
-          true
-        case _ =>
-          false
-      }
-    } catch {
-      // Both footer and file are encrypted, return false.
-      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
+  override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
+    footer.getFileMetaData.getEncryptionType match {
+      // UNENCRYPTED file has a plaintext footer and no file encryption,
+      // We can leverage file metadata for this check and return unencrypted.
+      case EncryptionType.UNENCRYPTED =>
+        false
+      // PLAINTEXT_FOOTER has a plaintext footer however the file is encrypted.
+      // In such cases, read the footer and use the metadata for encryption check.
+      case EncryptionType.PLAINTEXT_FOOTER =>
         true
-      case e: Exception => false
+      case _ =>
+        false
     }
   }
 
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index cecb8eee10..88fe373175 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.sql.shims.spark40
 import org.apache.gluten.execution.PartitionedFileUtilShim
 import org.apache.gluten.expression.{ExpressionNames, Sig}
 import org.apache.gluten.sql.shims.SparkShims
-import org.apache.gluten.utils.ExceptionUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
@@ -58,14 +57,9 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
-import org.apache.parquet.HadoopReadOptions
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName, ParquetMetadata}
 import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.MessageType
 
 import java.time.ZoneOffset
@@ -683,41 +677,18 @@ class Spark40Shims extends SparkShims {
     QueryPlan.localIdMap.get().remove(plan)
   }
 
-  override def isParquetFileEncrypted(
-      fileStatus: LocatedFileStatus,
-      conf: Configuration): Boolean = {
-    val file = HadoopInputFile.fromPath(fileStatus.getPath, conf)
-    val filter = ParquetMetadataConverter.NO_FILTER
-    val options = HadoopReadOptions
-      .builder(file.getConfiguration, file.getPath)
-      .withMetadataFilter(filter)
-      .build
-    val in = file.newStream
-    try {
-      val footer =
-        ParquetFileReader.readFooter(file, options, in)
-      val fileMetaData = footer.getFileMetaData
-      fileMetaData.getEncryptionType match {
-        // UNENCRYPTED file has a plaintext footer and no file encryption,
-        // We can leverage file metadata for this check and return unencrypted.
-        case EncryptionType.UNENCRYPTED =>
-          false
-        // PLAINTEXT_FOOTER has a plaintext footer however the file is encrypted.
-        // In such cases, read the footer and use the metadata for encryption check.
-        case EncryptionType.PLAINTEXT_FOOTER =>
-          true
-        case _ =>
-          false
-      }
-    } catch {
-      // Both footer and file are encrypted, return false.
-      case e: Exception if ExceptionUtils.hasCause(e, classOf[ParquetCryptoRuntimeException]) =>
+  override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
+    footer.getFileMetaData.getEncryptionType match {
+      // UNENCRYPTED file has a plaintext footer and no file encryption,
+      // We can leverage file metadata for this check and return unencrypted.
+      case EncryptionType.UNENCRYPTED =>
+        false
+      // PLAINTEXT_FOOTER has a plaintext footer however the file is encrypted.
+      // In such cases, read the footer and use the metadata for encryption check.
+      case EncryptionType.PLAINTEXT_FOOTER =>
         true
-      case e: Exception => false
-    } finally {
-      if (in != null) {
-        in.close()
-      }
+      case _ =>
+        false
     }
   }
 
@@ -774,4 +745,8 @@ class Spark40Shims extends SparkShims {
   override def getFileSourceScanStream(scan: FileSourceScanExec): Option[SparkDataStream] = {
     scan.stream
   }
+
+  override def unsupportedCodec: Seq[CompressionCodecName] = {
+    Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI, CompressionCodecName.LZ4_RAW)
+  }
 }

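For anyone reproducing the new test behavior locally: the updated suites opt into both
fallback paths via GlutenConfig. A minimal sketch of the same wiring (the two config
constants are taken from this patch; the surrounding setup is illustrative only):

    import org.apache.gluten.config.GlutenConfig
    import org.apache.spark.SparkConf

    // Enable fallback to vanilla Spark for encrypted Parquet files and for files
    // with otherwise unsupported metadata (legacy timezones, unsupported codecs).
    val conf = new SparkConf()
      .set(GlutenConfig.ENCRYPTED_PARQUET_FALLBACK_ENABLED.key, "true")
      .set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key, "true")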

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

