This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 83a0cab485d0bc257b203a60f6a00fea0f1556ed Author: wangkang <[email protected]> AuthorDate: Thu Jan 5 09:43:45 2023 +0800 [HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593) Co-authored-by: kandy01.wang <[email protected]> (cherry picked from commit 2f5e487445a19e2fa5b2c26143efeb4fe9aeda63) --- .../procedures/ShowCompactionProcedure.scala | 2 +- .../spark/sql/hudi/TestCompactionTable.scala | 6 ++-- .../hudi/procedure/TestCompactionProcedure.scala | 38 +++++++++++++++++++--- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala index 7a7bb2cf9d9..1076b9fc44a 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala @@ -65,7 +65,7 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ, s"Cannot show compaction on a Non Merge On Read table.") val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala - .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION) + .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION || p.getAction == HoodieTimeline.COMMIT_ACTION) .toSeq .sortBy(f => f.getTimestamp) .reverse diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala index 0ef89fc5b9f..78fa33c52ca 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala @@ -58,7 +58,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase { Seq(3, "a3", 10.0, 1000), Seq(4, "a4", 10.0, 1000) ) - assertResult(1)(spark.sql(s"show compaction on $tableName").collect().length) + assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length) spark.sql(s"run compaction on $tableName at ${timestamps(0)}") checkAnswer(s"select id, name, price, ts from $tableName order by id")( Seq(1, "a1", 11.0, 1000), @@ -66,7 +66,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase { Seq(3, "a3", 10.0, 1000), Seq(4, "a4", 10.0, 1000) ) - assertResult(0)(spark.sql(s"show compaction on $tableName").collect().length) + assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length) } } @@ -119,7 +119,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase { Seq(2, "a2", 12.0, 1000), Seq(3, "a3", 10.0, 1000) ) - assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length) + assertResult(2)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length) checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")( s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: " diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala index e9d9d550d3f..236d87970d2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala @@ -88,8 +88,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase { val resultC = spark.sql(s"call show_compaction('$tableName')") .collect() .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2))) - assertResult(1)(resultC.length) - assertResult(resultA)(resultC) + assertResult(2)(resultC.length) checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")( Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name()) @@ -100,7 +99,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase { Seq(3, "a3", 10.0, 1000), Seq(4, "a4", 10.0, 1000) ) - assertResult(0)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length) + assertResult(2)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length) } } @@ -168,11 +167,42 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase { Seq(2, "a2", 12.0, 1000), Seq(3, "a3", 10.0, 1000) ) - assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) + assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length) checkException(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}', timestamp => 12345L)")( s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: " ) } } + test("Test show_compaction Procedure by Path") { + withTempDir { tmp => + val tableName1 = generateTableName + spark.sql( + s""" + |create table $tableName1 ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | tblproperties ( + | type = 'mor', + | primaryKey = 'id', + | preCombineField = 'ts', + | hoodie.compact.inline ='true', + | hoodie.compact.inline.max.delta.commits ='2' + | ) + | location '${tmp.getCanonicalPath}/$tableName1' + """.stripMargin) + spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)") + + spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)") + + spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)") + + spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)") + + assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}/$tableName1')").collect().length) + } + } }
