This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8f7c25db7 [GLUTEN-5341][VL][Part 3] Fix and enable some uts of spark
3.5 (#5411)
8f7c25db7 is described below
commit 8f7c25db719b80236b426ca552687eb4ad988636
Author: 高阳阳 <[email protected]>
AuthorDate: Tue Apr 16 11:58:27 2024 +0800
[GLUTEN-5341][VL][Part 3] Fix and enable some uts of spark 3.5 (#5411)
This patch fixes below unit tests:
GlutenParquetCompressionCodecPrecedenceSuite
GlutenParquetV1QuerySuite
GlutenParquetV2QuerySuite
GlutenJsonReadSchemaSuite
GlutenCoalesceShufflePartitionsSuite
GlutenInsertSuite
GlutenPartitionedWriteSuite
GlutenCachedTableSuite
---
.../gluten/utils/velox/VeloxTestSettings.scala | 16 +--
.../apache/spark/sql/GlutenCachedTableSuite.scala | 115 +++++++++++++++++++++
...tenParquetCompressionCodecPrecedenceSuite.scala | 66 +++++++++++-
.../spark/sql/sources/GlutenInsertSuite.scala | 6 +-
4 files changed, 185 insertions(+), 18 deletions(-)
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a82e73ad4..acd98a173 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -24,7 +24,7 @@ import
org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog
import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite,
GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite,
GlutenQueryParsingErrorsSuite}
import org.apache.spark.sql.execution.{FallbackStrategiesSuite,
GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite,
GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite,
GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite,
GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite,
GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
import
org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
-import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite,
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite,
GlutenFileFormatWriterSuite, GlutenFileIndexSuite,
GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite,
GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite,
GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite,
GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite,
GlutenOrcV1AggregatePushDownSuite, Glute [...]
+import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite,
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite,
GlutenFileFormatWriterSuite, GlutenFileIndexSuite,
GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite,
GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite,
GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite,
GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite,
GlutenOrcReadSchemaSuite, GlutenOrcV1Ag [...]
import
org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
import
org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite,
GlutenCSVv1Suite, GlutenCSVv2Suite}
import
org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
@@ -597,7 +597,6 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite by just removing test timestamp.
.exclude("test reading unaligned pages - test all types")
enableSuite[GlutenParquetCompressionCodecPrecedenceSuite]
- // Disable for Spark3.5.
.exclude("Create parquet table with compression")
enableSuite[GlutenParquetDeltaByteArrayEncodingSuite]
enableSuite[GlutenParquetDeltaEncodingInteger]
@@ -669,7 +668,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenParquetV2PartitionDiscoverySuite]
enableSuite[GlutenParquetProtobufCompatibilitySuite]
enableSuite[GlutenParquetV1QuerySuite]
- // Disable for Spark3.5.
.exclude("row group skipping doesn't overflow when reading into larger
type")
// Unsupport spark.sql.files.ignoreCorruptFiles.
.exclude("Enabling/disabling ignoreCorruptFiles")
@@ -686,7 +684,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude(
"SPARK-26677: negated null-safe equality comparison should not filter
matched row groups")
enableSuite[GlutenParquetV2QuerySuite]
- // Disable for Spark3.5.
.exclude("row group skipping doesn't overflow when reading into larger
type")
// Unsupport spark.sql.files.ignoreCorruptFiles.
.exclude("Enabling/disabling ignoreCorruptFiles")
@@ -755,8 +752,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenPruneFileSourcePartitionsSuite]
enableSuite[GlutenCSVReadSchemaSuite]
enableSuite[GlutenHeaderCSVReadSchemaSuite]
- // Disable for Spark3.5.
- // enableSuite[GlutenJsonReadSchemaSuite]
+ enableSuite[GlutenJsonReadSchemaSuite]
enableSuite[GlutenOrcReadSchemaSuite]
.exclude("append column into middle")
.exclude("hide column in the middle")
@@ -840,9 +836,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenBroadcastExchangeSuite]
enableSuite[GlutenLocalBroadcastExchangeSuite]
enableSuite[GlutenCoalesceShufflePartitionsSuite]
- // Disable for Spark3.5.
- .exclude("SPARK-46590 adaptive query execution works correctly with
broadcast join and union")
- .exclude("SPARK-46590 adaptive query execution works correctly with
cartesian join and union")
.excludeByPrefix("determining the number of reducers")
enableSuite[GlutenExchangeSuite]
// ColumnarShuffleExchangeExec does not support doExecute() method
@@ -908,8 +901,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenFilteredScanSuite]
enableSuite[GlutenFiltersSuite]
enableSuite[GlutenInsertSuite]
- // Disable for Spark3.5.
- .excludeByPrefix("Gluten - SPARK-39557")
// the native write staging dir is different from vanilla Spark for custom
partition paths
.exclude("SPARK-35106: Throw exception when rename custom partition paths
returns false")
.exclude("Stop task set if FileAlreadyExistsException was thrown")
@@ -922,8 +913,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-39557 INSERT INTO statements with tables with struct
defaults")
.exclude("SPARK-39557 INSERT INTO statements with tables with map
defaults")
enableSuite[GlutenPartitionedWriteSuite]
- // Disable for Spark3.5.
- .exclude("SPARK-37231, SPARK-37240: Dynamic writes/reads of ANSI interval
partitions")
enableSuite[GlutenPathOptionSuite]
enableSuite[GlutenPrunedScanSuite]
enableSuite[GlutenResolvedDataSourceSuite]
@@ -934,7 +923,6 @@ class VeloxTestSettings extends BackendTestSettings {
// requires resource files from Vanilla spark jar
.exclude("SPARK-32908: maximum target error in percentile_approx")
enableSuite[GlutenCachedTableSuite]
- // Disable for Spark3.5.
.exclude("A cached table preserves the partitioning and ordering of its
cached SparkPlan")
.exclude("InMemoryRelation statistics")
// Extra ColumnarToRow is needed to transform vanilla columnar data to
gluten columnar data.
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
index e146c1310..abd5fddf3 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
@@ -21,11 +21,14 @@ import org.apache.gluten.GlutenConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.internal.SQLConf
class GlutenCachedTableSuite
extends CachedTableSuite
with GlutenSQLTestsTrait
with AdaptiveSparkPlanHelper {
+ import testImplicits._
// to temporarily disable the columnar table cache globally.
sys.props.put(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
override def sparkConf: SparkConf = {
@@ -40,4 +43,116 @@ class GlutenCachedTableSuite
assert(cached.stats.sizeInBytes === 1132)
}
}
+
+ def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ df.collect()
+ }
+ assert(collect(df.queryExecution.executedPlan) {
+ case e: ShuffleExchangeLike => e
+ }.size == expected)
+ }
+
+ testGluten("A cached table preserves the partitioning and ordering of its
cached SparkPlan") {
+ // Distribute the tables into non-matching number of partitions. Need to
shuffle one side.
+ withTempView("t1", "t2") {
+ testData.repartition(6, $"key").createOrReplaceTempView("t1")
+ testData2.repartition(3, $"a").createOrReplaceTempView("t2")
+ spark.catalog.cacheTable("t1")
+ spark.catalog.cacheTable("t2")
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key
= t2.a")
+ verifyNumExchanges(query, 1)
+
assert(stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
=== 6)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ uncacheTable("t1")
+ uncacheTable("t2")
+ }
+
+ // One side of join is not partitioned in the desired way. Need to shuffle
one side.
+ withTempView("t1", "t2") {
+ testData.repartition(6, $"value").createOrReplaceTempView("t1")
+ testData2.repartition(6, $"a").createOrReplaceTempView("t2")
+ spark.catalog.cacheTable("t1")
+ spark.catalog.cacheTable("t2")
+
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key
= t2.a")
+ verifyNumExchanges(query, 1)
+
assert(stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
=== 6)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ uncacheTable("t1")
+ uncacheTable("t2")
+ }
+
+ withTempView("t1", "t2") {
+ testData.repartition(6, $"value").createOrReplaceTempView("t1")
+ testData2.repartition(12, $"a").createOrReplaceTempView("t2")
+ spark.catalog.cacheTable("t1")
+ spark.catalog.cacheTable("t2")
+
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key
= t2.a")
+ verifyNumExchanges(query, 1)
+ assert(
+
stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
=== 12)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ uncacheTable("t1")
+ uncacheTable("t2")
+ }
+
+ // One side of join is not partitioned in the desired way. We'll only
shuffle this side.
+ withTempView("t1", "t2") {
+ testData.repartition(6, $"value").createOrReplaceTempView("t1")
+ testData2.repartition(3, $"a").createOrReplaceTempView("t2")
+ spark.catalog.cacheTable("t1")
+ spark.catalog.cacheTable("t2")
+
+ val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key
= t2.a")
+ verifyNumExchanges(query, 1)
+ checkAnswer(
+ query,
+ testData.join(testData2, $"key" === $"a").select($"key", $"value",
$"a", $"b"))
+ uncacheTable("t1")
+ uncacheTable("t2")
+ }
+
+ // repartition's column ordering is different from group by column
ordering.
+ // But they use the same set of columns.
+ withTempView("t1") {
+ testData.repartition(6, $"value", $"key").createOrReplaceTempView("t1")
+ spark.catalog.cacheTable("t1")
+
+ val query = sql("SELECT value, key from t1 group by key, value")
+ verifyNumExchanges(query, 0)
+ checkAnswer(query, testData.distinct().select($"value", $"key"))
+ uncacheTable("t1")
+ }
+
+ // repartition's column ordering is different from join condition's column
ordering.
+ // We will still shuffle because hashcodes of a row depend on the column
ordering.
+ // If we do not shuffle, we may actually partition two tables in totally
two different way.
+ // See PartitioningSuite for more details.
+ withTempView("t1", "t2") {
+ val df1 = testData
+ df1.repartition(6, $"value", $"key").createOrReplaceTempView("t1")
+ val df2 = testData2.select($"a", $"b".cast("string"))
+ df2.repartition(6, $"a", $"b").createOrReplaceTempView("t2")
+ spark.catalog.cacheTable("t1")
+ spark.catalog.cacheTable("t2")
+
+ val query =
+ sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a
and t1.value = t2.b")
+ verifyNumExchanges(query, 1)
+
assert(stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
=== 6)
+ checkAnswer(
+ query,
+ df1.join(df2, $"key" === $"a" && $"value" === $"b").select($"key",
$"value", $"a", $"b"))
+ uncacheTable("t1")
+ uncacheTable("t2")
+ }
+ }
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
index 661d6aad8..4baa41727 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
@@ -18,6 +18,70 @@ package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
class GlutenParquetCompressionCodecPrecedenceSuite
extends ParquetCompressionCodecPrecedenceSuite
- with GlutenSQLTestsBaseTrait {}
+ with GlutenSQLTestsBaseTrait {
+
+ private def getTableCompressionCodec(path: String): Seq[String] = {
+ val hadoopConf = spark.sessionState.newHadoopConf()
+ val codecs = for {
+ footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+ block <- footer.getParquetMetadata.getBlocks.asScala
+ column <- block.getColumns.asScala
+ } yield column.getCodec.name()
+ codecs.distinct
+ }
+
+ private def createTableWithCompression(
+ tableName: String,
+ isPartitioned: Boolean,
+ compressionCodec: String,
+ rootDir: File): Unit = {
+ val options =
+ s"""
+
|OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
+ |'parquet.compression'='$compressionCodec')
+ """.stripMargin
+ val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
+ sql(s"""
+ |CREATE TABLE $tableName USING Parquet $options $partitionCreate
+ |AS SELECT 1 AS col1, 2 AS p
+ """.stripMargin)
+ }
+ private def checkCompressionCodec(compressionCodec: String, isPartitioned:
Boolean): Unit = {
+ withTempDir {
+ tmpDir =>
+ val tempTableName = "TempParquetTable"
+ withTable(tempTableName) {
+ createTableWithCompression(tempTableName, isPartitioned,
compressionCodec, tmpDir)
+ val partitionPath = if (isPartitioned) "p=2" else ""
+ val path =
s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
+ val realCompressionCodecs = getTableCompressionCodec(path)
+ // Native parquet write currently does not support LZ4_RAW
+ // reference here:
https://github.com/facebookincubator/velox/blob/d796cfc8c2a3cc045f
+ //
1b33880c5839fec21a6b3b/velox/dwio/parquet/writer/Writer.cpp#L107C1-L120C17
+ if (compressionCodec == "LZ4_RAW" || compressionCodec == "LZ4RAW") {
+ assert(realCompressionCodecs.forall(_ == "SNAPPY"))
+ } else {
+ assert(realCompressionCodecs.forall(_ == compressionCodec))
+ }
+ }
+ }
+ }
+
+ testGluten("Create parquet table with compression") {
+ Seq(true, false).foreach {
+ isPartitioned =>
+ val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4",
"LZ4RAW", "LZ4_RAW")
+ codecs.foreach {
+ compressionCodec => checkCompressionCodec(compressionCodec,
isPartitioned)
+ }
+ }
+ }
+}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index cf6f35288..51b034ca8 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -419,7 +419,7 @@ class GlutenInsertSuite
// declarations, if applicable.
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the
destination " +
- "table column s has a DEFAULT value with type"
+ "table column `s` has a DEFAULT value"
Seq(Config("parquet"), Config("parquet", true)).foreach {
config =>
withTable("t") {
@@ -468,7 +468,7 @@ class GlutenInsertSuite
// declarations, if applicable.
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the
destination " +
- "table column s has a DEFAULT value with type"
+ "table column `s` has a DEFAULT value"
Seq(Config("parquet"), Config("parquet", true)).foreach {
config =>
withTable("t") {
@@ -565,7 +565,7 @@ class GlutenInsertSuite
// declarations, if applicable.
val incompatibleDefault =
"Failed to execute ALTER TABLE ADD COLUMNS command because the
destination " +
- "table column s has a DEFAULT value with type"
+ "table column `s` has a DEFAULT value"
Seq(Config("parquet"), Config("parquet", true)).foreach {
config =>
withTable("t") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]