This is an automated email from the ASF dual-hosted git repository.
yangzy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8f25b5a844 [VL] Enhance write parquet with compression codec test (#7737)
8f25b5a844 is described below
commit 8f25b5a8441e2052016d5fc56545081209528bae
Author: Wechar Yu <[email protected]>
AuthorDate: Thu Oct 31 11:02:58 2024 +0800
[VL] Enhance write parquet with compression codec test (#7737)
---
.../sql/execution/VeloxParquetWriteSuite.scala | 38 +++++++++++++---------
1 file changed, 23 insertions(+), 15 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index ce3b0852f6..4c76c753b9 100644
---
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -20,7 +20,11 @@ import
org.apache.gluten.execution.VeloxWholeStageTransformerSuite
import org.apache.gluten.test.FallbackUtil
import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
import org.junit.Assert
class VeloxParquetWriteSuite extends VeloxWholeStageTransformerSuite {
@@ -50,17 +54,11 @@ class VeloxParquetWriteSuite extends
VeloxWholeStageTransformerSuite {
}
}
-  testWithSpecifiedSparkVersion("test write parquet with compression codec", Some("3.2")) {
+ test("test write parquet with compression codec") {
// compression codec details see `VeloxParquetDatasource.cc`
Seq("snappy", "gzip", "zstd", "lz4", "none", "uncompressed")
.foreach {
codec =>
- val extension = codec match {
- case "none" | "uncompressed" => ""
- case "gzip" => "gz"
- case _ => codec
- }
-
TPCHTableDataFrames.foreach {
case (_, df) =>
withTempPath {
@@ -69,15 +67,25 @@ class VeloxParquetWriteSuite extends
VeloxWholeStageTransformerSuite {
.format("parquet")
.option("compression", codec)
.save(f.getCanonicalPath)
- val files = f.list()
- assert(files.nonEmpty, extension)
-
- if (!isSparkVersionGE("3.4")) {
- assert(
- files.exists(_.contains(extension)),
- extension
- ) // filename changed in spark 3.4.
+ val expectedCodec = codec match {
+ case "none" => "uncompressed"
+ case _ => codec
}
+              val parquetFiles = f.list((_, name) => name.contains("parquet"))
+ assert(parquetFiles.nonEmpty, expectedCodec)
+ assert(
+ parquetFiles.forall {
+ file =>
+ val path = new Path(f.getCanonicalPath, file)
+                  val in = HadoopInputFile.fromPath(path, spark.sessionState.newHadoopConf())
+ Utils.tryWithResource(ParquetFileReader.open(in)) {
+ reader =>
+                      val column = reader.getFooter.getBlocks.get(0).getColumns.get(0)
+                      expectedCodec.equalsIgnoreCase(column.getCodec.toString)
+ }
+ },
+ expectedCodec
+ )
val parquetDf = spark.read
.format("parquet")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]