This is an automated email from the ASF dual-hosted git repository.

yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f7c25db7 [GLUTEN-5341][VL][Part 3] Fix and enable some uts of spark 
3.5 (#5411)
8f7c25db7 is described below

commit 8f7c25db719b80236b426ca552687eb4ad988636
Author: 高阳阳 <[email protected]>
AuthorDate: Tue Apr 16 11:58:27 2024 +0800

    [GLUTEN-5341][VL][Part 3] Fix and enable some uts of spark 3.5 (#5411)
    
    This patch fixes the following unit tests:
    
    GlutenParquetCompressionCodecPrecedenceSuite
    GlutenParquetV1QuerySuite
    GlutenParquetV2QuerySuite
    GlutenJsonReadSchemaSuite
    GlutenCoalesceShufflePartitionsSuite
    GlutenInsertSuite
    GlutenPartitionedWriteSuite
    GlutenCachedTableSuite
---
 .../gluten/utils/velox/VeloxTestSettings.scala     |  16 +--
 .../apache/spark/sql/GlutenCachedTableSuite.scala  | 115 +++++++++++++++++++++
 ...tenParquetCompressionCodecPrecedenceSuite.scala |  66 +++++++++++-
 .../spark/sql/sources/GlutenInsertSuite.scala      |   6 +-
 4 files changed, 185 insertions(+), 18 deletions(-)

diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index a82e73ad4..acd98a173 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -24,7 +24,7 @@ import 
org.apache.spark.sql.connector.{GlutenDataSourceV2DataFrameSessionCatalog
 import org.apache.spark.sql.errors.{GlutenQueryCompilationErrorsDSv2Suite, 
GlutenQueryCompilationErrorsSuite, GlutenQueryExecutionErrorsSuite, 
GlutenQueryParsingErrorsSuite}
 import org.apache.spark.sql.execution.{FallbackStrategiesSuite, 
GlutenBroadcastExchangeSuite, GlutenCoalesceShufflePartitionsSuite, 
GlutenExchangeSuite, GlutenLocalBroadcastExchangeSuite, 
GlutenReplaceHashWithSortAggSuite, GlutenReuseExchangeAndSubquerySuite, 
GlutenSameResultSuite, GlutenSortSuite, GlutenSQLAggregateFunctionSuite, 
GlutenSQLWindowFunctionSuite, GlutenTakeOrderedAndProjectSuite}
 import 
org.apache.spark.sql.execution.adaptive.velox.VeloxAdaptiveQueryExecSuite
-import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, 
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, 
GlutenFileFormatWriterSuite, GlutenFileIndexSuite, 
GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, 
GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, 
GlutenMergedOrcReadSchemaSuite, GlutenMergedParquetReadSchemaSuite, 
GlutenOrcCodecSuite, GlutenOrcReadSchemaSuite, 
GlutenOrcV1AggregatePushDownSuite, Glute [...]
+import org.apache.spark.sql.execution.datasources.{GlutenBucketingUtilsSuite, 
GlutenCSVReadSchemaSuite, GlutenDataSourceStrategySuite, GlutenDataSourceSuite, 
GlutenFileFormatWriterSuite, GlutenFileIndexSuite, 
GlutenFileMetadataStructSuite, GlutenFileSourceStrategySuite, 
GlutenHadoopFileLinesReaderSuite, GlutenHeaderCSVReadSchemaSuite, 
GlutenJsonReadSchemaSuite, GlutenMergedOrcReadSchemaSuite, 
GlutenMergedParquetReadSchemaSuite, GlutenOrcCodecSuite, 
GlutenOrcReadSchemaSuite, GlutenOrcV1Ag [...]
 import 
org.apache.spark.sql.execution.datasources.binaryfile.GlutenBinaryFileFormatSuite
 import 
org.apache.spark.sql.execution.datasources.csv.{GlutenCSVLegacyTimeParserSuite, 
GlutenCSVv1Suite, GlutenCSVv2Suite}
 import 
org.apache.spark.sql.execution.datasources.exchange.GlutenValidateRequirementsSuite
@@ -597,7 +597,6 @@ class VeloxTestSettings extends BackendTestSettings {
     // Rewrite by just removing test timestamp.
     .exclude("test reading unaligned pages - test all types")
   enableSuite[GlutenParquetCompressionCodecPrecedenceSuite]
-    // Disable for Spark3.5.
     .exclude("Create parquet table with compression")
   enableSuite[GlutenParquetDeltaByteArrayEncodingSuite]
   enableSuite[GlutenParquetDeltaEncodingInteger]
@@ -669,7 +668,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetV2PartitionDiscoverySuite]
   enableSuite[GlutenParquetProtobufCompatibilitySuite]
   enableSuite[GlutenParquetV1QuerySuite]
-    // Disable for Spark3.5.
     .exclude("row group skipping doesn't overflow when reading into larger 
type")
     // Unsupport spark.sql.files.ignoreCorruptFiles.
     .exclude("Enabling/disabling ignoreCorruptFiles")
@@ -686,7 +684,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude(
       "SPARK-26677: negated null-safe equality comparison should not filter 
matched row groups")
   enableSuite[GlutenParquetV2QuerySuite]
-    // Disable for Spark3.5.
     .exclude("row group skipping doesn't overflow when reading into larger 
type")
     // Unsupport spark.sql.files.ignoreCorruptFiles.
     .exclude("Enabling/disabling ignoreCorruptFiles")
@@ -755,8 +752,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenPruneFileSourcePartitionsSuite]
   enableSuite[GlutenCSVReadSchemaSuite]
   enableSuite[GlutenHeaderCSVReadSchemaSuite]
-  // Disable for Spark3.5.
-  // enableSuite[GlutenJsonReadSchemaSuite]
+  enableSuite[GlutenJsonReadSchemaSuite]
   enableSuite[GlutenOrcReadSchemaSuite]
     .exclude("append column into middle")
     .exclude("hide column in the middle")
@@ -840,9 +836,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenBroadcastExchangeSuite]
   enableSuite[GlutenLocalBroadcastExchangeSuite]
   enableSuite[GlutenCoalesceShufflePartitionsSuite]
-    // Disable for Spark3.5.
-    .exclude("SPARK-46590 adaptive query execution works correctly with 
broadcast join and union")
-    .exclude("SPARK-46590 adaptive query execution works correctly with 
cartesian join and union")
     .excludeByPrefix("determining the number of reducers")
   enableSuite[GlutenExchangeSuite]
     // ColumnarShuffleExchangeExec does not support doExecute() method
@@ -908,8 +901,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenFilteredScanSuite]
   enableSuite[GlutenFiltersSuite]
   enableSuite[GlutenInsertSuite]
-    // Disable for Spark3.5.
-    .excludeByPrefix("Gluten - SPARK-39557")
     // the native write staing dir is differnt with vanilla Spark for coustom 
partition paths
     .exclude("SPARK-35106: Throw exception when rename custom partition paths 
returns false")
     .exclude("Stop task set if FileAlreadyExistsException was thrown")
@@ -922,8 +913,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-39557 INSERT INTO statements with tables with struct 
defaults")
     .exclude("SPARK-39557 INSERT INTO statements with tables with map 
defaults")
   enableSuite[GlutenPartitionedWriteSuite]
-    // Disable for Spark3.5.
-    .exclude("SPARK-37231, SPARK-37240: Dynamic writes/reads of ANSI interval 
partitions")
   enableSuite[GlutenPathOptionSuite]
   enableSuite[GlutenPrunedScanSuite]
   enableSuite[GlutenResolvedDataSourceSuite]
@@ -934,7 +923,6 @@ class VeloxTestSettings extends BackendTestSettings {
     // requires resource files from Vanilla spark jar
     .exclude("SPARK-32908: maximum target error in percentile_approx")
   enableSuite[GlutenCachedTableSuite]
-    // Disable for Spark3.5.
     .exclude("A cached table preserves the partitioning and ordering of its 
cached SparkPlan")
     .exclude("InMemoryRelation statistics")
     // Extra ColumnarToRow is needed to transform vanilla columnar data to 
gluten columnar data.
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
index e146c1310..abd5fddf3 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenCachedTableSuite.scala
@@ -21,11 +21,14 @@ import org.apache.gluten.GlutenConfig
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+import org.apache.spark.sql.internal.SQLConf
 
 class GlutenCachedTableSuite
   extends CachedTableSuite
   with GlutenSQLTestsTrait
   with AdaptiveSparkPlanHelper {
+  import testImplicits._
   // for temporarily disable the columnar table cache globally.
   sys.props.put(GlutenConfig.COLUMNAR_TABLE_CACHE_ENABLED.key, "true")
   override def sparkConf: SparkConf = {
@@ -40,4 +43,116 @@ class GlutenCachedTableSuite
         assert(cached.stats.sizeInBytes === 1132)
     }
   }
+
+  def verifyNumExchanges(df: DataFrame, expected: Int): Unit = {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      df.collect()
+    }
+    assert(collect(df.queryExecution.executedPlan) {
+      case e: ShuffleExchangeLike => e
+    }.size == expected)
+  }
+
+  testGluten("A cached table preserves the partitioning and ordering of its 
cached SparkPlan") {
+    // Distribute the tables into non-matching number of partitions. Need to 
shuffle one side.
+    withTempView("t1", "t2") {
+      testData.repartition(6, $"key").createOrReplaceTempView("t1")
+      testData2.repartition(3, $"a").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+      val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key 
= t2.a")
+      verifyNumExchanges(query, 1)
+      
assert(stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
 === 6)
+      checkAnswer(
+        query,
+        testData.join(testData2, $"key" === $"a").select($"key", $"value", 
$"a", $"b"))
+      uncacheTable("t1")
+      uncacheTable("t2")
+    }
+
+    // One side of join is not partitioned in the desired way. Need to shuffle 
one side.
+    withTempView("t1", "t2") {
+      testData.repartition(6, $"value").createOrReplaceTempView("t1")
+      testData2.repartition(6, $"a").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+
+      val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key 
= t2.a")
+      verifyNumExchanges(query, 1)
+      
assert(stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
 === 6)
+      checkAnswer(
+        query,
+        testData.join(testData2, $"key" === $"a").select($"key", $"value", 
$"a", $"b"))
+      uncacheTable("t1")
+      uncacheTable("t2")
+    }
+
+    withTempView("t1", "t2") {
+      testData.repartition(6, $"value").createOrReplaceTempView("t1")
+      testData2.repartition(12, $"a").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+
+      val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key 
= t2.a")
+      verifyNumExchanges(query, 1)
+      assert(
+        
stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
 === 12)
+      checkAnswer(
+        query,
+        testData.join(testData2, $"key" === $"a").select($"key", $"value", 
$"a", $"b"))
+      uncacheTable("t1")
+      uncacheTable("t2")
+    }
+
+    // One side of join is not partitioned in the desired way. We'll only 
shuffle this side.
+    withTempView("t1", "t2") {
+      testData.repartition(6, $"value").createOrReplaceTempView("t1")
+      testData2.repartition(3, $"a").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+
+      val query = sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key 
= t2.a")
+      verifyNumExchanges(query, 1)
+      checkAnswer(
+        query,
+        testData.join(testData2, $"key" === $"a").select($"key", $"value", 
$"a", $"b"))
+      uncacheTable("t1")
+      uncacheTable("t2")
+    }
+
+    // repartition's column ordering is different from group by column 
ordering.
+    // But they use the same set of columns.
+    withTempView("t1") {
+      testData.repartition(6, $"value", $"key").createOrReplaceTempView("t1")
+      spark.catalog.cacheTable("t1")
+
+      val query = sql("SELECT value, key from t1 group by key, value")
+      verifyNumExchanges(query, 0)
+      checkAnswer(query, testData.distinct().select($"value", $"key"))
+      uncacheTable("t1")
+    }
+
+    // repartition's column ordering is different from join condition's column 
ordering.
+    // We will still shuffle because hashcodes of a row depend on the column 
ordering.
+    // If we do not shuffle, we may actually partition two tables in totally 
two different way.
+    // See PartitioningSuite for more details.
+    withTempView("t1", "t2") {
+      val df1 = testData
+      df1.repartition(6, $"value", $"key").createOrReplaceTempView("t1")
+      val df2 = testData2.select($"a", $"b".cast("string"))
+      df2.repartition(6, $"a", $"b").createOrReplaceTempView("t2")
+      spark.catalog.cacheTable("t1")
+      spark.catalog.cacheTable("t2")
+
+      val query =
+        sql("SELECT key, value, a, b FROM t1 t1 JOIN t2 t2 ON t1.key = t2.a 
and t1.value = t2.b")
+      verifyNumExchanges(query, 1)
+      
assert(stripAQEPlan(query.queryExecution.executedPlan).outputPartitioning.numPartitions
 === 6)
+      checkAnswer(
+        query,
+        df1.join(df2, $"key" === $"a" && $"value" === $"b").select($"key", 
$"value", $"a", $"b"))
+      uncacheTable("t1")
+      uncacheTable("t2")
+    }
+  }
 }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
index 661d6aad8..4baa41727 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetCompressionCodecPrecedenceSuite.scala
@@ -18,6 +18,70 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 
+import org.apache.hadoop.fs.Path
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
 class GlutenParquetCompressionCodecPrecedenceSuite
   extends ParquetCompressionCodecPrecedenceSuite
-  with GlutenSQLTestsBaseTrait {}
+  with GlutenSQLTestsBaseTrait {
+
+  private def getTableCompressionCodec(path: String): Seq[String] = {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    val codecs = for {
+      footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
+      block <- footer.getParquetMetadata.getBlocks.asScala
+      column <- block.getColumns.asScala
+    } yield column.getCodec.name()
+    codecs.distinct
+  }
+
+  private def createTableWithCompression(
+      tableName: String,
+      isPartitioned: Boolean,
+      compressionCodec: String,
+      rootDir: File): Unit = {
+    val options =
+      s"""
+         
|OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName',
+         |'parquet.compression'='$compressionCodec')
+       """.stripMargin
+    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p)" else ""
+    sql(s"""
+           |CREATE TABLE $tableName USING Parquet $options $partitionCreate
+           |AS SELECT 1 AS col1, 2 AS p
+       """.stripMargin)
+  }
+  private def checkCompressionCodec(compressionCodec: String, isPartitioned: 
Boolean): Unit = {
+    withTempDir {
+      tmpDir =>
+        val tempTableName = "TempParquetTable"
+        withTable(tempTableName) {
+          createTableWithCompression(tempTableName, isPartitioned, 
compressionCodec, tmpDir)
+          val partitionPath = if (isPartitioned) "p=2" else ""
+          val path = 
s"${tmpDir.getPath.stripSuffix("/")}/$tempTableName/$partitionPath"
+          val realCompressionCodecs = getTableCompressionCodec(path)
+          // Native parquet write currently not support LZ4_RAW
+          // reference here: 
https://github.com/facebookincubator/velox/blob/d796cfc8c2a3cc045f
+          // 
1b33880c5839fec21a6b3b/velox/dwio/parquet/writer/Writer.cpp#L107C1-L120C17
+          if (compressionCodec == "LZ4_RAW" || compressionCodec == "LZ4RAW") {
+            assert(realCompressionCodecs.forall(_ == "SNAPPY"))
+          } else {
+            assert(realCompressionCodecs.forall(_ == compressionCodec))
+          }
+        }
+    }
+  }
+
+  testGluten("Create parquet table with compression") {
+    Seq(true, false).foreach {
+      isPartitioned =>
+        val codecs = Seq("UNCOMPRESSED", "SNAPPY", "GZIP", "ZSTD", "LZ4", 
"LZ4RAW", "LZ4_RAW")
+        codecs.foreach {
+          compressionCodec => checkCompressionCodec(compressionCodec, 
isPartitioned)
+        }
+    }
+  }
+}
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index cf6f35288..51b034ca8 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -419,7 +419,7 @@ class GlutenInsertSuite
       // declarations, if applicable.
       val incompatibleDefault =
         "Failed to execute ALTER TABLE ADD COLUMNS command because the 
destination " +
-          "table column s has a DEFAULT value with type"
+          "table column `s` has a DEFAULT value"
       Seq(Config("parquet"), Config("parquet", true)).foreach {
         config =>
           withTable("t") {
@@ -468,7 +468,7 @@ class GlutenInsertSuite
       // declarations, if applicable.
       val incompatibleDefault =
         "Failed to execute ALTER TABLE ADD COLUMNS command because the 
destination " +
-          "table column s has a DEFAULT value with type"
+          "table column `s` has a DEFAULT value"
       Seq(Config("parquet"), Config("parquet", true)).foreach {
         config =>
           withTable("t") {
@@ -565,7 +565,7 @@ class GlutenInsertSuite
       // declarations, if applicable.
       val incompatibleDefault =
         "Failed to execute ALTER TABLE ADD COLUMNS command because the 
destination " +
-          "table column s has a DEFAULT value with type"
+          "table column `s` has a DEFAULT value"
       Seq(Config("parquet"), Config("parquet", true)).foreach {
         config =>
           withTable("t") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to