This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c0fb6d951e [HUDI-7914] Use internal schema without metadata fields 
for delete-partitions OP (#11487)
4c0fb6d951e is described below

commit 4c0fb6d951e849d4b1487905bedb354c5df7b544
Author: Vitali Makarevich <[email protected]>
AuthorDate: Sat Jun 29 02:08:40 2024 +0200

    [HUDI-7914] Use internal schema without metadata fields for 
delete-partitions OP (#11487)
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  2 +-
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  4 +++
 .../sql/hudi/ddl/TestAlterTableDropPartition.scala | 41 ++++++++++++++++++++--
 3 files changed, 43 insertions(+), 4 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 27e64d5a88a..50722abfbab 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -425,7 +425,7 @@ class HoodieSparkSqlWriterInternal {
             }
 
             // Issue the delete.
-            val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString
+            val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
             val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
               schemaStr, path, tblName,
                 (parameters - 
HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 6363718a83f..e8020a3e7a3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -81,6 +81,10 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     }
   }
 
+  protected def getTableStoragePath(tableName: String): String = {
+    new File(sparkWareHouse, tableName).getCanonicalPath
+  }
+
   override protected def test(testName: String, testTags: Tag*)(testFun: => 
Any /* Assertion */)(implicit pos: source.Position): Unit = {
     super.test(testName, testTags: _*)(
       try {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
index ad8b17a8039..3fcbea3a05c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.ddl
 
+import org.apache.avro.Schema
 import org.apache.hudi.avro.model.{HoodieCleanMetadata, 
HoodieCleanPartitionMetadata}
 import org.apache.hudi.common.model.{HoodieCleaningPolicy, 
HoodieCommitMetadata}
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -25,14 +26,30 @@ import org.apache.hudi.config.{HoodieCleanConfig, 
HoodieWriteConfig}
 import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 import org.apache.hudi.{DataSourceWriteOptions, HoodieCLIUtils, 
HoodieSparkUtils}
-
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import 
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCleanMetadata
 import org.junit.jupiter.api.Assertions
 import org.junit.jupiter.api.Assertions.assertTrue
 
+import scala.collection.JavaConverters._
+
 class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
+  private val schemaFields: Seq[String] = Seq("id", "name", "ts", "dt")
+
+  private def ensureLastCommitIncludesProperSchema(path: String, 
expectedSchema: Seq[String] = schemaFields): Unit = {
+    val metaClient = createMetaClient(spark, path)
+    // A somewhat awkward way to extract the schema, but there is no way to 
get it exactly as is: once `includeMetadataFields`
+    // is used, custom logic forcefully adds/removes fields.
+    // Also, the available public methods do not allow specifying the exact 
instant to read the schema from, only the latest one after some filtering,
+    // which may lead to false positives in test scenarios.
+    val lastInstant = 
metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant().get()
+    val commitMetadata = 
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastInstant).get(),
 classOf[HoodieCommitMetadata])
+    val schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)
+    val schema = new Schema.Parser().parse(schemaStr)
+    val fields = schema.getFields.asScala.map(_.name())
+    assert(expectedSchema == fields, s"Commit metadata should include no meta 
fields, received $fields")
+  }
 
   test("Drop non-partitioned table") {
     val tableName = generateTableName
@@ -133,11 +150,13 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
 
         // drop 2021-10-01 partition
         spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
+        ensureLastCommitIncludesProperSchema(tablePath)
 
         // trigger clean so that partition deletion kicks in.
         spark.sql(s"set 
${HoodieCleanConfig.CLEANER_POLICY.key}=${HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()}")
         spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
           .collect()
+        ensureLastCommitIncludesProperSchema(tablePath)
 
         val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, 
tablePath)
         val cleanPartitionMeta = new 
java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
@@ -201,8 +220,10 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
 
         // drop 2021-10-01 partition
         spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
+        ensureLastCommitIncludesProperSchema(tablePath)
 
         spark.sql(s"""insert into $tableName values (2, "l4", "v1", 
"2021/10/02")""")
+        ensureLastCommitIncludesProperSchema(tablePath)
 
         val partitionPath = if (urlencode) {
           PartitionPathEncodeUtils.escapePathName("2021/10/01")
@@ -257,6 +278,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
 
     // drop 2021-10-01 partition
     spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
+    ensureLastCommitIncludesProperSchema(getTableStoragePath(tableName))
 
     // trigger clean so that partition deletion kicks in.
     spark.sql(s"set 
${HoodieCleanConfig.CLEANER_POLICY.key}=${HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()}")
@@ -277,10 +299,11 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
       withTempDir { tmp =>
         val tableName = generateTableName
         val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        val schemaFields = Seq("id", "name", "ts", "year", "month", "day")
 
         import spark.implicits._
         val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1", 
"2021", "10", "02"))
-          .toDF("id", "name", "ts", "year", "month", "day")
+          .toDF(schemaFields :_*)
 
         df.write.format("hudi")
           .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -321,11 +344,13 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
 
         // drop 2021-10-01 partition
         spark.sql(s"alter table $tableName drop partition (year='2021', 
month='10', day='01')")
+        ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
 
         // trigger clean so that partition deletion kicks in.
         spark.sql(s"set 
${HoodieCleanConfig.CLEANER_POLICY.key}=${HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()}")
         spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
           .collect()
+        ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
 
         val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, 
tablePath)
         val cleanPartitionMeta = new 
java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
@@ -355,9 +380,10 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
       withTempDir { tmp =>
         val tableName = generateTableName
         val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        val schemaFields = Seq("id", "name", "ts", "year", "month", "day")
 
         import spark.implicits._
-        val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF("id", "name", 
"ts", "year", "month", "day")
+        val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF(schemaFields 
:_*)
 
         df.write.format("hudi")
           .option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -398,12 +424,14 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
 
         // drop 2021-10-01 partition
         spark.sql(s"alter table $tableName drop partition (year='2021', 
month='10', day='01')")
+        ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
 
         spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021", 
"10", "02")""")
 
         // trigger clean so that partition deletion kicks in.
         spark.sql(s"call run_clean(table => '$tableName', retain_commits => 
1)")
           .collect()
+        ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
 
         val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark, 
tablePath)
         val cleanPartitionMeta = new 
java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
@@ -438,6 +466,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
+        val schemaFields = Seq("id", "name", "price", "ts", "dt")
         // create table
         spark.sql(
           s"""
@@ -471,6 +500,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
           Seq(2, "a2", 10.0, 1000, "02"),
           Seq(3, "a3", 10.0, 1000, "03")
         )
+        
ensureLastCommitIncludesProperSchema(s"${tmp.getCanonicalPath}/$tableName", 
schemaFields)
 
         // check schema
         val metaClient = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName")
@@ -493,6 +523,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      val schemaFields = Seq("id", "name", "price", "ts")
       spark.sql(
         s"""
            |create table $tableName (
@@ -532,6 +563,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      val schemaFields = Seq("id", "name", "price", "ts")
       // Using INMEMORY index type to ensure that deltacommits generate log 
files instead of parquet
       spark.sql(
         s"""
@@ -579,6 +611,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       val tableName = generateTableName
       val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+      val schemaFields = Seq("id", "name", "price", "ts")
       // Using INMEMORY index type to ensure that deltacommits generate log 
files instead of parquet
       spark.sql(
         s"""
@@ -623,6 +656,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
+        val schemaFields = Seq("id", "name", "price", "ts", 
"partition_date_col")
         spark.sql(
           s"""
              |create table $tableName (
@@ -647,6 +681,7 @@ class TestAlterTableDropPartition extends 
HoodieSparkSqlTestBase {
           Seq("partition_date_col=2023-09-01")
         )
         spark.sql(s"alter table $tableName drop 
partition(partition_date_col='2023-08-*')")
+        
ensureLastCommitIncludesProperSchema(s"${tmp.getCanonicalPath}/$tableName", 
schemaFields)
         // show partitions will still return all partitions for tests, use 
select distinct as a stop-gap
         checkAnswer(s"select distinct partition_date_col from $tableName")(
           Seq("2023-09-01")

Reply via email to