vamshikrishnakyatham commented on code in PR #13793:
URL: https://github.com/apache/hudi/pull/13793#discussion_r2316905452


##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala:
##########
@@ -138,128 +171,245 @@ class ShowCleansProcedure(includePartitionMetadata: 
Boolean) extends BaseProcedu
     StructField("failed_delete_files", DataTypes.IntegerType, nullable = true, 
Metadata.empty),
     StructField("is_partition_deleted", DataTypes.BooleanType, nullable = 
true, Metadata.empty),
     StructField("time_taken_in_millis", DataTypes.LongType, nullable = true, 
Metadata.empty),
-    StructField("total_files_deleted", DataTypes.IntegerType, nullable = true, 
Metadata.empty)
+    StructField("total_files_deleted", DataTypes.IntegerType, nullable = true, 
Metadata.empty),
+    StructField("earliest_commit_to_retain", DataTypes.StringType, nullable = 
true, Metadata.empty),
+    StructField("last_completed_commit_timestamp", DataTypes.StringType, 
nullable = true, Metadata.empty),
+    StructField("version", DataTypes.IntegerType, nullable = true, 
Metadata.empty),
+    StructField("total_partitions_to_clean", DataTypes.IntegerType, nullable = 
true, Metadata.empty),
+    StructField("total_partitions_to_delete", DataTypes.IntegerType, nullable 
= true, Metadata.empty)
   ))
 
   def parameters: Array[ProcedureParameter] = PARAMETERS
 
-  def outputType: StructType = if (includePartitionMetadata) 
PARTITION_METADATA_OUTPUT_TYPE else OUTPUT_TYPE
+  def outputType: StructType = OUTPUT_TYPE
 
   override def call(args: ProcedureArgs): Seq[Row] = {
     super.checkArgs(PARAMETERS, args)
 
-    val table = getArgValueOrDefault(args, 
PARAMETERS(0)).get.asInstanceOf[String]
-    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
-    val showArchived = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Boolean]
-    val filter = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[String]
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+    val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+    val showArchived = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[Boolean]
+    val filter = getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[String]
+    val startTime = getArgValueOrDefault(args, 
PARAMETERS(5)).get.asInstanceOf[String]
+    val endTime = getArgValueOrDefault(args, 
PARAMETERS(6)).get.asInstanceOf[String]
 
     validateFilter(filter, outputType)
 
-    val hoodieCatalogTable = 
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
-    val basePath = hoodieCatalogTable.tableLocation
+    val basePath = getBasePath(tableName, tablePath)
     val metaClient = createMetaClient(jsc, basePath)
 
-    val activeResults = if (includePartitionMetadata) {
-      getCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit)
-    } else {
-      getCleans(metaClient.getActiveTimeline, limit)
-    }
+    val activeResults = 
getCombinedCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit, 
metaClient, startTime, endTime, "ACTIVE")
     val finalResults = if (showArchived) {
-      val archivedResults = if (includePartitionMetadata) {
-        getCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit)
+      val archivedResults = 
getCombinedCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit, 
metaClient, startTime, endTime, "ARCHIVED")
+      val combinedResults = (activeResults ++ archivedResults)
+        .sortWith((a, b) => a.getString(0) > b.getString(0))
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        combinedResults
       } else {
-        getCleans(metaClient.getArchivedTimeline, limit)
+        combinedResults.take(limit)
       }
-      (activeResults ++ archivedResults)
-        .sortWith((a, b) => a.getString(0) > b.getString(0))
-        .take(limit)
     } else {
-      activeResults
+      if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+        activeResults
+      } else {
+        activeResults.take(limit)
+      }
     }
     applyFilter(finalResults, filter, outputType)
   }
 
-  override def build: Procedure = new 
ShowCleansProcedure(includePartitionMetadata)
+  override def build: Procedure = new ShowCleansProcedure()
 
-  private def getCleansWithPartitionMetadata(timeline: HoodieTimeline,
-                                             limit: Int): Seq[Row] = {
+  private def getCombinedCleansWithPartitionMetadata(timeline: HoodieTimeline, 
limit: Int, metaClient: HoodieTableMetaClient, startTime: String, endTime: 
String, timelineType: String): Seq[Row] = {
     import scala.collection.JavaConverters._
+    import scala.util.{Failure, Success, Try}
+
+    val filteredCleanInstants = 
timeline.getCleanerTimeline.getInstants.asScala.toSeq
+      .filter { instant =>
+        val instantTime = instant.requestedTime()
+        val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime 
else true
+        val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else 
true
+        withinStartTime && withinEndTime
+      }
+    val allCleanInstants = if (startTime.nonEmpty && endTime.nonEmpty) {
+      filteredCleanInstants.sortWith((a, b) => a.requestedTime() > 
b.requestedTime())
+    } else {
+      filteredCleanInstants.sortWith((a, b) => a.requestedTime() > 
b.requestedTime()).take(limit)
+    }
 
-    val (rows: util.ArrayList[Row], cleanInstants: 
util.ArrayList[HoodieInstant]) = getSortedCleans(timeline)
-
-    var rowCount = 0
-
-    cleanInstants.asScala.takeWhile(_ => rowCount < limit).foreach { 
cleanInstant =>
-      val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
-
-      cleanMetadata.getPartitionMetadata.entrySet.asScala.takeWhile(_ => 
rowCount < limit).foreach { partitionMetadataEntry =>
-        val partitionPath = partitionMetadataEntry.getKey
-        val partitionMetadata = partitionMetadataEntry.getValue
-
-        rows.add(Row(
-          cleanInstant.requestedTime(),
-          cleanInstant.getCompletionTime,
-          cleanInstant.getAction,
-          cleanMetadata.getStartCleanTime,
-          partitionPath,
-          partitionMetadata.getPolicy,
-          partitionMetadata.getDeletePathPatterns.size(),
-          partitionMetadata.getSuccessDeleteFiles.size(),
-          partitionMetadata.getFailedDeleteFiles.size(),
-          partitionMetadata.getIsPartitionDeleted,
-          cleanMetadata.getTimeTakenInMillis,
-          cleanMetadata.getTotalFilesDeleted
-        ))
-        rowCount += 1
+    val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+    allCleanInstants.foreach { cleanInstant =>
+      if (cleanInstant.getState == HoodieInstant.State.COMPLETED) {
+        Try {
+          val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
+
+          cleanMetadata.getPartitionMetadata.entrySet.asScala.foreach { 
partitionMetadataEntry =>
+            val partitionPath = partitionMetadataEntry.getKey
+            val partitionMetadata = partitionMetadataEntry.getValue
+
+            val row = Row(
+              cleanInstant.requestedTime(),
+              cleanInstant.getCompletionTime,
+              cleanInstant.getState.name(),
+              cleanInstant.getAction,
+              timelineType,
+              cleanMetadata.getStartCleanTime,
+              partitionPath,
+              partitionMetadata.getPolicy,
+              partitionMetadata.getDeletePathPatterns.size(),
+              partitionMetadata.getSuccessDeleteFiles.size(),
+              partitionMetadata.getFailedDeleteFiles.size(),
+              partitionMetadata.getIsPartitionDeleted,
+              cleanMetadata.getTimeTakenInMillis,
+              cleanMetadata.getTotalFilesDeleted,
+              cleanMetadata.getEarliestCommitToRetain,
+              cleanMetadata.getLastCompletedCommitTimestamp,
+              cleanMetadata.getVersion,
+              null,
+              null
+            )
+            allRows += row
+          }
+          if (cleanMetadata.getPartitionMetadata.isEmpty) {

Review Comment:
   Got it — I thought of doing it but somehow missed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to