This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f44eeda5bc [HUDI-8018] Improve Spark SQL tests (#11859)
4f44eeda5bc is described below

commit 4f44eeda5bcbd6a765c8bcf30352c47b47af04d4
Author: Sagar Sumit <[email protected]>
AuthorDate: Tue Sep 10 11:04:34 2024 +0530

    [HUDI-8018] Improve Spark SQL tests (#11859)
---
 .../TestAutoGenerationOfRecordKeys.scala           |  16 +-
 .../hudi/functional/TestMetadataRecordIndex.scala  |  11 +-
 .../functional/TestParquetColumnProjection.scala   |   3 +-
 .../TestSparkSqlWithTimestampKeyGenerator.scala    |  30 +-
 .../sql/hudi/ddl/TestAlterTableDropPartition.scala |  72 ++--
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  | 219 +++++------
 .../spark/sql/hudi/dml/TestCDCForSparkSQL.scala    |  74 ++--
 .../spark/sql/hudi/dml/TestCompactionTable.scala   |  70 ++++
 .../spark/sql/hudi/dml/TestDataSkippingQuery.scala | 277 +++++++-------
 .../spark/sql/hudi/dml/TestInsertTable.scala       | 424 +++++++++++----------
 .../spark/sql/hudi/dml/TestMergeIntoTable.scala    |   6 +-
 .../spark/sql/hudi/dml/TestTimeTravelTable.scala   | 246 ++++++------
 .../spark/sql/hudi/dml/TestUpdateTable.scala       |  63 ++-
 13 files changed, 814 insertions(+), 697 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
index 247454a0626..5c35c28c007 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestAutoGenerationOfRecordKeys.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.{SaveMode, SparkSession, 
SparkSessionExtensions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.CsvSource
+import org.junit.jupiter.params.provider.{CsvSource, EnumSource}
 
 import java.util.function.Consumer
 
@@ -210,7 +210,7 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     var options: Map[String, String] = vanillaWriteOpts ++ Map(
       DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[SimpleKeyGenerator].getCanonicalName)
 
-    // NOTE: In this test we deliberately removing record-key configuration
+    // NOTE: In this test we are deliberately removing record-key configuration
     //       to validate Hudi is handling this case appropriately
     var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
 
@@ -264,14 +264,16 @@ class TestAutoGenerationOfRecordKeys extends 
HoodieSparkClientTestBase with Scal
     assertEquals(5, snapshot0.count())
   }
 
-  @Test
-  def testUpsertsAndDeletesWithPkLess(): Unit = {
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testUpsertsAndDeletesWithPkLess(tableType: HoodieTableType): Unit = {
     val (vanillaWriteOpts, readOpts) = 
getWriterReaderOpts(HoodieRecordType.AVRO)
 
-    var options: Map[String, String] = vanillaWriteOpts ++ Map(
-      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[SimpleKeyGenerator].getCanonicalName)
+    val options: Map[String, String] = vanillaWriteOpts ++ Map(
+      DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> 
classOf[SimpleKeyGenerator].getCanonicalName,
+      DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
 
-    var writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
+    val writeOpts = options -- Seq(DataSourceWriteOptions.RECORDKEY_FIELD.key)
 
     // Insert Operation
     val records = recordsToStrings(dataGen.generateInserts("000", 
20)).asScala.toList
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index 4253f425a58..74686933921 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -28,13 +28,13 @@ import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
 import org.apache.hudi.metadata.{HoodieBackedTableMetadata, 
HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.testutils.HoodieSparkClientTestBase
-
 import org.apache.spark.sql._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api._
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
 
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -78,10 +78,11 @@ class TestMetadataRecordIndex extends 
HoodieSparkClientTestBase {
     cleanupSparkContexts()
   }
 
-  @Test
-  def testClusteringWithRecordIndex(): Unit = {
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testClusteringWithRecordIndex(tableType: HoodieTableType): Unit = {
     val hudiOpts = commonOpts ++ Map(
-      TABLE_TYPE.key -> HoodieTableType.COPY_ON_WRITE.name(),
+      TABLE_TYPE.key -> tableType.name(),
       HoodieClusteringConfig.INLINE_CLUSTERING.key() -> "true",
       HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key() -> "2"
     )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index 8e48bc1a070..b62cf1b9f93 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertTrue, fail}
-import org.junit.jupiter.api.{Disabled, Tag, Test}
+import org.junit.jupiter.api.{Tag, Test}
 
 import scala.collection.JavaConverters._
 import scala.math.abs
@@ -60,7 +60,6 @@ class TestParquetColumnProjection extends 
SparkClientFunctionalTestHarness with
 
   override def conf: SparkConf = conf(getSparkSqlConf)
 
-  @Disabled("Currently disabled b/c of the fallback to HadoopFsRelation")
   @Test
   def testBaseFileOnlyViewRelation(): Unit = {
     val tablePath = s"$basePath/cow"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
index eae33bb860d..e7b20318a00 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithTimestampKeyGenerator.scala
@@ -44,7 +44,7 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
         timestampKeyGeneratorSettings.foreach { keyGeneratorSettings =>
           withTable(generateTableName) { tableName =>
             // Warning level is used due to CI run with warn-log profile for 
quick failed cases identification
-            LOG.warn(s"Table '${tableName}' with parameters: ${testParams}. 
Timestamp key generator settings: ${keyGeneratorSettings}")
+            LOG.warn(s"Table '$tableName' with parameters: $testParams. 
Timestamp key generator settings: $keyGeneratorSettings")
             val tablePath = tmp.getCanonicalPath + "/" + tableName
             val tsType = if (keyGeneratorSettings.contains("DATE_STRING")) 
"string" else "long"
             spark.sql(
@@ -53,19 +53,19 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
                  |   id int,
                  |   name string,
                  |   precomb long,
-                 |   ts ${tsType}
+                 |   ts $tsType
                  | ) USING HUDI
                  | PARTITIONED BY (ts)
-                 | LOCATION '${tablePath}'
+                 | LOCATION '$tablePath'
                  | TBLPROPERTIES (
-                 |   type = '${tableType}',
+                 |   type = '$tableType',
                  |   primaryKey = 'id',
                  |   preCombineField = 'precomb',
                  |   hoodie.datasource.write.partitionpath.field = 'ts',
                  |   hoodie.datasource.write.hive_style_partitioning = 'false',
-                 |   hoodie.file.group.reader.enabled = 
'${shouldUseFileGroupReader}',
+                 |   hoodie.file.group.reader.enabled = 
'$shouldUseFileGroupReader',
                  |   hoodie.table.keygenerator.class = 
'org.apache.hudi.keygen.TimestampBasedKeyGenerator',
-                 |   ${keyGeneratorSettings}
+                 |   $keyGeneratorSettings
                  | )
                  |""".stripMargin)
             // TODO: couldn't set `TIMESTAMP` for 
`hoodie.table.keygenerator.type`, it's overwritten by `SIMPLE`, only 
`hoodie.table.keygenerator.class` works
@@ -77,14 +77,14 @@ class TestSparkSqlWithTimestampKeyGenerator extends 
HoodieSparkSqlTestBase {
             else // UNIX_TIMESTAMP, and SCALAR with SECONDS
               (dataBatchesWithLongOfSeconds, queryResultWithLongOfSeconds)
 
-            withSQLConf("hoodie.file.group.reader.enabled" -> 
s"${shouldUseFileGroupReader}",
+            withSQLConf("hoodie.file.group.reader.enabled" -> 
s"$shouldUseFileGroupReader",
               "hoodie.datasource.query.type" -> "snapshot") {
               // two partitions, one contains parquet file only, the second 
one contains parquet and log files for MOR, and two parquets for COW
-              spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(0)}")
-              spark.sql(s"INSERT INTO ${tableName} VALUES ${dataBatches(1)}")
+              spark.sql(s"INSERT INTO $tableName VALUES ${dataBatches(0)}")
+              spark.sql(s"INSERT INTO $tableName VALUES ${dataBatches(1)}")
 
-              val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM 
${tableName} ORDER BY id").collect().mkString("; ")
-              LOG.warn(s"Query result: ${queryResult}")
+              val queryResult = spark.sql(s"SELECT id, name, precomb, ts FROM 
$tableName ORDER BY id").collect().mkString("; ")
+              LOG.warn(s"Query result: $queryResult")
               // TODO: use `shouldExtractPartitionValuesFromPartitionPath` 
uniformly, and get `expectedQueryResult` for all cases instead of 
`expectedQueryResultWithLossyString` for some cases
               //   After it we could properly process filters like "WHERE ts 
BETWEEN 1078016000 and 1718953003" and add tests with partition pruning.
               //   COW: Fix for [HUDI-3896] overwrites 
`shouldExtractPartitionValuesFromPartitionPath` in `BaseFileOnlyRelation`, 
therefore for COW we extracting from partition paths and get nulls
@@ -134,18 +134,18 @@ object TestSparkSqlWithTimestampKeyGenerator {
   val timestampKeyGeneratorSettings: Array[String] = Array(
     s"""
        |   hoodie.keygen.timebased.timestamp.type = 'UNIX_TIMESTAMP',
-       |   hoodie.keygen.timebased.output.dateformat = 
'${outputDateformat}'""",
+       |   hoodie.keygen.timebased.output.dateformat = '$outputDateformat'""",
     s"""
        |   hoodie.keygen.timebased.timestamp.type = 'EPOCHMILLISECONDS',
-       |   hoodie.keygen.timebased.output.dateformat = 
'${outputDateformat}'""",
+       |   hoodie.keygen.timebased.output.dateformat = '$outputDateformat'""",
     s"""
        |   hoodie.keygen.timebased.timestamp.type = 'SCALAR',
        |   hoodie.keygen.timebased.timestamp.scalar.time.unit = 'SECONDS',
-       |   hoodie.keygen.timebased.output.dateformat = 
'${outputDateformat}'""",
+       |   hoodie.keygen.timebased.output.dateformat = '$outputDateformat'""",
     s"""
        |   hoodie.keygen.timebased.timestamp.type = 'DATE_STRING',
        |   hoodie.keygen.timebased.input.dateformat = 'yyyy-MM-dd HH:mm:ss',
-       |   hoodie.keygen.timebased.output.dateformat = '${outputDateformat}'"""
+       |   hoodie.keygen.timebased.output.dateformat = '$outputDateformat'"""
   )
 
   // All data batches should correspond to 2004-02-29 01:02:03 and 2024-06-21 
06:50:03
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
index 3fcbea3a05c..5f8f2673bc1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
@@ -520,42 +520,44 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
   }
 
   test("Prevent a partition from being dropped if there are pending CLUSTERING 
jobs") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      val basePath = s"${tmp.getCanonicalPath}t/$tableName"
-      val schemaFields = Seq("id", "name", "price", "ts")
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | options (
-           |  primaryKey ='id',
-           |  type = 'cow',
-           |  preCombineField = 'ts'
-           | )
-           | partitioned by(ts)
-           | location '$basePath'
-           | """.stripMargin)
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
-      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
-      val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
-
-      // Generate the first clustering plan
-      val firstScheduleInstant = client.createNewInstantTime()
-      client.scheduleClusteringAtInstant(firstScheduleInstant, HOption.empty())
-
-      checkAnswer(s"call show_clustering('$tableName')")(
-        Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), "*")
-      )
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+        val schemaFields = Seq("id", "name", "price", "ts")
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | options (
+             |  primaryKey ='id',
+             |  type = '$tableType',
+             |  preCombineField = 'ts'
+             | )
+             | partitioned by(ts)
+             | location '$basePath'
+             | """.stripMargin)
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
+
+        // Generate the first clustering plan
+        val firstScheduleInstant = client.createNewInstantTime()
+        client.scheduleClusteringAtInstant(firstScheduleInstant, 
HOption.empty())
+
+        checkAnswer(s"call show_clustering('$tableName')")(
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), 
"*")
+        )
 
-      val partition = "ts=1002"
-      val errMsg = s"Failed to drop partitions. Please ensure that there are 
no pending table service actions (clustering/compaction) for the partitions to 
be deleted: [$partition]"
-      checkExceptionContain(s"ALTER TABLE $tableName DROP 
PARTITION($partition)")(errMsg)
+        val partition = "ts=1002"
+        val errMsg = s"Failed to drop partitions. Please ensure that there are 
no pending table service actions (clustering/compaction) for the partitions to 
be deleted: [$partition]"
+        checkExceptionContain(s"ALTER TABLE $tableName DROP 
PARTITION($partition)")(errMsg)
+      }
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 385ba06ba5e..fb39e8eaef7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -41,7 +41,7 @@ import scala.collection.JavaConverters._
 class TestSpark3DDL extends HoodieSparkSqlTestBase {
 
   def createTestResult(tableName: String): Array[Row] = {
-    spark.sql(s"select * from ${tableName} order by id")
+    spark.sql(s"select * from $tableName order by id")
       .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
   }
 
@@ -184,53 +184,55 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test Enable and Disable Schema on read") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-      if (HoodieSparkUtils.gteqSpark3_3) {
-        spark.sql("set hoodie.schema.on.read.enable=true")
-        // Create table
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '$tablePath'
-             | tblproperties (
-             |  type = 'cow',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        if (HoodieSparkUtils.gteqSpark3_3) {
+          spark.sql("set hoodie.schema.on.read.enable=true")
+          // Create table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
        """.stripMargin)
 
-        // Insert data to the new table.
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 10.0, 1000)
-        )
-
-        // add column
-        spark.sql(s"alter table $tableName add columns(new_col string)")
-        val catalogTable = spark.sessionState.catalog.getTableMetadata(new 
TableIdentifier(tableName))
-        assertResult(Seq("id", "name", "price", "ts", "new_col")) {
-          
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
+          // Insert data to the new table.
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 10.0, 1000)
+          )
+
+          // add column
+          spark.sql(s"alter table $tableName add columns(new_col string)")
+          val catalogTable = spark.sessionState.catalog.getTableMetadata(new 
TableIdentifier(tableName))
+          assertResult(Seq("id", "name", "price", "ts", "new_col")) {
+            
HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name)
+          }
+          checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
+            Seq(1, "a1", 10.0, 1000, null)
+          )
+          // disable schema on read.
+          spark.sql("set hoodie.schema.on.read.enable=false")
+          spark.sql(s"refresh table $tableName")
+          // Insert data to the new table.
+          spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
+          // write should succeed. and subsequent read should succeed as well.
+          checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
+            Seq(1, "a1", 10.0, 1000, null),
+            Seq(2, "a2", 12.0, 2000, "e0")
+          )
         }
-        checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
-          Seq(1, "a1", 10.0, 1000, null)
-        )
-        // disable schema on read.
-        spark.sql("set hoodie.schema.on.read.enable=false")
-        spark.sql(s"refresh table $tableName")
-        // Insert data to the new table.
-        spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')")
-        // write should succeed. and subsequent read should succeed as well.
-        checkAnswer(s"select id, name, price, ts, new_col from $tableName")(
-          Seq(1, "a1", 10.0, 1000, null),
-          Seq(2, "a2", 12.0, 2000, "e0")
-        )
       }
     }
   }
@@ -896,7 +898,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
     }
   }
 
-  test("Test DOUBLE to DECIMAL schema evolution (lost in scale)") {
+  test("Test DOUBLE or STRING to DECIMAL schema evolution (lost in scale)") {
     Seq("cow", "mor").foreach { tableType =>
       withTempDir { tmp =>
         // Using INMEMORY index for mor table so that log files will be 
created instead of parquet
@@ -957,57 +959,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             Seq(12, "a12", "-10.04", 1000)
           )
 
-          // clear after using INMEMORY index
-          HoodieInMemoryHashIndex.clear()
-        }
-      }
-    }
-  }
-
-  test("Test STRING to DECIMAL schema evolution (lost in scale)") {
-    Seq("cow", "mor").foreach { tableType =>
-      withTempDir { tmp =>
-        // Using INMEMORY index for mor table so that log files will be 
created instead of parquet
-        val tableName = generateTableName
-        if (HoodieSparkUtils.gteqSpark3_3) {
-          spark.sql(
-            s"""
-               |create table $tableName (
-               |  id int,
-               |  name string,
-               |  price string,
-               |  ts long
-               |) using hudi
-               | location '${tmp.getCanonicalPath}'
-               | tblproperties (
-               |  primaryKey = 'id',
-               |  type = '$tableType',
-               |  preCombineField = 'ts'
-               |  ${if (tableType.equals("mor")) ", hoodie.index.type = 
'INMEMORY'" else ""}
-               | )
-           """.stripMargin)
-
-          spark.sql(s"insert into $tableName values " +
-            // testing the rounding behaviour to ensure that HALF_UP is used 
for positive values
-            "(1, 'a1', '10.024', 1000)," +
-            "(2, 'a2', '10.025', 1000)," +
-            "(3, 'a3', '10.026', 1000)," +
-            // testing the rounding behaviour to ensure that HALF_UP is used 
for negative values
-            "(4, 'a4', '-10.024', 1000)," +
-            "(5, 'a5', '-10.025', 1000)," +
-            "(6, 'a6', '-10.026', 1000)," +
-            // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN 
will retain the same result)
-            "(7, 'a7', '10.034', 1000)," +
-            "(8, 'a8', '10.035', 1000)," +
-            "(9, 'a9', '10.036', 1000)," +
-            // testing the GENERAL rounding behaviour (HALF_UP and HALF_EVEN 
will retain the same result)
-            "(10, 'a10', '-10.034', 1000)," +
-            "(11, 'a11', '-10.035', 1000)," +
-            "(12, 'a12', '-10.036', 1000)")
-
-          
assertResult(tableType.equals("mor"))(DataSourceTestUtils.isLogFileOnly(tmp.getCanonicalPath))
-
-          spark.sql("set hoodie.schema.on.read.enable=true")
           spark.sql(s"alter table $tableName alter column price type 
decimal(4, 2)")
 
           checkAnswer(s"select id, name, cast(price as string), ts from 
$tableName order by id")(
@@ -1033,42 +984,44 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test extract partition values from path when schema evolution is 
enabled") {
-    withTable(generateTableName) { tableName =>
-      spark.sql(
-        s"""
-           |create table $tableName (
-           | id int,
-           | name string,
-           | ts bigint,
-           | region string,
-           | dt date
-           |) using hudi
-           |tblproperties (
-           | primaryKey = 'id',
-           | type = 'cow',
-           | preCombineField = 'ts'
-           |)
-           |partitioned by (region, dt)""".stripMargin)
-
-      withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" 
-> "true",
-        "hoodie.schema.on.read.enable" -> "true") {
-        spark.sql(s"insert into $tableName partition (region='reg1', 
dt='2023-10-01') " +
-          s"select 1, 'name1', 1000")
-        checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where region='reg1'")(
-          Seq(1, "name1", 1000, "reg1", "2023-10-01")
-        )
-
-        // apply schema evolution and perform a read again
-        spark.sql(s"alter table $tableName add columns(price double)")
-        checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where region='reg1'")(
-          Seq(1, "name1", 1000, "reg1", "2023-10-01")
-        )
-
-        // ensure this won't be broken in the future
-        // BooleanSimplification is always applied when calling 
HoodieDataSourceHelper#getNonPartitionFilters
-        checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where not(region='reg2' or id=2)")(
-          Seq(1, "name1", 1000, "reg1", "2023-10-01")
-        )
+    Seq("cow", "mor").foreach { tableType =>
+      withTable(generateTableName) { tableName =>
+        spark.sql(
+          s"""
+             |create table $tableName (
+             | id int,
+             | name string,
+             | ts bigint,
+             | region string,
+             | dt date
+             |) using hudi
+             |tblproperties (
+             | primaryKey = 'id',
+             | type = '$tableType',
+             | preCombineField = 'ts'
+             |)
+             |partitioned by (region, dt)""".stripMargin)
+
+        
withSQLConf("hoodie.datasource.read.extract.partition.values.from.path" -> 
"true",
+          "hoodie.schema.on.read.enable" -> "true") {
+          spark.sql(s"insert into $tableName partition (region='reg1', 
dt='2023-10-01') " +
+            s"select 1, 'name1', 1000")
+          checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where region='reg1'")(
+            Seq(1, "name1", 1000, "reg1", "2023-10-01")
+          )
+
+          // apply schema evolution and perform a read again
+          spark.sql(s"alter table $tableName add columns(price double)")
+          checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where region='reg1'")(
+            Seq(1, "name1", 1000, "reg1", "2023-10-01")
+          )
+
+          // ensure this won't be broken in the future
+          // BooleanSimplification is always applied when calling 
HoodieDataSourceHelper#getNonPartitionFilters
+          checkAnswer(s"select id, name, ts, region, cast(dt as string) from 
$tableName where not(region='reg2' or id=2)")(
+            Seq(1, "name1", 1000, "reg1", "2023-10-01")
+          )
+        }
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
index 9275476682e..cfa91f2cf01 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCDCForSparkSQL.scala
@@ -48,44 +48,46 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
   }
 
   test("Test delete all records in filegroup") {
-    withTempDir { tmp =>
-      val databaseName = "hudi_database"
-      spark.sql(s"create database if not exists $databaseName")
-      spark.sql(s"use $databaseName")
-      val tableName = generateTableName
-      val basePath = s"${tmp.getCanonicalPath}/$tableName"
-      spark.sql(
-        s"""
-           | create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           | ) using hudi
-           | partitioned by (name)
-           | tblproperties (
-           |   'primaryKey' = 'id',
-           |   'preCombineField' = 'ts',
-           |   'hoodie.table.cdc.enabled' = 'true',
-           |   'hoodie.table.cdc.supplemental.logging.mode' = 
'$DATA_BEFORE_AFTER',
-           |   type = 'cow'
-           | )
-           | location '$basePath'
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val databaseName = "hudi_database"
+        spark.sql(s"create database if not exists $databaseName")
+        spark.sql(s"use $databaseName")
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             | ) using hudi
+             | partitioned by (name)
+             | tblproperties (
+             |   'primaryKey' = 'id',
+             |   'preCombineField' = 'ts',
+             |   'hoodie.table.cdc.enabled' = 'true',
+             |   'hoodie.table.cdc.supplemental.logging.mode' = 
'$DATA_BEFORE_AFTER',
+             |   type = '$tableType'
+             | )
+             | location '$basePath'
       """.stripMargin)
-      val metaClient = createMetaClient(spark, basePath)
-      spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 
1000, 'a2')")
-      assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 2)
-      val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName 
where id=1").head().get(0)
-      val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
-      val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
-      cdcDataOnly1.show(false)
-      assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
+        val metaClient = createMetaClient(spark, basePath)
+        spark.sql(s"insert into $tableName values (1, 11, 1000, 'a1'), (2, 12, 
1000, 'a2')")
+        assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 2)
+        val fgForID1 = spark.sql(s"select _hoodie_file_name from $tableName 
where id=1").head().get(0)
+        val commitTime1 = 
metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
+        val cdcDataOnly1 = cdcDataFrame(basePath, commitTime1.toLong - 1)
+        cdcDataOnly1.show(false)
+        assertCDCOpCnt(cdcDataOnly1, 2, 0, 0)
 
-      spark.sql(s"delete from $tableName where id = 1")
-      val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
-      assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
-      assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 1)
-      assert(!spark.sql(s"select _hoodie_file_name from 
$tableName").head().get(0).equals(fgForID1))
+        spark.sql(s"delete from $tableName where id = 1")
+        val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
+        assertCDCOpCnt(cdcDataOnly2, 0, 0, 1)
+        assert(spark.sql(s"select _hoodie_file_name from 
$tableName").distinct().count() == 1)
+        assert(!spark.sql(s"select _hoodie_file_name from 
$tableName").head().get(0).equals(fgForID1))
+      }
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
index 31948c3298d..07d4c302b0d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestCompactionTable.scala
@@ -136,4 +136,74 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test("Test compaction before and after deletes") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | tblproperties (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      spark.sql("set hoodie.parquet.max.file.size = 10000")
+      // disable automatic inline compaction
+      spark.sql("set hoodie.compact.inline=false")
+      spark.sql("set hoodie.compact.schedule.inline=false")
+      // set compaction frequency to every 2 commits
+      spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+      // insert data
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      // update data
+      spark.sql(s"update $tableName set price = 11 where id = 1")
+      // update data
+      spark.sql(s"update $tableName set price = 12 where id = 2")
+      // schedule compaction
+      spark.sql(s"schedule compaction on $tableName")
+      // show compaction
+      var compactionRows = spark.sql(s"show compaction on $tableName limit 
10").collect()
+      var timestamps = compactionRows.map(_.getString(0))
+      assertResult(1)(timestamps.length)
+      // run compaction
+      spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
+      // check data
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 11.0, 1000),
+        Seq(2, "a2", 12.0, 1000),
+        Seq(3, "a3", 10.0, 1000)
+      )
+      // show compaction
+      assertResult(1)(spark.sql(s"show compaction on 
$tableName").collect().length)
+      // Try deleting non-existent row
+      spark.sql(s"DELETE FROM $tableName WHERE id = 41")
+      // Delete record identified by some field other than the primary-key
+      spark.sql(s"DELETE FROM $tableName WHERE name = 'a2'")
+      // schedule compaction
+      spark.sql(s"schedule compaction on $tableName")
+      // show compaction
+      compactionRows = spark.sql(s"show compaction on $tableName limit 
10").collect()
+      timestamps = compactionRows.map(_.getString(0)).sorted
+      assertResult(2)(timestamps.length)
+      // run compaction
+      spark.sql(s"run compaction on $tableName at ${timestamps(1)}")
+      // check data, only 2 records should be present
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 11.0, 1000),
+        Seq(3, "a3", 10.0, 1000)
+      )
+      // show compaction
+      assertResult(2)(spark.sql(s"show compaction on $tableName limit 
10").collect().length)
+    }
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
index 4be2532fb25..4becf4c7f0a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestDataSkippingQuery.scala
@@ -29,169 +29,176 @@ class TestDataSkippingQuery extends 
HoodieSparkSqlTestBase {
 
   test("Test the data skipping query involves conditions " +
     "that cover both columns supported by column stats and those that are not 
supported.") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      spark.sql("set hoodie.metadata.enable = true")
-      spark.sql("set hoodie.metadata.index.column.stats.enable = true")
-      spark.sql("set hoodie.enable.data.skipping = true")
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  attributes map<string, string>,
-           |  price double,
-           |  ts long,
-           |  dt string
-           |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt)
-           | location '${tmp.getCanonicalPath}'
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        spark.sql("set hoodie.metadata.enable = true")
+        spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+        spark.sql("set hoodie.enable.data.skipping = true")
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  attributes map<string, string>,
+             |  price double,
+             |  ts long,
+             |  dt string
+             |) using hudi
+             | tblproperties (primaryKey = 'id', type = '$tableType')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
                   """.stripMargin)
-      spark.sql(
-        s"""
-           | insert into $tableName values
-           | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, 
'2021-01-05'),
-           | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, 
'2021-01-06'),
-           | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, 
'2021-01-07')
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, 
'2021-01-05'),
+             | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, 
'2021-01-06'),
+             | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, 
'2021-01-07')
                   """.stripMargin)
-      // Check the case where the WHERE condition only includes columns not 
supported by column stats
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
-      // Check the case where the WHERE condition only includes columns 
supported by column stats
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
name='a1'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
-      // Check the case where the WHERE condition includes both columns 
supported by column stats and those that are not
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red' and name='a1'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
+        // Check the case where the WHERE condition only includes columns not 
supported by column stats
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05")
+        )
+        // Check the case where the WHERE condition only includes columns 
supported by column stats
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
name='a1'")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05")
+        )
+        // Check the case where the WHERE condition includes both columns 
supported by column stats and those that are not
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red' and name='a1'")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05")
+        )
+      }
     }
   }
 
   test("Test data skipping when specifying columns with column stats 
support.") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      spark.sql("set hoodie.metadata.enable = true")
-      spark.sql("set hoodie.metadata.index.column.stats.enable = true")
-      spark.sql("set hoodie.enable.data.skipping = true")
-      spark.sql("set hoodie.metadata.index.column.stats.column.list = name")
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  attributes map<string, string>,
-           |  price double,
-           |  ts long,
-           |  dt string
-           |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt)
-           | location '${tmp.getCanonicalPath}'
-                  """.stripMargin)
-      spark.sql(
-        s"""
-           | insert into $tableName values
-           | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, 
'2021-01-05'),
-           | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, 
'2021-01-06'),
-           | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, 
'2021-01-07')
-                  """.stripMargin)
-      // Check the case where the WHERE condition only includes columns not 
supported by column stats
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
-      // Check the case where the WHERE condition only includes columns 
supported by column stats
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
name='a1'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
-      // Check the case where the WHERE condition includes both columns 
supported by column stats and those that are not
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red' and name='a1'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
-      // Check WHERE condition that includes both columns with existing column 
stats and columns of types
-      // that support column stats but for which column stats do not exist
-      checkAnswer(s"select id, name, price, ts, dt from $tableName where 
ts=1000 and name='a1'")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05")
-      )
-    }
-  }
-
-  test("bucket index query") {
-    // table bucket prop can not be read in the query sql now, so need to set 
these configs
-    withSQLConf("hoodie.enable.data.skipping" -> "true",
-      "hoodie.bucket.index.hash.field" -> "id",
-      "hoodie.bucket.index.num.buckets" -> "20",
-      "hoodie.index.type" -> "BUCKET") {
+    Seq("cow", "mor").foreach { tableType =>
       withTempDir { tmp =>
         val tableName = generateTableName
-        // Create a partitioned table
+        spark.sql("set hoodie.metadata.enable = true")
+        spark.sql("set hoodie.metadata.index.column.stats.enable = true")
+        spark.sql("set hoodie.enable.data.skipping = true")
+        spark.sql("set hoodie.metadata.index.column.stats.column.list = name")
         spark.sql(
           s"""
              |create table $tableName (
              |  id int,
-             |  dt string,
              |  name string,
+             |  attributes map<string, string>,
              |  price double,
-             |  ts long
+             |  ts long,
+             |  dt string
              |) using hudi
-             | tblproperties (
-             | primaryKey = 'id,name',
-             | preCombineField = 'ts',
-             | hoodie.index.type = 'BUCKET',
-             | hoodie.bucket.index.hash.field = 'id',
-             | hoodie.bucket.index.num.buckets = '20')
+             | tblproperties (primaryKey = 'id', type = '$tableType')
              | partitioned by (dt)
              | location '${tmp.getCanonicalPath}'
-       """.stripMargin)
-
+                  """.stripMargin)
         spark.sql(
           s"""
              | insert into $tableName values
-             | (1, 'a1', 10, 1000, "2021-01-05"),
-             | (2, 'a2', 20, 2000, "2021-01-06"),
-             | (3, 'a3', 30, 3000, "2021-01-07")
-              """.stripMargin)
-
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
= 1")(
+             | (1, 'a1', map('color', 'red', 'size', 'M'), 10, 1000, 
'2021-01-05'),
+             | (2, 'a2', map('color', 'blue', 'size', 'L'), 20, 2000, 
'2021-01-06'),
+             | (3, 'a3', map('color', 'green', 'size', 'S'), 30, 3000, 
'2021-01-07')
+                  """.stripMargin)
+        // Check the case where the WHERE condition only includes columns not 
supported by column stats
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red'")(
           Seq(1, "a1", 10.0, 1000, "2021-01-05")
         )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
= 1 and name = 'a1'")(
+        // Check the case where the WHERE condition only includes columns 
supported by column stats
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
name='a1'")(
           Seq(1, "a1", 10.0, 1000, "2021-01-05")
         )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
= 2 or id = 5")(
-          Seq(2, "a2", 20.0, 2000, "2021-01-06")
-        )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
in (2, 3)")(
-          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-          Seq(3, "a3", 30.0, 3000, "2021-01-07")
-        )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
!= 4")(
-          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-          Seq(3, "a3", 30.0, 3000, "2021-01-07")
-        )
-        spark.sql("set hoodie.bucket.index.query.pruning = false")
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
= 1")(
+        // Check the case where the WHERE condition includes both columns 
supported by column stats and those that are not
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
attributes.color = 'red' and name='a1'")(
           Seq(1, "a1", 10.0, 1000, "2021-01-05")
         )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
= 1 and name = 'a1'")(
+        // Check WHERE condition that includes both columns with existing 
column stats and columns of types
+        // that support column stats but for which column stats do not exist
+        checkAnswer(s"select id, name, price, ts, dt from $tableName where 
ts=1000 and name='a1'")(
           Seq(1, "a1", 10.0, 1000, "2021-01-05")
         )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
= 2 or id = 5")(
-          Seq(2, "a2", 20.0, 2000, "2021-01-06")
-        )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
in (2, 3)")(
-          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-          Seq(3, "a3", 30.0, 3000, "2021-01-07")
-        )
-        checkAnswer(s"select id, name, price, ts, dt from $tableName where id 
!= 4")(
-          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-          Seq(3, "a3", 30.0, 3000, "2021-01-07")
-        )
-        spark.sql("set hoodie.bucket.index.query.pruning = true")
+      }
+    }
+  }
+
+  test("bucket index query") {
+    Seq("cow", "mor").foreach { tableType =>
+      // table bucket prop can not be read in the query sql now, so need to 
set these configs
+      withSQLConf("hoodie.enable.data.skipping" -> "true",
+        "hoodie.bucket.index.hash.field" -> "id",
+        "hoodie.bucket.index.num.buckets" -> "20",
+        "hoodie.index.type" -> "BUCKET") {
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  dt string,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               | primaryKey = 'id,name',
+               | preCombineField = 'ts',
+               | type = '$tableType',
+               | hoodie.index.type = 'BUCKET',
+               | hoodie.bucket.index.hash.field = 'id',
+               | hoodie.bucket.index.num.buckets = '20')
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1, 'a1', 10, 1000, "2021-01-05"),
+               | (2, 'a2', 20, 2000, "2021-01-06"),
+               | (3, 'a3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id = 1")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id = 1 and name = 'a1'")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id = 2 or id = 5")(
+            Seq(2, "a2", 20.0, 2000, "2021-01-06")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id in (2, 3)")(
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3", 30.0, 3000, "2021-01-07")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id != 4")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3", 30.0, 3000, "2021-01-07")
+          )
+          spark.sql("set hoodie.bucket.index.query.pruning = false")
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id = 1")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id = 1 and name = 'a1'")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id = 2 or id = 5")(
+            Seq(2, "a2", 20.0, 2000, "2021-01-06")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id in (2, 3)")(
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3", 30.0, 3000, "2021-01-07")
+          )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName where 
id != 4")(
+            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+            Seq(3, "a3", 30.0, 3000, "2021-01-07")
+          )
+          spark.sql("set hoodie.bucket.index.query.pruning = true")
+        }
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
index 9390df7b73a..f75ddbbd6b4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestInsertTable.scala
@@ -192,160 +192,166 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Insert Into with static partition") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      // Create a partitioned table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  dt string,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt)
-           | location '${tmp.getCanonicalPath}'
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (primaryKey = 'id', type = '$tableType')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
        """.stripMargin)
-      // Insert into static partition
-      spark.sql(
-        s"""
-           | insert into $tableName partition(dt = '2021-01-05')
-           | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+        // Insert into static partition
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt = '2021-01-05')
+             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
               """.stripMargin)
 
-      spark.sql(
-        s"""
-           | insert into $tableName partition(dt = '2021-01-06')
-           | select 20 as price, 2000 as ts, 2 as id, 'a2' as name
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt = '2021-01-06')
+             | select 20 as price, 2000 as ts, 2 as id, 'a2' as name
               """.stripMargin)
-      // should not mess with the original order after write the out-of-order 
data.
-      val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
-      val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
-      assert(schema.getFieldIndex("id").contains(0))
-      assert(schema.getFieldIndex("price").contains(2))
+        // should not mess with the original order after writing the out-of-order data.
+        val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+        val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
+        assert(schema.getFieldIndex("id").contains(0))
+        assert(schema.getFieldIndex("price").contains(2))
 
-      // Note: Do not write the field alias, the partition field must be 
placed last.
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | select 3, 'a3', 30, 3000, '2021-01-07'
+        // Note: Do not write the field alias, the partition field must be 
placed last.
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | select 3, 'a3', 30, 3000, '2021-01-07'
         """.stripMargin)
 
-      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-        Seq(3, "a3", 30.0, 3000, "2021-01-07")
-      )
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+      }
     }
   }
 
   test("Test Insert Into with dynamic partition") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      // Create a partitioned table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  dt string,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt)
-           | location '${tmp.getCanonicalPath}'
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (primaryKey = 'id', type = '$tableType')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
        """.stripMargin)
 
-      // Insert into dynamic partition
-      spark.sql(
-        s"""
-           | insert into $tableName partition(dt)
-           | select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price, 
1000 as ts
+        // Insert into dynamic partition
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt)
+             | select 1 as id, '2021-01-05' as dt, 'a1' as name, 10 as price, 
1000 as ts
         """.stripMargin)
-      // should not mess with the original order after write the out-of-order 
data.
-      val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
-      val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
-      assert(schema.getFieldIndex("id").contains(0))
-      assert(schema.getFieldIndex("price").contains(2))
+        // should not mess with the original order after writing the out-of-order data.
+        val metaClient = createMetaClient(spark, tmp.getCanonicalPath)
+        val schema = HoodieSqlCommonUtils.getTableSqlSchema(metaClient).get
+        assert(schema.getFieldIndex("id").contains(0))
+        assert(schema.getFieldIndex("price").contains(2))
 
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, 
'2021-01-06' as dt
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, 
'2021-01-06' as dt
         """.stripMargin)
 
-      // Note: Do not write the field alias, the partition field must be 
placed last.
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | select 3, 'a3', 30, 3000, '2021-01-07'
+        // Note: Do not write the field alias, the partition field must be 
placed last.
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | select 3, 'a3', 30, 3000, '2021-01-07'
         """.stripMargin)
 
-      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-        Seq(3, "a3", 30.0, 3000, "2021-01-07")
-      )
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+      }
     }
   }
 
   test("Test Insert Into with multi partition") {
-    withRecordType()(withTempDir { tmp =>
-      val tableName = generateTableName
-      // Create a partitioned table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  dt string,
-           |  name string,
-           |  price double,
-           |  ht string,
-           |  ts long
-           |) using hudi
-           | tblproperties (primaryKey = 'id')
-           | partitioned by (dt, ht)
-           | location '${tmp.getCanonicalPath}'
+    Seq("cow", "mor").foreach { tableType =>
+      withRecordType()(withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ht string,
+             |  ts long
+             |) using hudi
+             | tblproperties (primaryKey = 'id', type = '$tableType')
+             | partitioned by (dt, ht)
+             | location '${tmp.getCanonicalPath}'
        """.stripMargin)
-      spark.sql(
-        s"""
-           | insert into $tableName partition(dt, ht)
-           | select 1 as id, 'a1' as name, 10 as price,'20210101' as dt, 1000 
as ts, '01' as ht
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt, ht)
+             | select 1 as id, 'a1' as name, 10 as price,'20210101' as dt, 
1000 as ts, '01' as ht
               """.stripMargin)
 
-      // Insert into static partition and dynamic partition
-      spark.sql(
-        s"""
-           | insert into $tableName partition(dt = '20210102', ht)
-           | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, '02' as ht
+        // Insert into static partition and dynamic partition
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt = '20210102', ht)
+             | select 2 as id, 'a2' as name, 20 as price, 2000 as ts, '02' as 
ht
               """.stripMargin)
 
-      spark.sql(
-        s"""
-           | insert into $tableName partition(dt, ht = '03')
-           | select 3 as id, 'a3' as name, 30 as price, 3000 as ts, '20210103' 
as dt
+        spark.sql(
+          s"""
+             | insert into $tableName partition(dt, ht = '03')
+             | select 3 as id, 'a3' as name, 30 as price, 3000 as ts, 
'20210103' as dt
               """.stripMargin)
 
-      // Note: Do not write the field alias, the partition field must be 
placed last.
-      spark.sql(
-        s"""
-           | insert into $tableName
-           | select 4, 'a4', 40, 4000, '20210104', '04'
+        // Note: Do not write the field alias, the partition field must be 
placed last.
+        spark.sql(
+          s"""
+             | insert into $tableName
+             | select 4, 'a4', 40, 4000, '20210104', '04'
         """.stripMargin)
 
-      checkAnswer(s"select id, name, price, ts, dt, ht from $tableName")(
-        Seq(1, "a1", 10.0, 1000, "20210101", "01"),
-        Seq(2, "a2", 20.0, 2000, "20210102", "02"),
-        Seq(3, "a3", 30.0, 3000, "20210103", "03"),
-        Seq(4, "a4", 40.0, 4000, "20210104", "04")
-      )
-    })
+        checkAnswer(s"select id, name, price, ts, dt, ht from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "20210101", "01"),
+          Seq(2, "a2", 20.0, 2000, "20210102", "02"),
+          Seq(3, "a3", 30.0, 3000, "20210103", "03"),
+          Seq(4, "a4", 40.0, 4000, "20210104", "04")
+        )
+      })
+    }
   }
 
-  test("Test Insert Into None Partitioned Table") {
+  test("Test Insert Into Non Partitioned Table") {
    withRecordType(Seq(HoodieRecordType.AVRO, HoodieRecordType.SPARK), 
Map(HoodieRecordType.SPARK ->
      // SparkMerger should use "HoodieSparkValidateDuplicateKeyRecordMerger"
      // with "hoodie.sql.insert.mode=strict"
@@ -851,7 +857,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
 
-  test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled' 
enables") {
+  test("Test Insert timestamp when 'spark.sql.datetime.java8API.enabled' 
enabled") {
     withRecordType() {
       withSQLConf("spark.sql.datetime.java8API.enabled" -> "true") {
         val tableName = generateTableName
@@ -1291,64 +1297,70 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test combine before insert") {
-    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false", 
"hoodie.merge.allow.duplicate.on.inserts" -> "false") {
-      withRecordType()(withTempDir{tmp =>
-        val tableName = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}/$tableName'
-             | tblproperties (
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
+    Seq("cow", "mor").foreach { tableType =>
+      withSQLConf("hoodie.sql.bulk.insert.enable" -> "false", 
"hoodie.merge.allow.duplicate.on.inserts" -> "false") {
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | location '${tmp.getCanonicalPath}/$tableName'
+               | tblproperties (
+               |  primaryKey = 'id',
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
        """.stripMargin)
-        spark.sql(
-          s"""
-             |insert overwrite table $tableName
-             |select * from (
-             | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
-             | union all
-             | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
-             | )
-             |""".stripMargin
-        )
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 11.0, 1001)
-        )
-      })
+          spark.sql(
+            s"""
+               |insert overwrite table $tableName
+               |select * from (
+               | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
+               | union all
+               | select 1 as id, 'a1' as name, 11 as price, 1001 as ts
+               | )
+               |""".stripMargin
+          )
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 11.0, 1001)
+          )
+        })
+      }
     }
   }
 
   test("Test insert pk-table") {
-    withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
-      withRecordType()(withTempDir{tmp =>
-        val tableName = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '${tmp.getCanonicalPath}/$tableName'
-             | tblproperties (
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
+    Seq("cow", "mor").foreach { tableType =>
+      withSQLConf("hoodie.sql.bulk.insert.enable" -> "false") {
+        withRecordType()(withTempDir { tmp =>
+          val tableName = generateTableName
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | location '${tmp.getCanonicalPath}/$tableName'
+               | tblproperties (
+               |  primaryKey = 'id',
+               |  type = '$tableType',
+               |  preCombineField = 'ts'
+               | )
        """.stripMargin)
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-        spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
-        checkAnswer(s"select id, name, price, ts from $tableName")(
-          Seq(1, "a1", 11.0, 1000)
-        )
-      })
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+          spark.sql(s"insert into $tableName values(1, 'a1', 11, 1000)")
+          checkAnswer(s"select id, name, price, ts from $tableName")(
+            Seq(1, "a1", 11.0, 1000)
+          )
+        })
+      }
     }
   }
 
@@ -1952,39 +1964,41 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   }
 
   test("Test Insert Into with auto generate record keys with precombine ") {
-    withTempDir { tmp =>
-      val tableName = generateTableName
-      // Create a partitioned table
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  dt string,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | partitioned by (dt)
-           | location '${tmp.getCanonicalPath}'
-           | tblproperties (
-           |  type = 'cow',
-           |  preCombineField = 'price'
-           | )
+    Seq("cow", "mor").foreach { tableType =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+             | tblproperties (
+             |  type = '$tableType',
+             |  preCombineField = 'price'
+             | )
        """.stripMargin)
 
-      spark.sql(
-        s"""
-           | insert into $tableName values
-           | (1, 'a1', 10, 1000, "2021-01-05"),
-           | (2, 'a2', 20, 2000, "2021-01-06"),
-           | (3, 'a3', 30, 3000, "2021-01-07")
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1', 10, 1000, "2021-01-05"),
+             | (2, 'a2', 20, 2000, "2021-01-06"),
+             | (3, 'a3', 30, 3000, "2021-01-07")
               """.stripMargin)
 
-      checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-        Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-        Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-        Seq(3, "a3", 30.0, 3000, "2021-01-07")
-      )
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+      }
     }
   }
 
@@ -2527,7 +2541,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test DROP insert dup policy with INSERT_INTO explicit new configs BULK 
INSERT operation") {
     withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
-      Seq("cow").foreach { tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         val dupPolicy = DROP_INSERT_DUP_POLICY
         withTable(generateTableName) { tableName =>
           ingestAndValidateDropDupPolicyBulkInsert(tableType, tableName, tmp,
@@ -2540,7 +2554,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
 
   test("Test FAIL insert dup policy with INSERT_INTO explicit new configs") {
     withRecordType(Seq(HoodieRecordType.AVRO))(withTempDir { tmp =>
-      Seq("cow").foreach { tableType =>
+      Seq("cow", "mor").foreach { tableType =>
         val operation = WriteOperationType.UPSERT
         val dupPolicy = FAIL_INSERT_DUP_POLICY
         withTable(generateTableName) { tableName =>
@@ -2592,7 +2606,7 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
   test(s"Test INSERT INTO with upsert operation type") {
     if (HoodieSparkUtils.gteqSpark3_3) {
       withTempDir { tmp =>
-        Seq("mor").foreach { tableType =>
+        Seq("cow", "mor").foreach { tableType =>
           val tableName = generateTableName
           spark.sql(
             s"""
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
index 9da5a1a6818..1c70f70aaa0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeIntoTable.scala
@@ -135,7 +135,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
         val targetTable = generateTableName
         spark.sql(
           s"""
-             |create table ${targetTable} (
+             |create table $targetTable (
              |  id int,
              |  name string,
              |  data int,
@@ -152,7 +152,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
              |""".stripMargin)
         spark.sql(
           s"""
-             |merge into ${targetTable} as target
+             |merge into $targetTable as target
              |using (
              |select 1 as id, 'lb' as name, 6 as data, 'shu' as country, 
1646643193 as ts
              |) source
@@ -164,7 +164,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
              |""".stripMargin)
         spark.sql(
           s"""
-             |merge into ${targetTable} as target
+             |merge into $targetTable as target
              |using (
              |select 1 as id, 'lb' as name, 5 as data, 'shu' as country, 
1646643196 as ts
              |) source
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
index ab5657d8d3a..972d246e574 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestTimeTravelTable.scala
@@ -24,116 +24,120 @@ import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 
 class TestTimeTravelTable extends HoodieSparkSqlTestBase {
   test("Test Insert and Update Record with time travel") {
-    if (HoodieSparkUtils.gteqSpark3_3) {
-      withRecordType()(withTempDir { tmp =>
-        val tableName1 = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName1 (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | tblproperties (
-             |  type = 'cow',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
-             | location '${tmp.getCanonicalPath}/$tableName1'
+    Seq("cow", "mor").foreach { tableType =>
+      if (HoodieSparkUtils.gteqSpark3_3) {
+        withRecordType()(withTempDir { tmp =>
+          val tableName1 = generateTableName
+          spark.sql(
+            s"""
+               |create table $tableName1 (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | location '${tmp.getCanonicalPath}/$tableName1'
        """.stripMargin)
 
-        // 1st commit instant
-        spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+          // 1st commit instant
+          spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
 
-        val metaClient1 = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName1")
-        val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
-          .lastInstant().get().getTimestamp
+          val metaClient1 = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName1")
+          val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
+            .lastInstant().get().getTimestamp
 
-        // 2nd commit instant
-        spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
+          // 2nd commit instant
+          spark.sql(s"insert into $tableName1 values(1, 'a2', 20, 2000)")
 
-        checkAnswer(s"select id, name, price, ts from $tableName1")(
-          Seq(1, "a2", 20.0, 2000)
-        )
+          checkAnswer(s"select id, name, price, ts from $tableName1")(
+            Seq(1, "a2", 20.0, 2000)
+          )
 
-        // time travel as of instant 1
-        checkAnswer(
-          s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF 
'$instant1'")(
-          Seq(1, "a1", 10.0, 1000)
-        )
-      })
+          // time travel as of instant 1
+          checkAnswer(
+            s"select id, name, price, ts from $tableName1 TIMESTAMP AS OF 
'$instant1'")(
+            Seq(1, "a1", 10.0, 1000)
+          )
+        })
+      }
     }
   }
 
   test("Test Insert Into Records with time travel To new Table") {
-    if (HoodieSparkUtils.gteqSpark3_3) {
-      withTempDir { tmp =>
-        // Create Non-Partitioned table
-        val tableName1 = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName1 (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | tblproperties (
-             |  type = 'cow',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
-             | location '${tmp.getCanonicalPath}/$tableName1'
+    Seq("cow", "mor").foreach { tableType =>
+      if (HoodieSparkUtils.gteqSpark3_3) {
+        withTempDir { tmp =>
+          // Create Non-Partitioned table
+          val tableName1 = generateTableName
+          spark.sql(
+            s"""
+               |create table $tableName1 (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | location '${tmp.getCanonicalPath}/$tableName1'
        """.stripMargin)
 
-        spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+          spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
 
-        val metaClient1 = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName1")
+          val metaClient1 = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName1")
 
-        val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
-          .lastInstant().get().getTimestamp
+          val instant1 = metaClient1.getActiveTimeline.getAllCommitsTimeline
+            .lastInstant().get().getTimestamp
 
 
-        val tableName2 = generateTableName
-        // Create a partitioned table
-        spark.sql(
-          s"""
-             |create table $tableName2 (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long,
-             |  dt string
-             |) using hudi
-             | tblproperties (primaryKey = 'id')
-             | partitioned by (dt)
-             | location '${tmp.getCanonicalPath}/$tableName2'
+          val tableName2 = generateTableName
+          // Create a partitioned table
+          spark.sql(
+            s"""
+               |create table $tableName2 (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long,
+               |  dt string
+               |) using hudi
+               | tblproperties (primaryKey = 'id', type = '$tableType')
+               | partitioned by (dt)
+               | location '${tmp.getCanonicalPath}/$tableName2'
        """.stripMargin)
 
-        // Insert into dynamic partition
-        spark.sql(
-          s"""
-             | insert into $tableName2
-             | select id, name, price, ts, '2022-02-14' as dt
-             | from $tableName1 TIMESTAMP AS OF '$instant1'
+          // Insert into dynamic partition
+          spark.sql(
+            s"""
+               | insert into $tableName2
+               | select id, name, price, ts, '2022-02-14' as dt
+               | from $tableName1 TIMESTAMP AS OF '$instant1'
         """.stripMargin)
-        checkAnswer(s"select id, name, price, ts, dt from $tableName2")(
-          Seq(1, "a1", 10.0, 1000, "2022-02-14")
-        )
+          checkAnswer(s"select id, name, price, ts, dt from $tableName2")(
+            Seq(1, "a1", 10.0, 1000, "2022-02-14")
+          )
 
-        // Insert into static partition
-        spark.sql(
-          s"""
-             | insert into $tableName2 partition(dt = '2022-02-15')
-             | select 2 as id, 'a2' as name, price, ts
-             | from $tableName1 TIMESTAMP AS OF '$instant1'
+          // Insert into static partition
+          spark.sql(
+            s"""
+               | insert into $tableName2 partition(dt = '2022-02-15')
+               | select 2 as id, 'a2' as name, price, ts
+               | from $tableName1 TIMESTAMP AS OF '$instant1'
         """.stripMargin)
-        checkAnswer(
-          s"select id, name, price, ts, dt from $tableName2")(
-          Seq(1, "a1", 10.0, 1000, "2022-02-14"),
-          Seq(2, "a2", 10.0, 1000, "2022-02-15")
-        )
+          checkAnswer(
+            s"select id, name, price, ts, dt from $tableName2")(
+            Seq(1, "a1", 10.0, 1000, "2022-02-14"),
+            Seq(2, "a2", 10.0, 1000, "2022-02-15")
+          )
+        }
       }
     }
   }
@@ -238,43 +242,45 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
 
   test("Test Select Record with time travel and Repartition") {
     if (HoodieSparkUtils.gteqSpark3_3) {
-      withTempDir { tmp =>
-        val tableName = generateTableName
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | tblproperties (
-             |  type = 'cow',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
-             | location '${tmp.getCanonicalPath}/$tableName'
+      Seq("cow", "mor").foreach { tableType =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | tblproperties (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
+               | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
 
-        // 1st commit instant
-        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+          // 1st commit instant
+          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
 
-        val metaClient = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName")
-        val instant1 = metaClient.getActiveTimeline.getAllCommitsTimeline
-          .lastInstant().get().getTimestamp
+          val metaClient = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName")
+          val instant1 = metaClient.getActiveTimeline.getAllCommitsTimeline
+            .lastInstant().get().getTimestamp
 
-        // 2nd commit instant
-        spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
+          // 2nd commit instant
+          spark.sql(s"insert into $tableName values(1, 'a2', 20, 2000)")
 
-        checkAnswer(s"select id, name, price, ts from $tableName distribute by 
cast(rand() * 2 as int)")(
-          Seq(1, "a2", 20.0, 2000)
-        )
+          checkAnswer(s"select id, name, price, ts from $tableName distribute 
by cast(rand() * 2 as int)")(
+            Seq(1, "a2", 20.0, 2000)
+          )
 
-        // time travel as of instant 1
-        checkAnswer(
-          s"select id, name, price, ts from $tableName TIMESTAMP AS OF 
'$instant1' distribute by cast(rand() * 2 as int)")(
-          Seq(1, "a1", 10.0, 1000)
-        )
+          // time travel as of instant 1
+          checkAnswer(
+            s"select id, name, price, ts from $tableName TIMESTAMP AS OF 
'$instant1' distribute by cast(rand() * 2 as int)")(
+            Seq(1, "a1", 10.0, 1000)
+          )
+        }
       }
     }
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
index 5162b664880..56b3eb7f315 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestUpdateTable.scala
@@ -18,10 +18,12 @@
 package org.apache.spark.sql.hudi.dml
 
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.hudi.HoodieSparkUtils.isSpark2
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.assertEquals
 
@@ -316,4 +318,63 @@ class TestUpdateTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  // Test update table with clustering
+  test("Test Update Table with Clustering") {
+    withTempDir { tmp =>
+      Seq("cow", "mor").foreach { tableType =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        // create table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long,
+             |  partition long
+             |) using hudi
+             | partitioned by (partition)
+             | location '$basePath'
+             | tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.clustering.keygen.class = 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator"
+             | )
+       """.stripMargin)
+
+        // insert data
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000, 1000)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001, 1001)")
+        spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002, 1002)")
+
+        // update data
+        spark.sql(s"update $tableName set price = 20 where id > 1")
+        checkAnswer(s"select id, name, price, ts from $tableName where id > 
1")(
+          Seq(2, "a2", 20.0, 1001),
+          Seq(3, "a3", 20.0, 1002)
+        )
+
+        // update data
+        spark.sql(s"update $tableName set price = price * 2 where id = 2")
+        checkAnswer(s"select id, name, price, ts from $tableName where id = 
2")(
+          Seq(2, "a2", 40.0, 1001)
+        )
+
+        val client = HoodieCLIUtils.createHoodieWriteClient(spark, basePath, 
Map.empty, Option(tableName))
+        // Generate the first clustering plan
+        val firstScheduleInstant = client.createNewInstantTime()
+        client.scheduleClusteringAtInstant(firstScheduleInstant, 
HOption.empty())
+        checkAnswer(s"call show_clustering(path => '$basePath', 
show_involved_partition => true)")(
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.REQUESTED.name(), 
"partition=1000,partition=1001,partition=1002")
+        )
+        // Do clustering for all the clustering plan
+        checkAnswer(s"call run_clustering(path => '$basePath', order => 
'partition')")(
+          Seq(firstScheduleInstant, 3, HoodieInstant.State.COMPLETED.name(), 
"*")
+        )
+      }
+    }
+  }
 }


Reply via email to