This is an automated email from the ASF dual-hosted git repository.
zhli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bd2ca459c [GLUTEN-5341]Fix test write parquet with compression codec (#5424)
bd2ca459c is described below
commit bd2ca459c7818f4a5c0fee564c014e7ce5be6d8f
Author: ayushi-agarwal <[email protected]>
AuthorDate: Wed Apr 17 17:06:15 2024 +0530
[GLUTEN-5341]Fix test write parquet with compression codec (#5424)
[GLUTEN-5341] Fix test write parquet with compression codec
---
.../gluten/backendsapi/velox/VeloxBackend.scala | 4 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 7 +--
.../org/apache/spark/sql/GlutenJoinSuite.scala | 2 +
...tenParquetCompressionCodecPrecedenceSuite.scala | 67 +---------------------
4 files changed, 8 insertions(+), 72 deletions(-)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 13eb915d9..f6b94a11d 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -180,10 +180,10 @@ object BackendSettings extends BackendSettingsApi {
def validateCompressionCodec(): Option[String] = {
// Velox doesn't support brotli and lzo.
- val unSupportedCompressions = Set("brotli, lzo")
+ val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val compressionCodec =
WriteFilesExecTransformer.getCompressionCodec(options)
if (unSupportedCompressions.contains(compressionCodec)) {
- Some("Brotli or lzo compression codec is unsupported in Velox backend.")
+ Some("Brotli, lzo, lz4raw and lz4_raw compression codec is unsupported in Velox backend.")
} else {
None
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index acd98a173..1c74bd247 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -597,7 +597,6 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite by just removing test timestamp.
.exclude("test reading unaligned pages - test all types")
enableSuite[GlutenParquetCompressionCodecPrecedenceSuite]
- .exclude("Create parquet table with compression")
enableSuite[GlutenParquetDeltaByteArrayEncodingSuite]
enableSuite[GlutenParquetDeltaEncodingInteger]
enableSuite[GlutenParquetDeltaEncodingLong]
@@ -742,6 +741,9 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("nested column: Max(top level column) not push down")
.exclude("nested column: Count(nested sub-field) not push down")
enableSuite[GlutenParquetCodecSuite]
+ // codec not supported in native
+ .exclude("write and read - file source parquet - codec: lz4_raw")
+ .exclude("write and read - file source parquet - codec: lz4raw")
enableSuite[GlutenOrcCodecSuite]
enableSuite[GlutenFileSourceStrategySuite]
// Plan comparison.
@@ -1096,9 +1098,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Merge runtime bloom filters")
enableSuite[GlutenIntervalFunctionsSuite]
enableSuite[GlutenJoinSuite]
- // Disable for Spark3.5.
- .exclude(
- "SPARK-36612: Support left outer join build left or right outer join build right in shuffled hash join")
// exclude as it check spark plan
.exclude("SPARK-36794: Ignore duplicated key when building relation for semi/anti hash join")
// exclude as it check for SMJ node
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenJoinSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenJoinSuite.scala
index 4ac8bd3ea..09718fb1a 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenJoinSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenJoinSuite.scala
@@ -39,6 +39,8 @@ class GlutenJoinSuite extends JoinSuite with GlutenSQLTestsTrait {
"SPARK-34593: Preserve broadcast nested loop join partitioning and ordering",
"SPARK-35984: Config to force applying shuffled hash join",
"test SortMergeJoin (with spill)",
+ "SPARK-36612: Support left outer join build left or right" +
+ " outer join build right in shuffled hash join",
// NaN is not supported currently, just skip.
"NaN and -0.0 in join keys"
)
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
index 4baa41727..ac938d4ea 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
@@ -17,71 +17,6 @@
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
-
-import org.apache.hadoop.fs.Path
-
-import java.io.File
-
-import scala.collection.JavaConverters._
-
class GlutenParquetCompressionCodecPrecedenceSuite
extends ParquetCompressionCodecPrecedenceSuite
- with GlutenSQLTestsBaseTrait {
-
- private def getTableCompressionCodec(path: String): Seq[String] = {
- val hadoopConf = spark.sessionState.newHadoopConf()
- val codecs = for {
- footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
- block <- footer.getParquetMetadata.getBlocks.asScala
- column <- block.getColumns.asScala
- } yield column.getCodec.name()
- codecs.distinct
- }
-
- private def createTableWithCompression(
- tableName: String,
- isPartitioned: Boolean,
- compressionCodec: String,
- rootDir: File): Unit = {
- val options =
- s"""
- |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
- |'parquet.compression'='$compressionCodec')
- """.stripMargin
- val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
- sql(s"""
- |CREATE TABLE $tableName USING Parquet $options $partitionCreate
- |AS SELECT 1 AS col1, 2 AS p
- """.stripMargin)
- }
- private def checkCompressionCodec(compressionCodec: String, isPartitioned: Boolean): Unit = {
- withTempDir {
- tmpDir =>
- val tempTableName = "TempParquetTable"
- withTable(tempTableName) {
- createTableWithCompression(tempTableName, isPartitioned, compressionCodec, tmpDir)
- val partitionPath = if (isPartitioned) "p=2" else ""
- val path = s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
- val realCompressionCodecs = getTableCompressionCodec(path)
- // Native parquet write currently not support LZ4_RAW
- // reference here: https://github.com/facebookincubator/velox/blob/d796cfc8c2a3cc045f
- // 1b33880c5839fec21a6b3b/velox/dwio/parquet/writer/Writer.cpp#L107C1-L120C17
- if (compressionCodec == "LZ4_RAW" || compressionCodec == "LZ4RAW") {
- assert(realCompressionCodecs.forall(_ == "SNAPPY"))
- } else {
- assert(realCompressionCodecs.forall(_ == compressionCodec))
- }
- }
- }
- }
-
- testGluten("Create parquet table with compression") {
- Seq(true, false).foreach {
- isPartitioned =>
- val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", "LZ4RAW", "LZ4_RAW")
- codecs.foreach {
- compressionCodec => checkCompressionCodec(compressionCodec, isPartitioned)
- }
- }
- }
-}
+ with GlutenSQLTestsBaseTrait {}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]