vamshikrishnakyatham commented on code in PR #13793:
URL: https://github.com/apache/hudi/pull/13793#discussion_r2316905452
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala:
##########
@@ -138,128 +171,245 @@ class ShowCleansProcedure(includePartitionMetadata:
Boolean) extends BaseProcedu
StructField("failed_delete_files", DataTypes.IntegerType, nullable = true,
Metadata.empty),
StructField("is_partition_deleted", DataTypes.BooleanType, nullable =
true, Metadata.empty),
StructField("time_taken_in_millis", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_files_deleted", DataTypes.IntegerType, nullable = true,
Metadata.empty)
+ StructField("total_files_deleted", DataTypes.IntegerType, nullable = true,
Metadata.empty),
+ StructField("earliest_commit_to_retain", DataTypes.StringType, nullable =
true, Metadata.empty),
+ StructField("last_completed_commit_timestamp", DataTypes.StringType,
nullable = true, Metadata.empty),
+ StructField("version", DataTypes.IntegerType, nullable = true,
Metadata.empty),
+ StructField("total_partitions_to_clean", DataTypes.IntegerType, nullable =
true, Metadata.empty),
+ StructField("total_partitions_to_delete", DataTypes.IntegerType, nullable
= true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
- def outputType: StructType = if (includePartitionMetadata)
PARTITION_METADATA_OUTPUT_TYPE else OUTPUT_TYPE
+ def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
- val table = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
- val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
- val showArchived = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
- val filter = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[String]
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+ val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+ val showArchived = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String]
+ val startTime = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
+ val endTime = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
validateFilter(filter, outputType)
- val hoodieCatalogTable =
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
- val basePath = hoodieCatalogTable.tableLocation
+ val basePath = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
- val activeResults = if (includePartitionMetadata) {
- getCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit)
- } else {
- getCleans(metaClient.getActiveTimeline, limit)
- }
+ val activeResults =
getCombinedCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit,
metaClient, startTime, endTime, "ACTIVE")
val finalResults = if (showArchived) {
- val archivedResults = if (includePartitionMetadata) {
- getCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit)
+ val archivedResults =
getCombinedCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit,
metaClient, startTime, endTime, "ARCHIVED")
+ val combinedResults = (activeResults ++ archivedResults)
+ .sortWith((a, b) => a.getString(0) > b.getString(0))
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ combinedResults
} else {
- getCleans(metaClient.getArchivedTimeline, limit)
+ combinedResults.take(limit)
}
- (activeResults ++ archivedResults)
- .sortWith((a, b) => a.getString(0) > b.getString(0))
- .take(limit)
} else {
- activeResults
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ activeResults
+ } else {
+ activeResults.take(limit)
+ }
}
applyFilter(finalResults, filter, outputType)
}
- override def build: Procedure = new
ShowCleansProcedure(includePartitionMetadata)
+ override def build: Procedure = new ShowCleansProcedure()
- private def getCleansWithPartitionMetadata(timeline: HoodieTimeline,
- limit: Int): Seq[Row] = {
+ private def getCombinedCleansWithPartitionMetadata(timeline: HoodieTimeline,
limit: Int, metaClient: HoodieTableMetaClient, startTime: String, endTime:
String, timelineType: String): Seq[Row] = {
import scala.collection.JavaConverters._
+ import scala.util.{Failure, Success, Try}
+
+ val filteredCleanInstants =
timeline.getCleanerTimeline.getInstants.asScala.toSeq
+ .filter { instant =>
+ val instantTime = instant.requestedTime()
+ val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime
else true
+ val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else
true
+ withinStartTime && withinEndTime
+ }
+ val allCleanInstants = if (startTime.nonEmpty && endTime.nonEmpty) {
+ filteredCleanInstants.sortWith((a, b) => a.requestedTime() >
b.requestedTime())
+ } else {
+ filteredCleanInstants.sortWith((a, b) => a.requestedTime() >
b.requestedTime()).take(limit)
+ }
- val (rows: util.ArrayList[Row], cleanInstants:
util.ArrayList[HoodieInstant]) = getSortedCleans(timeline)
-
- var rowCount = 0
-
- cleanInstants.asScala.takeWhile(_ => rowCount < limit).foreach {
cleanInstant =>
- val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
-
- cleanMetadata.getPartitionMetadata.entrySet.asScala.takeWhile(_ =>
rowCount < limit).foreach { partitionMetadataEntry =>
- val partitionPath = partitionMetadataEntry.getKey
- val partitionMetadata = partitionMetadataEntry.getValue
-
- rows.add(Row(
- cleanInstant.requestedTime(),
- cleanInstant.getCompletionTime,
- cleanInstant.getAction,
- cleanMetadata.getStartCleanTime,
- partitionPath,
- partitionMetadata.getPolicy,
- partitionMetadata.getDeletePathPatterns.size(),
- partitionMetadata.getSuccessDeleteFiles.size(),
- partitionMetadata.getFailedDeleteFiles.size(),
- partitionMetadata.getIsPartitionDeleted,
- cleanMetadata.getTimeTakenInMillis,
- cleanMetadata.getTotalFilesDeleted
- ))
- rowCount += 1
+ val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+ allCleanInstants.foreach { cleanInstant =>
+ if (cleanInstant.getState == HoodieInstant.State.COMPLETED) {
+ Try {
+ val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
+
+ cleanMetadata.getPartitionMetadata.entrySet.asScala.foreach {
partitionMetadataEntry =>
+ val partitionPath = partitionMetadataEntry.getKey
+ val partitionMetadata = partitionMetadataEntry.getValue
+
+ val row = Row(
+ cleanInstant.requestedTime(),
+ cleanInstant.getCompletionTime,
+ cleanInstant.getState.name(),
+ cleanInstant.getAction,
+ timelineType,
+ cleanMetadata.getStartCleanTime,
+ partitionPath,
+ partitionMetadata.getPolicy,
+ partitionMetadata.getDeletePathPatterns.size(),
+ partitionMetadata.getSuccessDeleteFiles.size(),
+ partitionMetadata.getFailedDeleteFiles.size(),
+ partitionMetadata.getIsPartitionDeleted,
+ cleanMetadata.getTimeTakenInMillis,
+ cleanMetadata.getTotalFilesDeleted,
+ cleanMetadata.getEarliestCommitToRetain,
+ cleanMetadata.getLastCompletedCommitTimestamp,
+ cleanMetadata.getVersion,
+ null,
+ null
+ )
+ allRows += row
+ }
+ if (cleanMetadata.getPartitionMetadata.isEmpty) {
Review Comment:
Got it — I thought of doing it, but somehow missed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]