This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c0fb6d951e [HUDI-7914] Use internal schema without metadata fields
for delete-partitions OP (#11487)
4c0fb6d951e is described below
commit 4c0fb6d951e849d4b1487905bedb354c5df7b544
Author: Vitali Makarevich <[email protected]>
AuthorDate: Sat Jun 29 02:08:40 2024 +0200
[HUDI-7914] Use internal schema without metadata fields for
delete-partitions OP (#11487)
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 2 +-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 4 +++
.../sql/hudi/ddl/TestAlterTableDropPartition.scala | 41 ++++++++++++++++++++--
3 files changed, 43 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 27e64d5a88a..50722abfbab 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -425,7 +425,7 @@ class HoodieSparkSqlWriterInternal {
}
// Issue the delete.
- val schemaStr = new
TableSchemaResolver(tableMetaClient).getTableAvroSchema.toString
+ val schemaStr = new
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
val client =
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
schemaStr, path, tblName,
(parameters -
HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 6363718a83f..e8020a3e7a3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -81,6 +81,10 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
}
}
+ protected def getTableStoragePath(tableName: String): String = {
+ new File(sparkWareHouse, tableName).getCanonicalPath
+ }
+
override protected def test(testName: String, testTags: Tag*)(testFun: =>
Any /* Assertion */)(implicit pos: source.Position): Unit = {
super.test(testName, testTags: _*)(
try {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
index ad8b17a8039..3fcbea3a05c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTableDropPartition.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi.ddl
+import org.apache.avro.Schema
import org.apache.hudi.avro.model.{HoodieCleanMetadata,
HoodieCleanPartitionMetadata}
import org.apache.hudi.common.model.{HoodieCleaningPolicy,
HoodieCommitMetadata}
import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -25,14 +26,30 @@ import org.apache.hudi.config.{HoodieCleanConfig,
HoodieWriteConfig}
import org.apache.hudi.keygen.{ComplexKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.hudi.{DataSourceWriteOptions, HoodieCLIUtils,
HoodieSparkUtils}
-
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import
org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.getLastCleanMetadata
import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Assertions.assertTrue
+import scala.collection.JavaConverters._
+
class TestAlterTableDropPartition extends HoodieSparkSqlTestBase {
+ private val schemaFields: Seq[String] = Seq("id", "name", "ts", "dt")
+
+ private def ensureLastCommitIncludesProperSchema(path: String,
expectedSchema: Seq[String] = schemaFields): Unit = {
+ val metaClient = createMetaClient(spark, path)
+ // A bit weird way to extract schema, but there is no way to get it
exactly as is, since once `includeMetadataFields`
+ // is used - it will use custom logic to forcefully add/remove fields.
+ // And the available public methods do not allow specifying the exact instant
to get the schema from, only the latest after some filtering
+ // which may lead to false positives in test scenarios.
+ val lastInstant =
metaClient.getActiveTimeline.getCompletedReplaceTimeline.lastInstant().get()
+ val commitMetadata =
HoodieCommitMetadata.fromBytes(metaClient.getActiveTimeline.getInstantDetails(lastInstant).get(),
classOf[HoodieCommitMetadata])
+ val schemaStr = commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)
+ val schema = new Schema.Parser().parse(schemaStr)
+ val fields = schema.getFields.asScala.map(_.name())
+ assert(expectedSchema == fields, s"Commit metadata should include no meta
fields, received $fields")
+ }
test("Drop non-partitioned table") {
val tableName = generateTableName
@@ -133,11 +150,13 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
+ ensureLastCommitIncludesProperSchema(tablePath)
// trigger clean so that partition deletion kicks in.
spark.sql(s"set
${HoodieCleanConfig.CLEANER_POLICY.key}=${HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()}")
spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
1)")
.collect()
+ ensureLastCommitIncludesProperSchema(tablePath)
val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark,
tablePath)
val cleanPartitionMeta = new
java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
@@ -201,8 +220,10 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021/10/01')")
+ ensureLastCommitIncludesProperSchema(tablePath)
spark.sql(s"""insert into $tableName values (2, "l4", "v1",
"2021/10/02")""")
+ ensureLastCommitIncludesProperSchema(tablePath)
val partitionPath = if (urlencode) {
PartitionPathEncodeUtils.escapePathName("2021/10/01")
@@ -257,6 +278,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (dt='2021-10-01')")
+ ensureLastCommitIncludesProperSchema(getTableStoragePath(tableName))
// trigger clean so that partition deletion kicks in.
spark.sql(s"set
${HoodieCleanConfig.CLEANER_POLICY.key}=${HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()}")
@@ -277,10 +299,11 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ val schemaFields = Seq("id", "name", "ts", "year", "month", "day")
import spark.implicits._
val df = Seq((1, "z3", "v1", "2021", "10", "01"), (2, "l4", "v1",
"2021", "10", "02"))
- .toDF("id", "name", "ts", "year", "month", "day")
+ .toDF(schemaFields :_*)
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -321,11 +344,13 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021',
month='10', day='01')")
+ ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
// trigger clean so that partition deletion kicks in.
spark.sql(s"set
${HoodieCleanConfig.CLEANER_POLICY.key}=${HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name()}")
spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
1)")
.collect()
+ ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark,
tablePath)
val cleanPartitionMeta = new
java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
@@ -355,9 +380,10 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ val schemaFields = Seq("id", "name", "ts", "year", "month", "day")
import spark.implicits._
- val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF("id", "name",
"ts", "year", "month", "day")
+ val df = Seq((1, "z3", "v1", "2021", "10", "01")).toDF(schemaFields
:_*)
df.write.format("hudi")
.option(HoodieWriteConfig.TBL_NAME.key, tableName)
@@ -398,12 +424,14 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
// drop 2021-10-01 partition
spark.sql(s"alter table $tableName drop partition (year='2021',
month='10', day='01')")
+ ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
spark.sql(s"""insert into $tableName values (2, "l4", "v1", "2021",
"10", "02")""")
// trigger clean so that partition deletion kicks in.
spark.sql(s"call run_clean(table => '$tableName', retain_commits =>
1)")
.collect()
+ ensureLastCommitIncludesProperSchema(tablePath, schemaFields)
val cleanMetadata: HoodieCleanMetadata = getLastCleanMetadata(spark,
tablePath)
val cleanPartitionMeta = new
java.util.ArrayList(cleanMetadata.getPartitionMetadata.values()).toArray()
@@ -438,6 +466,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
+ val schemaFields = Seq("id", "name", "price", "ts", "dt")
// create table
spark.sql(
s"""
@@ -471,6 +500,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
Seq(2, "a2", 10.0, 1000, "02"),
Seq(3, "a3", 10.0, 1000, "03")
)
+
ensureLastCommitIncludesProperSchema(s"${tmp.getCanonicalPath}/$tableName",
schemaFields)
// check schema
val metaClient = createMetaClient(spark,
s"${tmp.getCanonicalPath}/$tableName")
@@ -493,6 +523,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+ val schemaFields = Seq("id", "name", "price", "ts")
spark.sql(
s"""
|create table $tableName (
@@ -532,6 +563,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+ val schemaFields = Seq("id", "name", "price", "ts")
// Using INMEMORY index type to ensure that deltacommits generate log
files instead of parquet
spark.sql(
s"""
@@ -579,6 +611,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}t/$tableName"
+ val schemaFields = Seq("id", "name", "price", "ts")
// Using INMEMORY index type to ensure that deltacommits generate log
files instead of parquet
spark.sql(
s"""
@@ -623,6 +656,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
val tableName = generateTableName
+ val schemaFields = Seq("id", "name", "price", "ts",
"partition_date_col")
spark.sql(
s"""
|create table $tableName (
@@ -647,6 +681,7 @@ class TestAlterTableDropPartition extends
HoodieSparkSqlTestBase {
Seq("partition_date_col=2023-09-01")
)
spark.sql(s"alter table $tableName drop
partition(partition_date_col='2023-08-*')")
+
ensureLastCommitIncludesProperSchema(s"${tmp.getCanonicalPath}/$tableName",
schemaFields)
// show partitions will still return all partitions for tests, use
select distinct as a stop-gap
checkAnswer(s"select distinct partition_date_col from $tableName")(
Seq("2023-09-01")