This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 30f7a2709c [GLUTEN-11224][VL] FileSourceScan check codec (#11225)
30f7a2709c is described below
commit 30f7a2709c5b8afa03b5d71e955b4abf4b1475bd
Author: Jin Chengcheng <[email protected]>
AuthorDate: Tue Dec 2 13:02:37 2025 +0000
[GLUTEN-11224][VL] FileSourceScan check codec (#11225)
Check the compression codec when reading Parquet files
Enable some tests in spark-ut
Created issues for some tests that no longer fail for the previous reason.
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 11 +--
.../apache/gluten/utils/ParquetMetadataUtils.scala | 96 +++++++++++++++-------
.../utils/ParquetEncryptionDetectionSuite.scala | 43 +++++-----
.../gluten/utils/velox/VeloxTestSettings.scala | 27 +-----
.../datasources/GlutenFileSourceCodecSuite.scala | 8 +-
.../parquet/GlutenParquetColumnIndexSuite.scala | 48 +----------
.../org/apache/gluten/sql/shims/SparkShims.scala | 10 ++-
.../gluten/sql/shims/spark32/Spark32Shims.scala | 11 +--
.../gluten/sql/shims/spark33/Spark33Shims.scala | 11 +--
.../gluten/sql/shims/spark34/Spark34Shims.scala | 11 +--
.../gluten/sql/shims/spark35/Spark35Shims.scala | 42 +++-------
.../gluten/sql/shims/spark40/Spark40Shims.scala | 59 ++++---------
12 files changed, 151 insertions(+), 226 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index aa1d8a6559..e5d19de082 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -184,7 +184,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
def validateMetadata(): Option[String] = {
- if (format != ParquetReadFormat || rootPaths.isEmpty) {
+ if (format != ParquetReadFormat || rootPaths.isEmpty ||
dataSchema.isEmpty) {
// Only Parquet is needed for metadata validation so far.
return None
}
@@ -192,12 +192,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
.max(GlutenConfig.get.parquetEncryptionValidationFileLimit)
val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties),
SQLConf.get)
val parquetMetadataValidationResult =
- ParquetMetadataUtils.validateMetadata(
- format,
- rootPaths,
- hadoopConf,
- parquetOptions,
- fileLimit)
+ ParquetMetadataUtils.validateMetadata(rootPaths, hadoopConf,
parquetOptions, fileLimit)
parquetMetadataValidationResult.map(
reason => s"Detected unsupported metadata in parquet files: $reason")
}
@@ -218,7 +213,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
None
}
}
-
val validationChecks = Seq(
validateScheme(),
validateFormat(),
@@ -298,7 +292,6 @@ object VeloxBackendSettings extends BackendSettingsApi {
}
def validateCompressionCodec(): Option[String] = {
- // Velox doesn't support brotli and lzo.
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val compressionCodec =
WriteFilesExecTransformer.getCompressionCodec(options)
if (unSupportedCompressions.contains(compressionCodec)) {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
index d35a1cdb74..1085630dd5 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/utils/ParquetMetadataUtils.scala
@@ -18,25 +18,23 @@ package org.apache.gluten.utils
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.sql.shims.SparkShimLoader
-import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import
org.apache.spark.sql.execution.datasources.parquet.{ParquetFooterReader,
ParquetOptions}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path,
RemoteIterator}
-import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.parquet.crypto.ParquetCryptoRuntimeException
+import org.apache.parquet.format.converter.ParquetMetadataConverter
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
object ParquetMetadataUtils {
/**
* Validates whether Parquet metadata is unsupported for the given paths.
*
- * - If the file format is not Parquet, skip this check and return success.
* - If there is at least one Parquet file with encryption enabled, fail
the validation.
*
- * @param format
- * File format, e.g., `ParquetReadFormat`
* @param rootPaths
* List of file paths to scan
* @param hadoopConf
@@ -45,7 +43,6 @@ object ParquetMetadataUtils {
* [[Option[String]]] Empty if the Parquet metadata is supported. Fallback
reason otherwise.
*/
def validateMetadata(
- format: ReadFileFormat,
rootPaths: Seq[String],
hadoopConf: Configuration,
parquetOptions: ParquetOptions,
@@ -72,6 +69,19 @@ object ParquetMetadataUtils {
None
}
+ def validateCodec(footer: ParquetMetadata): Option[String] = {
+ val blocks = footer.getBlocks
+ if (blocks.isEmpty) {
+ return None
+ }
+ val codec = blocks.get(0).getColumns.get(0).getCodec
+ val unsupportedCodec = SparkShimLoader.getSparkShims.unsupportedCodec
+ if (unsupportedCodec.contains(codec)) {
+ return Some(s"Unsupported codec ${codec.name()}.")
+ }
+ None
+ }
+
/**
* Check any Parquet file under the given path is with unexpected metadata
using a recursive
* iterator. Only the first `fileLimit` files are processed for efficiency.
@@ -94,24 +104,14 @@ object ParquetMetadataUtils {
parquetOptions: ParquetOptions,
fileLimit: Int
): Option[String] = {
- val isEncryptionValidationEnabled =
GlutenConfig.get.parquetEncryptionValidationEnabled
- val isMetadataValidationEnabled =
GlutenConfig.get.parquetMetadataValidationEnabled
- val filesIterator: RemoteIterator[LocatedFileStatus] = fs.listFiles(path,
true)
+ val filesIterator = fs.listFiles(path, true)
var checkedFileCount = 0
while (filesIterator.hasNext && checkedFileCount < fileLimit) {
val fileStatus = filesIterator.next()
checkedFileCount += 1
- if (
- isEncryptionValidationEnabled &&
SparkShimLoader.getSparkShims.isParquetFileEncrypted(
- fileStatus,
- conf)
- ) {
- return Some("Encrypted Parquet file detected.")
- }
- if (
- isMetadataValidationEnabled && isTimezoneFoundInMetadata(fileStatus,
conf, parquetOptions)
- ) {
- return Some("Legacy timezone found.")
+ val metadataUnsupported = isUnsupportedMetadata(fileStatus, conf,
parquetOptions)
+ if (metadataUnsupported.isDefined) {
+ return metadataUnsupported
}
}
None
@@ -122,18 +122,55 @@ object ParquetMetadataUtils {
* Parquet metadata. In this case, the Parquet scan should fall back to
vanilla Spark since Velox
* doesn't yet support Spark legacy datetime.
*/
- private def isTimezoneFoundInMetadata(
+ private def isUnsupportedMetadata(
fileStatus: LocatedFileStatus,
conf: Configuration,
- parquetOptions: ParquetOptions): Boolean = {
- val footerFileMetaData =
+ parquetOptions: ParquetOptions): Option[String] = {
+ val isEncryptionValidationEnabled =
GlutenConfig.get.parquetEncryptionValidationEnabled
+ val isMetadataValidationEnabled =
GlutenConfig.get.parquetMetadataValidationEnabled
+ if (!isMetadataValidationEnabled && !isEncryptionValidationEnabled) {
+ return None
+ }
+ val footer =
try {
- ParquetFooterReader.readFooter(conf, fileStatus,
SKIP_ROW_GROUPS).getFileMetaData
+ ParquetFooterReader.readFooter(conf, fileStatus,
ParquetMetadataConverter.NO_FILTER)
} catch {
+ case e: Exception if ExceptionUtils.hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
+ if (!isEncryptionValidationEnabled) {
+ return None
+ }
+ return Some("Encrypted Parquet footer detected.")
case _: RuntimeException =>
// Ignored as it could be a "Not a Parquet file" exception.
- return false
+ return None
}
+ val validationChecks = Seq(
+ validateCodec(footer),
+ isTimezoneFoundInMetadata(footer, parquetOptions)
+ )
+
+ if (isMetadataValidationEnabled) {
+ for (check <- validationChecks) {
+ if (check.isDefined) {
+ return check
+ }
+ }
+ }
+
+ // Previous Spark3.4 version uses toString to check if the data is
encrypted,
+ // so place the check to the end
+ if (
+ isEncryptionValidationEnabled &&
SparkShimLoader.getSparkShims.isParquetFileEncrypted(footer)
+ ) {
+ return Some("Encrypted Parquet file detected.")
+ }
+ None
+ }
+
+ private def isTimezoneFoundInMetadata(
+ footer: ParquetMetadata,
+ parquetOptions: ParquetOptions): Option[String] = {
+ val footerFileMetaData = footer.getFileMetaData
val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
val int96RebaseModeInRead = parquetOptions.int96RebaseModeInRead
val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec(
@@ -143,11 +180,12 @@ object ParquetMetadataUtils {
footerFileMetaData.getKeyValueMetaData.get,
int96RebaseModeInRead)
if (datetimeRebaseSpec.originTimeZone.nonEmpty) {
- return true
+ return Some("Legacy timezone found.")
}
if (int96RebaseSpec.originTimeZone.nonEmpty) {
- return true
+ return Some("Legacy timezone found.")
}
- false
+ None
}
+
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
index a844f69113..6003caaa5a 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/utils/ParquetEncryptionDetectionSuite.scala
@@ -16,12 +16,16 @@
*/
package org.apache.gluten.utils
-import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.config.GlutenConfig
-import org.apache.spark.sql.{GlutenQueryTest, SparkSession}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.execution.datasources.parquet.ParquetOptions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.Path
import org.apache.parquet.crypto.{ColumnEncryptionProperties,
FileEncryptionProperties}
import org.apache.parquet.example.data.simple.SimpleGroup
import org.apache.parquet.hadoop.example.ExampleParquetWriter
@@ -50,7 +54,7 @@ import scala.collection.JavaConverters._
* - Ensures the file is still detected as encrypted despite the plaintext
footer.
*/
-class ParquetEncryptionDetectionSuite extends GlutenQueryTest {
+class ParquetEncryptionDetectionSuite extends SharedSparkSession {
private val masterKey =
Base64.getEncoder.encodeToString("0123456789012345".getBytes(StandardCharsets.UTF_8))
@@ -67,9 +71,11 @@ class ParquetEncryptionDetectionSuite extends
GlutenQueryTest {
.named("name"))
.named("TestSchema")
- private var _spark: SparkSession = _
-
- override protected def spark: SparkSession = _spark
+ override def sparkConf: SparkConf = {
+ super.sparkConf
+ .set(GlutenConfig.ENCRYPTED_PARQUET_FALLBACK_ENABLED.key, "true")
+ .set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key,
"true")
+ }
private def writeParquet(
path: String,
@@ -104,10 +110,11 @@ class ParquetEncryptionDetectionSuite extends
GlutenQueryTest {
}
}
- private def getLocatedFileStatus(path: String): LocatedFileStatus = {
- val conf = new Configuration()
- val fs = FileSystem.get(conf)
- fs.listFiles(new Path(path), false).next()
+ def isFileEncrypted(filePath: String): Boolean = {
+ val parquetOptions = new ParquetOptions(CaseInsensitiveMap(Map()),
SQLConf.get)
+ ParquetMetadataUtils
+ .validateMetadata(Seq(filePath), new Configuration(), parquetOptions, 10)
+ .isDefined
}
test("Detect encrypted Parquet with encrypted footer") {
@@ -125,10 +132,7 @@ class ParquetEncryptionDetectionSuite extends
GlutenQueryTest {
.build()
writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1,
"name" -> "Alice")))
- val fileStatus = getLocatedFileStatus(filePath)
-
- assertTrue(
- SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new
Configuration()))
+ assertTrue(isFileEncrypted(filePath))
}
}
@@ -148,9 +152,7 @@ class ParquetEncryptionDetectionSuite extends
GlutenQueryTest {
.build()
writeParquet(filePath, Some(encryptionProps), Seq(Map("id" -> 1,
"name" -> "Bob")))
- val fileStatus = getLocatedFileStatus(filePath)
- assertTrue(
- SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new
Configuration()))
+ assertTrue(isFileEncrypted(filePath))
}
}
@@ -160,10 +162,7 @@ class ParquetEncryptionDetectionSuite extends
GlutenQueryTest {
val filePath = s"${tempDir.getAbsolutePath}/plain.parquet"
writeParquet(filePath, None, Seq(Map("id" -> 1, "name" -> "Charlie")))
- val fileStatus = getLocatedFileStatus(filePath)
-
- assertFalse(
- SparkShimLoader.getSparkShims.isParquetFileEncrypted(fileStatus, new
Configuration()))
+ assertFalse(isFileEncrypted(filePath))
}
}
}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 16d21206bd..18024f0d0e 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -315,13 +315,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("read partitioned table - partition key included in orc file")
.exclude("read partitioned table - with nulls and partition keys are
included in Orc file")
enableSuite[GlutenOrcV1QuerySuite]
- // Rewrite to disable Spark's columnar reader.
- .exclude("Simple selection form ORC table")
- .exclude("simple select queries")
- .exclude("overwriting")
- .exclude("self-join")
- .exclude("columns only referenced by pushed down filters should remain")
- .exclude("SPARK-5309 strings stored using dictionary compression in orc")
// For exception test.
.exclude("SPARK-20728 Make ORCFileFormat configurable between sql/hive and
sql/core")
.exclude("Read/write binary data")
@@ -398,12 +391,9 @@ class VeloxTestSettings extends BackendTestSettings {
// For exception test.
.exclude("SPARK-20728 Make ORCFileFormat configurable between sql/hive and
sql/core")
enableSuite[GlutenOrcSourceSuite]
- // Rewrite to disable Spark's columnar reader.
+ // https://github.com/apache/incubator-gluten/issues/11218
.exclude("SPARK-31238: compatibility with Spark 2.4 in reading dates")
.exclude("SPARK-31238, SPARK-31423: rebasing dates in write")
- .exclude("SPARK-31284: compatibility with Spark 2.4 in reading timestamps")
- .exclude("SPARK-31284, SPARK-31423: rebasing timestamps in write")
- .exclude("SPARK-34862: Support ORC vectorized reader for nested column")
// Ignored to disable vectorized reading check.
.exclude("SPARK-36594: ORC vectorized reader should properly check maximal
number of fields")
.exclude("create temporary orc table")
@@ -425,18 +415,12 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenOrcV1SchemaPruningSuite]
enableSuite[GlutenOrcV2SchemaPruningSuite]
enableSuite[GlutenParquetColumnIndexSuite]
- // Rewrite by just removing test timestamp.
- .exclude("test reading unaligned pages - test all types")
- // Rewrite by converting smaller integral value to timestamp.
- .exclude("test reading unaligned pages - test all types (dict encode)")
enableSuite[GlutenParquetCompressionCodecPrecedenceSuite]
enableSuite[GlutenParquetDeltaByteArrayEncodingSuite]
enableSuite[GlutenParquetDeltaEncodingInteger]
enableSuite[GlutenParquetDeltaEncodingLong]
enableSuite[GlutenParquetDeltaLengthByteArrayEncodingSuite]
enableSuite[GlutenParquetEncodingSuite]
- // Velox does not support rle encoding, but it can pass when native writer
enabled.
- .exclude("parquet v2 pages - rle encoding for boolean value columns")
enableSuite[GlutenParquetFieldIdIOSuite]
enableSuite[GlutenParquetFileFormatV1Suite]
enableSuite[GlutenParquetFileFormatV2Suite]
@@ -541,9 +525,7 @@ class VeloxTestSettings extends BackendTestSettings {
// error message mismatch is accepted
.exclude("schema mismatch failure error message for parquet reader")
.exclude("schema mismatch failure error message for parquet vectorized
reader")
- // [PATH_NOT_FOUND] Path does not exist:
- //
file:/opt/spark331/sql/core/src/test/resources/test-data/timestamp-nanos.parquet
- // May require for newer spark.test.home
+ // https://github.com/apache/incubator-gluten/issues/11220
.excludeByPrefix("SPARK-40819")
// TODO: fix in Spark-4.0
.excludeByPrefix("SPARK-46056")
@@ -572,10 +554,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenOrcV2AggregatePushDownSuite]
.exclude("nested column: Max(top level column) not push down")
.exclude("nested column: Count(nested sub-field) not push down")
- enableSuite[GlutenParquetCodecSuite]
- // codec not supported in native
- .exclude("write and read - file source parquet - codec: lz4_raw")
- .exclude("write and read - file source parquet - codec: lz4raw")
enableSuite[GlutenOrcCodecSuite]
enableSuite[GlutenFileSourceStrategySuite]
// Plan comparison.
@@ -601,6 +579,7 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("read float and double together")
.exclude("change column type from float to double")
enableSuite[GlutenMergedParquetReadSchemaSuite]
+ enableSuite[GlutenParquetCodecSuite]
enableSuite[GlutenV1WriteCommandSuite]
// Rewrite to match SortExecTransformer.
.excludeByPrefix("SPARK-41914:")
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
index 631be9c96f..8790bed64e 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenFileSourceCodecSuite.scala
@@ -16,8 +16,14 @@
*/
package org.apache.spark.sql.execution.datasources
+import org.apache.gluten.config.GlutenConfig
+
+import org.apache.spark.SparkConf
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
-class GlutenParquetCodecSuite extends ParquetCodecSuite with
GlutenSQLTestsBaseTrait {}
+class GlutenParquetCodecSuite extends ParquetCodecSuite with
GlutenSQLTestsBaseTrait {
+ override def sparkConf: SparkConf =
+
super.sparkConf.set(GlutenConfig.PARQUET_UNEXPECTED_METADATA_FALLBACK_ENABLED.key,
"true")
+}
class GlutenOrcCodecSuite extends OrcCodecSuite with GlutenSQLTestsBaseTrait {}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
index 60e1ca04a2..4bb8e96455 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetColumnIndexSuite.scala
@@ -16,50 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.parquet
-import org.apache.spark.sql.{DataFrame, GlutenSQLTestsBaseTrait}
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
-class GlutenParquetColumnIndexSuite extends ParquetColumnIndexSuite with
GlutenSQLTestsBaseTrait {
- private val actions: Seq[DataFrame => DataFrame] = Seq(
- "_1 = 500",
- "_1 = 500 or _1 = 1500",
- "_1 = 500 or _1 = 501 or _1 = 1500",
- "_1 = 500 or _1 = 501 or _1 = 1000 or _1 = 1500",
- "_1 >= 500 and _1 < 1000",
- "(_1 >= 500 and _1 < 1000) or (_1 >= 1500 and _1 < 1600)"
- ).map(f => (df: DataFrame) => df.filter(f))
-
- testGluten("test reading unaligned pages - test all types") {
- val df = spark
- .range(0, 2000)
- .selectExpr(
- "id as _1",
- "cast(id as short) as _3",
- "cast(id as int) as _4",
- "cast(id as float) as _5",
- "cast(id as double) as _6",
- "cast(id as decimal(20,0)) as _7",
- // We changed 1618161925000 to 1618161925 to avoid reaching the
limitation of Velox:
- // Timepoint is outside of supported year range.
- "cast(cast(1618161925 + id * 60 * 60 * 24 as timestamp) as date) as _9"
- )
- checkUnalignedPages(df)(actions: _*)
- }
-
- testGluten("test reading unaligned pages - test all types (dict encode)") {
- val df = spark
- .range(0, 2000)
- .selectExpr(
- "id as _1",
- "cast(id % 10 as byte) as _2",
- "cast(id % 10 as short) as _3",
- "cast(id % 10 as int) as _4",
- "cast(id % 10 as float) as _5",
- "cast(id % 10 as double) as _6",
- "cast(id % 10 as decimal(20,0)) as _7",
- "cast(id % 2 as boolean) as _8",
- "cast(cast(1618161925 + (id % 10) * 60 * 60 * 24 as timestamp) as
date) as _9",
- "cast(1618161925 + (id % 10) as timestamp) as _10"
- )
- checkUnalignedPages(df)(actions: _*)
- }
-}
+class GlutenParquetColumnIndexSuite extends ParquetColumnIndexSuite with
GlutenSQLTestsBaseTrait {}
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index dcfc6b2104..33f59ff20e 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -51,8 +51,8 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
import org.apache.spark.util.SparkShimVersionUtil
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName,
ParquetMetadata}
import org.apache.parquet.schema.MessageType
import java.util.{Map => JMap}
@@ -320,7 +320,7 @@ trait SparkShims {
/** Shim method for usages from GlutenExplainUtils.scala. */
def unsetOperatorId(plan: QueryPlan[_]): Unit
- def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf:
Configuration): Boolean
+ def isParquetFileEncrypted(footer: ParquetMetadata): Boolean
def getOtherConstantMetadataColumnValues(file: PartitionedFile):
JMap[String, Object] =
Map.empty[String, Any].asJava.asInstanceOf[JMap[String, Object]]
@@ -346,4 +346,8 @@ trait SparkShims {
def getFileSourceScanStream(scan: FileSourceScanExec):
Option[SparkDataStream] = {
None
}
+
+ def unsupportedCodec: Seq[CompressionCodecName] = {
+ Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI)
+ }
}
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 6c4cb7ff62..367b66f9c4 100644
---
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -53,10 +53,9 @@ import org.apache.spark.sql.types.{DecimalType, StructField,
StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.schema.MessageType
class Spark32Shims extends SparkShims {
@@ -284,11 +283,9 @@ class Spark32Shims extends SparkShims {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
- override def isParquetFileEncrypted(
- fileStatus: LocatedFileStatus,
- conf: Configuration): Boolean = {
+ override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
try {
- ParquetFileReader.readFooter(new Configuration(),
fileStatus.getPath).toString
+ footer.toString
false
} catch {
case e: Exception if ExceptionUtils.hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index c18e7dae42..2b6affcded 100644
---
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -54,10 +54,9 @@ import org.apache.spark.sql.types.{DecimalType, StructField,
StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -381,11 +380,9 @@ class Spark33Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
- override def isParquetFileEncrypted(
- fileStatus: LocatedFileStatus,
- conf: Configuration): Boolean = {
+ override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
try {
- ParquetFileReader.readFooter(new Configuration(),
fileStatus.getPath).toString
+ footer.toString
false
} catch {
case e: Exception if ExceptionUtils.hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index ef7b7f21fe..39dd71a6bc 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -57,10 +57,9 @@ import org.apache.spark.sql.types.{DecimalType, IntegerType,
LongType, StructFie
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -606,11 +605,9 @@ class Spark34Shims extends SparkShims {
override def unsetOperatorId(plan: QueryPlan[_]): Unit = {
plan.unsetTagValue(QueryPlan.OP_ID_TAG)
}
- override def isParquetFileEncrypted(
- fileStatus: LocatedFileStatus,
- conf: Configuration): Boolean = {
+ override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
try {
- ParquetFileReader.readFooter(new Configuration(),
fileStatus.getPath).toString
+ footer.toString
false
} catch {
case e: Exception if ExceptionUtils.hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index a538b25dca..78da08190f 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.sql.shims.spark35
import org.apache.gluten.execution.PartitionedFileUtilShim
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.sql.shims.SparkShims
-import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
@@ -57,12 +56,9 @@ import org.apache.spark.sql.types.{DecimalType, IntegerType,
LongType, StructFie
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
+import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -659,30 +655,18 @@ class Spark35Shims extends SparkShims {
QueryPlan.localIdMap.get().remove(plan)
}
- override def isParquetFileEncrypted(
- fileStatus: LocatedFileStatus,
- conf: Configuration): Boolean = {
- try {
- val footer =
- ParquetFileReader.readFooter(conf, fileStatus.getPath,
ParquetMetadataConverter.NO_FILTER)
- val fileMetaData = footer.getFileMetaData
- fileMetaData.getEncryptionType match {
- // UNENCRYPTED file has a plaintext footer and no file encryption,
- // We can leverage file metadata for this check and return unencrypted.
- case EncryptionType.UNENCRYPTED =>
- false
- // PLAINTEXT_FOOTER has a plaintext footer however the file is
encrypted.
- // In such cases, read the footer and use the metadata for encryption
check.
- case EncryptionType.PLAINTEXT_FOOTER =>
- true
- case _ =>
- false
- }
- } catch {
- // Both footer and file are encrypted, return false.
- case e: Exception if ExceptionUtils.hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
+ override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
+ footer.getFileMetaData.getEncryptionType match {
+ // UNENCRYPTED file has a plaintext footer and no file encryption,
+ // We can leverage file metadata for this check and return unencrypted.
+ case EncryptionType.UNENCRYPTED =>
+ false
+ // PLAINTEXT_FOOTER has a plaintext footer however the file is encrypted.
+ // In such cases, read the footer and use the metadata for encryption
check.
+ case EncryptionType.PLAINTEXT_FOOTER =>
true
- case e: Exception => false
+ case _ =>
+ false
}
}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index cecb8eee10..88fe373175 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.sql.shims.spark40
import org.apache.gluten.execution.PartitionedFileUtilShim
import org.apache.gluten.expression.{ExpressionNames, Sig}
import org.apache.gluten.sql.shims.SparkShims
-import org.apache.gluten.utils.ExceptionUtils
import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
@@ -58,14 +57,9 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.storage.{BlockId, BlockManagerId}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
-import org.apache.parquet.HadoopReadOptions
-import org.apache.parquet.crypto.ParquetCryptoRuntimeException
-import org.apache.parquet.format.converter.ParquetMetadataConverter
-import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.parquet.hadoop.metadata.{CompressionCodecName,
ParquetMetadata}
import org.apache.parquet.hadoop.metadata.FileMetaData.EncryptionType
-import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.parquet.schema.MessageType
import java.time.ZoneOffset
@@ -683,41 +677,18 @@ class Spark40Shims extends SparkShims {
QueryPlan.localIdMap.get().remove(plan)
}
- override def isParquetFileEncrypted(
- fileStatus: LocatedFileStatus,
- conf: Configuration): Boolean = {
- val file = HadoopInputFile.fromPath(fileStatus.getPath, conf)
- val filter = ParquetMetadataConverter.NO_FILTER
- val options = HadoopReadOptions
- .builder(file.getConfiguration, file.getPath)
- .withMetadataFilter(filter)
- .build
- val in = file.newStream
- try {
- val footer =
- ParquetFileReader.readFooter(file, options, in)
- val fileMetaData = footer.getFileMetaData
- fileMetaData.getEncryptionType match {
- // UNENCRYPTED file has a plaintext footer and no file encryption,
- // We can leverage file metadata for this check and return unencrypted.
- case EncryptionType.UNENCRYPTED =>
- false
- // PLAINTEXT_FOOTER has a plaintext footer however the file is
encrypted.
- // In such cases, read the footer and use the metadata for encryption
check.
- case EncryptionType.PLAINTEXT_FOOTER =>
- true
- case _ =>
- false
- }
- } catch {
- // Both footer and file are encrypted, return false.
- case e: Exception if ExceptionUtils.hasCause(e,
classOf[ParquetCryptoRuntimeException]) =>
+ override def isParquetFileEncrypted(footer: ParquetMetadata): Boolean = {
+ footer.getFileMetaData.getEncryptionType match {
+ // UNENCRYPTED file has a plaintext footer and no file encryption,
+ // We can leverage file metadata for this check and return unencrypted.
+ case EncryptionType.UNENCRYPTED =>
+ false
+ // PLAINTEXT_FOOTER has a plaintext footer however the file is encrypted.
+ // In such cases, read the footer and use the metadata for encryption
check.
+ case EncryptionType.PLAINTEXT_FOOTER =>
true
- case e: Exception => false
- } finally {
- if (in != null) {
- in.close()
- }
+ case _ =>
+ false
}
}
@@ -774,4 +745,8 @@ class Spark40Shims extends SparkShims {
override def getFileSourceScanStream(scan: FileSourceScanExec):
Option[SparkDataStream] = {
scan.stream
}
+
+ override def unsupportedCodec: Seq[CompressionCodecName] = {
+ Seq(CompressionCodecName.LZO, CompressionCodecName.BROTLI,
CompressionCodecName.LZ4_RAW)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]