This is an automated email from the ASF dual-hosted git repository.

wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7460cf1d1a7 [HUDI-8806] Fix "No value present in Option" in show_clustering procedure (#12649)
7460cf1d1a7 is described below

commit 7460cf1d1a753beca4ce520e9e80ea2b9e584a6a
Author: Ekaterina Belousova <[email protected]>
AuthorDate: Fri Jan 17 08:05:51 2025 +0700

    [HUDI-8806] Fix "No value present in Option" in show_clustering procedure (#12649)
---
 .../procedures/ShowClusteringProcedure.scala       |  2 +-
 .../hudi/procedure/TestClusteringProcedure.scala   | 46 +++++++++++++++++++++-
 2 files changed, 46 insertions(+), 2 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
index 7eb5ff72f8b..dc57a07c432 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
@@ -67,7 +67,7 @@ class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with S
 
     val clusteringPlans = clusteringInstants.map(instant =>
       ClusteringUtils.getClusteringPlan(metaClient, instant)
-    )
+    ).filter(clusteringPlan => clusteringPlan.isPresent)
 
     if (showInvolvedPartitions) {
       clusteringPlans.map { p =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 6a43205128e..29742fc2064 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceH
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
-import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DataTypes, IntegerType, Metadata, StringType, StructField, StructType}
 import org.apache.spark.sql.{Dataset, Row}
 
 import java.util
@@ -774,6 +774,50 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call show_clustering with limit") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      val hudiOptions = Map(
+        "hoodie.table.name" -> tableName,
+        "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
+        "hoodie.datasource.write.recordkey.field" -> "a",
+        "hoodie.datasource.write.partitionpath.field" -> "a,b,c",
+        "hoodie.clean.automatic" -> "true",
+        "hoodie.metadata.enable" -> "true",
+        "hoodie.clustering.inline" -> "true",
+        "hoodie.clustering.inline.max.commits" -> "1",
+        "hoodie.cleaner.commits.retained" -> "2",
+        "hoodie.datasource.write.operation" -> "insert_overwrite"
+      )
+
+      val data = Seq(
+        (1, 2, 4),
+        (1, 2, 4),
+        (1, 2, 3)
+      )
+      val schema = StructType(Array(
+        StructField("a", IntegerType, true),
+        StructField("b", IntegerType, true),
+        StructField("c", IntegerType, true)
+      ))
+
+      val df = spark.createDataFrame(spark.sparkContext.parallelize(data).map {
+        case (a, b, c) => Row(a, b, c)
+      }, schema)
+
+      df.write
+        .options(hudiOptions)
+        .format("hudi")
+        .mode("append")
+        .save(s"$basePath")
+
+      assertResult(1)(spark.sql(s"call show_clustering(path => '$basePath')").count())
+      assertResult(1)(spark.sql(s"call show_clustering(path => '$basePath', limit => 1)").count())
+      assertResult(1)(spark.sql(s"call show_clustering(path => '$basePath', limit => 2)").count())
+    }
+  }
+
   def avgRecord(commitTimeline: HoodieTimeline): Long = {
     var totalByteSize = 0L
     var totalRecordsCount = 0L

Reply via email to