This is an automated email from the ASF dual-hosted git repository.
wombatukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7460cf1d1a7 [HUDI-8806] Fix "No value present in Option" in show_clustering procedure (#12649)
7460cf1d1a7 is described below
commit 7460cf1d1a753beca4ce520e9e80ea2b9e584a6a
Author: Ekaterina Belousova <[email protected]>
AuthorDate: Fri Jan 17 08:05:51 2025 +0700
[HUDI-8806] Fix "No value present in Option" in show_clustering procedure
(#12649)
---
.../procedures/ShowClusteringProcedure.scala | 2 +-
.../hudi/procedure/TestClusteringProcedure.scala | 46 +++++++++++++++++++++-
2 files changed, 46 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
index 7eb5ff72f8b..dc57a07c432 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala
@@ -67,7 +67,7 @@ class ShowClusteringProcedure extends BaseProcedure with ProcedureBuilder with S
val clusteringPlans = clusteringInstants.map(instant =>
ClusteringUtils.getClusteringPlan(metaClient, instant)
- )
+ ).filter(clusteringPlan => clusteringPlan.isPresent)
if (showInvolvedPartitions) {
clusteringPlans.map { p =>
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
index 6a43205128e..29742fc2064 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestClusteringProcedure.scala
@@ -31,7 +31,7 @@ import org.apache.hudi.{DataSourceReadOptions, HoodieCLIUtils, HoodieDataSourceH
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
-import org.apache.spark.sql.types.{DataTypes, Metadata, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{DataTypes, IntegerType, Metadata, StringType, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row}
import java.util
@@ -774,6 +774,50 @@ class TestClusteringProcedure extends HoodieSparkProcedureTestBase {
}
}
+ test("Test Call show_clustering with limit") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ val hudiOptions = Map(
+ "hoodie.table.name" -> tableName,
+ "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
+ "hoodie.datasource.write.recordkey.field" -> "a",
+ "hoodie.datasource.write.partitionpath.field" -> "a,b,c",
+ "hoodie.clean.automatic" -> "true",
+ "hoodie.metadata.enable" -> "true",
+ "hoodie.clustering.inline" -> "true",
+ "hoodie.clustering.inline.max.commits" -> "1",
+ "hoodie.cleaner.commits.retained" -> "2",
+ "hoodie.datasource.write.operation" -> "insert_overwrite"
+ )
+
+ val data = Seq(
+ (1, 2, 4),
+ (1, 2, 4),
+ (1, 2, 3)
+ )
+ val schema = StructType(Array(
+ StructField("a", IntegerType, true),
+ StructField("b", IntegerType, true),
+ StructField("c", IntegerType, true)
+ ))
+
+ val df = spark.createDataFrame(spark.sparkContext.parallelize(data).map {
+ case (a, b, c) => Row(a, b, c)
+ }, schema)
+
+ df.write
+ .options(hudiOptions)
+ .format("hudi")
+ .mode("append")
+ .save(s"$basePath")
+
+      assertResult(1)(spark.sql(s"call show_clustering(path => '$basePath')").count())
+      assertResult(1)(spark.sql(s"call show_clustering(path => '$basePath', limit => 1)").count())
+      assertResult(1)(spark.sql(s"call show_clustering(path => '$basePath', limit => 2)").count())
+ }
+ }
+
def avgRecord(commitTimeline: HoodieTimeline): Long = {
var totalByteSize = 0L
var totalRecordsCount = 0L