linliu-code commented on code in PR #13793:
URL: https://github.com/apache/hudi/pull/13793#discussion_r2316144715
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala:
##########
@@ -138,128 +171,245 @@ class ShowCleansProcedure(includePartitionMetadata:
Boolean) extends BaseProcedu
StructField("failed_delete_files", DataTypes.IntegerType, nullable = true,
Metadata.empty),
StructField("is_partition_deleted", DataTypes.BooleanType, nullable =
true, Metadata.empty),
StructField("time_taken_in_millis", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_files_deleted", DataTypes.IntegerType, nullable = true,
Metadata.empty)
+ StructField("total_files_deleted", DataTypes.IntegerType, nullable = true,
Metadata.empty),
+ StructField("earliest_commit_to_retain", DataTypes.StringType, nullable =
true, Metadata.empty),
+ StructField("last_completed_commit_timestamp", DataTypes.StringType,
nullable = true, Metadata.empty),
+ StructField("version", DataTypes.IntegerType, nullable = true,
Metadata.empty),
+ StructField("total_partitions_to_clean", DataTypes.IntegerType, nullable =
true, Metadata.empty),
+ StructField("total_partitions_to_delete", DataTypes.IntegerType, nullable
= true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
- def outputType: StructType = if (includePartitionMetadata)
PARTITION_METADATA_OUTPUT_TYPE else OUTPUT_TYPE
+ def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
- val table = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
- val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
- val showArchived = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
- val filter = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[String]
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+ val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+ val showArchived = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String]
+ val startTime = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
+ val endTime = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
validateFilter(filter, outputType)
- val hoodieCatalogTable =
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
- val basePath = hoodieCatalogTable.tableLocation
+ val basePath = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
- val activeResults = if (includePartitionMetadata) {
- getCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit)
- } else {
- getCleans(metaClient.getActiveTimeline, limit)
- }
+ val activeResults =
getCombinedCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit,
metaClient, startTime, endTime, "ACTIVE")
val finalResults = if (showArchived) {
- val archivedResults = if (includePartitionMetadata) {
- getCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit)
+ val archivedResults =
getCombinedCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit,
metaClient, startTime, endTime, "ARCHIVED")
+ val combinedResults = (activeResults ++ archivedResults)
+ .sortWith((a, b) => a.getString(0) > b.getString(0))
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ combinedResults
} else {
- getCleans(metaClient.getArchivedTimeline, limit)
+ combinedResults.take(limit)
}
- (activeResults ++ archivedResults)
- .sortWith((a, b) => a.getString(0) > b.getString(0))
- .take(limit)
} else {
- activeResults
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ activeResults
+ } else {
+ activeResults.take(limit)
+ }
}
applyFilter(finalResults, filter, outputType)
}
- override def build: Procedure = new
ShowCleansProcedure(includePartitionMetadata)
+ override def build: Procedure = new ShowCleansProcedure()
- private def getCleansWithPartitionMetadata(timeline: HoodieTimeline,
- limit: Int): Seq[Row] = {
+ private def getCombinedCleansWithPartitionMetadata(timeline: HoodieTimeline,
limit: Int, metaClient: HoodieTableMetaClient, startTime: String, endTime:
String, timelineType: String): Seq[Row] = {
import scala.collection.JavaConverters._
+ import scala.util.{Failure, Success, Try}
+
+ val filteredCleanInstants =
timeline.getCleanerTimeline.getInstants.asScala.toSeq
+ .filter { instant =>
+ val instantTime = instant.requestedTime()
+ val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime
else true
+ val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else
true
+ withinStartTime && withinEndTime
+ }
+ val allCleanInstants = if (startTime.nonEmpty && endTime.nonEmpty) {
+ filteredCleanInstants.sortWith((a, b) => a.requestedTime() >
b.requestedTime())
+ } else {
+ filteredCleanInstants.sortWith((a, b) => a.requestedTime() >
b.requestedTime()).take(limit)
+ }
- val (rows: util.ArrayList[Row], cleanInstants:
util.ArrayList[HoodieInstant]) = getSortedCleans(timeline)
-
- var rowCount = 0
-
- cleanInstants.asScala.takeWhile(_ => rowCount < limit).foreach {
cleanInstant =>
- val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
-
- cleanMetadata.getPartitionMetadata.entrySet.asScala.takeWhile(_ =>
rowCount < limit).foreach { partitionMetadataEntry =>
- val partitionPath = partitionMetadataEntry.getKey
- val partitionMetadata = partitionMetadataEntry.getValue
-
- rows.add(Row(
- cleanInstant.requestedTime(),
- cleanInstant.getCompletionTime,
- cleanInstant.getAction,
- cleanMetadata.getStartCleanTime,
- partitionPath,
- partitionMetadata.getPolicy,
- partitionMetadata.getDeletePathPatterns.size(),
- partitionMetadata.getSuccessDeleteFiles.size(),
- partitionMetadata.getFailedDeleteFiles.size(),
- partitionMetadata.getIsPartitionDeleted,
- cleanMetadata.getTimeTakenInMillis,
- cleanMetadata.getTotalFilesDeleted
- ))
- rowCount += 1
+ val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+ allCleanInstants.foreach { cleanInstant =>
+ if (cleanInstant.getState == HoodieInstant.State.COMPLETED) {
+ Try {
+ val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
+
+ cleanMetadata.getPartitionMetadata.entrySet.asScala.foreach {
partitionMetadataEntry =>
+ val partitionPath = partitionMetadataEntry.getKey
+ val partitionMetadata = partitionMetadataEntry.getValue
+
+ val row = Row(
+ cleanInstant.requestedTime(),
Review Comment:
Remove the "()"s to keep the accessor style consistent.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala:
##########
@@ -138,128 +171,245 @@ class ShowCleansProcedure(includePartitionMetadata:
Boolean) extends BaseProcedu
StructField("failed_delete_files", DataTypes.IntegerType, nullable = true,
Metadata.empty),
StructField("is_partition_deleted", DataTypes.BooleanType, nullable =
true, Metadata.empty),
StructField("time_taken_in_millis", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_files_deleted", DataTypes.IntegerType, nullable = true,
Metadata.empty)
+ StructField("total_files_deleted", DataTypes.IntegerType, nullable = true,
Metadata.empty),
+ StructField("earliest_commit_to_retain", DataTypes.StringType, nullable =
true, Metadata.empty),
+ StructField("last_completed_commit_timestamp", DataTypes.StringType,
nullable = true, Metadata.empty),
+ StructField("version", DataTypes.IntegerType, nullable = true,
Metadata.empty),
+ StructField("total_partitions_to_clean", DataTypes.IntegerType, nullable =
true, Metadata.empty),
+ StructField("total_partitions_to_delete", DataTypes.IntegerType, nullable
= true, Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
- def outputType: StructType = if (includePartitionMetadata)
PARTITION_METADATA_OUTPUT_TYPE else OUTPUT_TYPE
+ def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
- val table = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
- val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
- val showArchived = getArgValueOrDefault(args,
PARAMETERS(2)).get.asInstanceOf[Boolean]
- val filter = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[String]
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+ val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+ val showArchived = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String]
+ val startTime = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
+ val endTime = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
validateFilter(filter, outputType)
- val hoodieCatalogTable =
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
- val basePath = hoodieCatalogTable.tableLocation
+ val basePath = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
- val activeResults = if (includePartitionMetadata) {
- getCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit)
- } else {
- getCleans(metaClient.getActiveTimeline, limit)
- }
+ val activeResults =
getCombinedCleansWithPartitionMetadata(metaClient.getActiveTimeline, limit,
metaClient, startTime, endTime, "ACTIVE")
val finalResults = if (showArchived) {
- val archivedResults = if (includePartitionMetadata) {
- getCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit)
+ val archivedResults =
getCombinedCleansWithPartitionMetadata(metaClient.getArchivedTimeline, limit,
metaClient, startTime, endTime, "ARCHIVED")
+ val combinedResults = (activeResults ++ archivedResults)
+ .sortWith((a, b) => a.getString(0) > b.getString(0))
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ combinedResults
} else {
- getCleans(metaClient.getArchivedTimeline, limit)
+ combinedResults.take(limit)
}
- (activeResults ++ archivedResults)
- .sortWith((a, b) => a.getString(0) > b.getString(0))
- .take(limit)
} else {
- activeResults
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ activeResults
+ } else {
+ activeResults.take(limit)
+ }
}
applyFilter(finalResults, filter, outputType)
}
- override def build: Procedure = new
ShowCleansProcedure(includePartitionMetadata)
+ override def build: Procedure = new ShowCleansProcedure()
- private def getCleansWithPartitionMetadata(timeline: HoodieTimeline,
- limit: Int): Seq[Row] = {
+ private def getCombinedCleansWithPartitionMetadata(timeline: HoodieTimeline,
limit: Int, metaClient: HoodieTableMetaClient, startTime: String, endTime:
String, timelineType: String): Seq[Row] = {
import scala.collection.JavaConverters._
+ import scala.util.{Failure, Success, Try}
+
+ val filteredCleanInstants =
timeline.getCleanerTimeline.getInstants.asScala.toSeq
+ .filter { instant =>
+ val instantTime = instant.requestedTime()
+ val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime
else true
+ val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else
true
+ withinStartTime && withinEndTime
+ }
+ val allCleanInstants = if (startTime.nonEmpty && endTime.nonEmpty) {
+ filteredCleanInstants.sortWith((a, b) => a.requestedTime() >
b.requestedTime())
+ } else {
+ filteredCleanInstants.sortWith((a, b) => a.requestedTime() >
b.requestedTime()).take(limit)
+ }
- val (rows: util.ArrayList[Row], cleanInstants:
util.ArrayList[HoodieInstant]) = getSortedCleans(timeline)
-
- var rowCount = 0
-
- cleanInstants.asScala.takeWhile(_ => rowCount < limit).foreach {
cleanInstant =>
- val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
-
- cleanMetadata.getPartitionMetadata.entrySet.asScala.takeWhile(_ =>
rowCount < limit).foreach { partitionMetadataEntry =>
- val partitionPath = partitionMetadataEntry.getKey
- val partitionMetadata = partitionMetadataEntry.getValue
-
- rows.add(Row(
- cleanInstant.requestedTime(),
- cleanInstant.getCompletionTime,
- cleanInstant.getAction,
- cleanMetadata.getStartCleanTime,
- partitionPath,
- partitionMetadata.getPolicy,
- partitionMetadata.getDeletePathPatterns.size(),
- partitionMetadata.getSuccessDeleteFiles.size(),
- partitionMetadata.getFailedDeleteFiles.size(),
- partitionMetadata.getIsPartitionDeleted,
- cleanMetadata.getTimeTakenInMillis,
- cleanMetadata.getTotalFilesDeleted
- ))
- rowCount += 1
+ val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+ allCleanInstants.foreach { cleanInstant =>
+ if (cleanInstant.getState == HoodieInstant.State.COMPLETED) {
+ Try {
+ val cleanMetadata = timeline.readCleanMetadata(cleanInstant)
+
+ cleanMetadata.getPartitionMetadata.entrySet.asScala.foreach {
partitionMetadataEntry =>
+ val partitionPath = partitionMetadataEntry.getKey
+ val partitionMetadata = partitionMetadataEntry.getValue
+
+ val row = Row(
+ cleanInstant.requestedTime(),
+ cleanInstant.getCompletionTime,
+ cleanInstant.getState.name(),
+ cleanInstant.getAction,
+ timelineType,
+ cleanMetadata.getStartCleanTime,
+ partitionPath,
+ partitionMetadata.getPolicy,
+ partitionMetadata.getDeletePathPatterns.size(),
+ partitionMetadata.getSuccessDeleteFiles.size(),
+ partitionMetadata.getFailedDeleteFiles.size(),
+ partitionMetadata.getIsPartitionDeleted,
+ cleanMetadata.getTimeTakenInMillis,
+ cleanMetadata.getTotalFilesDeleted,
+ cleanMetadata.getEarliestCommitToRetain,
+ cleanMetadata.getLastCompletedCommitTimestamp,
+ cleanMetadata.getVersion,
+ null,
+ null
+ )
+ allRows += row
+ }
+ if (cleanMetadata.getPartitionMetadata.isEmpty) {
Review Comment:
```suggestion
if (cleanMetadata.getPartitionMetadata.isEmpty) {
...
} else {
cleanMetadata.getPartitionMetadata.entrySet.asScala.foreach {
...
}
}
```
This would be easier to understand.
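For example, a minimal sketch of that shape (the helper names and types below are hypothetical stand-ins for the row-building code in the diff):
```scala
import scala.collection.JavaConverters._

object CleanRowsShapeSketch {
  // Hypothetical stand-ins for the real Hudi metadata and Spark Row types.
  type Row = String
  def summaryRow(): Row = "summary-row"
  def partitionRow(partitionPath: String): Row = s"row-for-$partitionPath"

  // Branch once on emptiness instead of iterating first and checking isEmpty afterwards.
  def rowsFor(partitionMetadata: java.util.Map[String, AnyRef]): Seq[Row] =
    if (partitionMetadata.isEmpty) {
      Seq(summaryRow())
    } else {
      partitionMetadata.keySet.asScala.toSeq.map(partitionRow)
    }

  def main(args: Array[String]): Unit = {
    println(rowsFor(java.util.Collections.emptyMap[String, AnyRef]()))              // one summary row
    println(rowsFor(java.util.Collections.singletonMap[String, AnyRef]("p1", "m"))) // one row per partition
  }
}
```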
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala:
##########
@@ -17,53 +17,80 @@
package org.apache.spark.sql.hudi.command.procedures
-import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline,
TimelineLayout}
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
-import java.util
-import java.util.Collections
import java.util.function.Supplier
import scala.collection.JavaConverters._
/**
- * Spark SQL procedure to show completed clean operations for a Hudi table.
+ * Spark SQL procedure to show all clean operations for a Hudi table.
*
- * This procedure displays information about clean operations that have been
executed.
- * Clean operations remove old file versions to reclaim storage space and
maintain table performance.
+ * This procedure provides a comprehensive view of Hudi clean operations.
+ * It displays completed clean operations with full partition metadata for
both completed and pending operations.
*
* == Parameters ==
* - `table`: Required. The name of the Hudi table to query
- * - `limit`: Optional. Maximum number of clean operations to return (default:
10)
+ * - `path`: Optional. The path of the Hudi table (anyone of `table` or `path`
must be provided)
+ * - `limit`: Optional. Maximum number of clean operations to return (default:
10, ignored if time range specified)
* - `showArchived`: Optional. Whether to include archived clean operations
(default: false)
* - `filter`: Optional. SQL expression to filter results (default: empty
string)
+ * - `startTime`: Optional. Start time for clean operations (format:
yyyyMMddHHmmss, default: empty)
+ * - `endTime`: Optional. End time for clean operations (format:
yyyyMMddHHmmss, default: empty)
*
* == Output Schema ==
* - `clean_time`: Timestamp when the clean operation was performed
* - `state_transition_time`: Time when the clean transitioned to completed
state
+ * - `state`: Operation state (COMPLETED, INFLIGHT, REQUESTED)
* - `action`: The action type (always 'clean')
* - `start_clean_time`: When the clean operation started
+ * - `partition_path`: Partition path for the clean operation
+ * - `policy`: Clean policy used (KEEP_LATEST_COMMITS, etc.)
+ * - `delete_path_patterns`: Number of delete path patterns
+ * - `success_delete_files`: Number of successfully deleted files
+ * - `failed_delete_files`: Number of files that failed to delete
+ * - `is_partition_deleted`: Whether the entire partition was deleted
* - `time_taken_in_millis`: Duration of the clean operation in milliseconds
* - `total_files_deleted`: Total number of files deleted during the clean
* - `earliest_commit_to_retain`: The earliest commit that was retained
* - `last_completed_commit_timestamp`: The last completed commit at clean time
* - `version`: Version of the clean operation metadata
- * - Additional partition-level metadata columns when using
`show_cleans_metadata`
+ * - `total_partitions_to_clean`: Total partitions to clean (for pending
operations)
+ * - `total_partitions_to_delete`: Total partitions to delete (for pending
operations)
*
- * == Error Handling ==
- * - Throws `IllegalArgumentException` for invalid filter expressions
- * - Throws `HoodieException` for table access issues
- * - Returns empty result set if no clean plans match the criteria
+ * == Data Availability by Operation State ==
+ * - **COMPLETED operations**: All execution and partition metadata fields are
populated
+ * - **PENDING operations**: Plan fields are populated, execution fields are
null (graceful handling)
*
* == Filter Support ==
* The `filter` parameter supports SQL expressions for filtering results.
*
* === Common Filter Examples ===
* {{{
+ * -- Show only completed operations (equivalent to old show_cleans)
+ * CALL show_cleans(
+ * table => 'my_table',
+ * filter => "state = 'COMPLETED'"
+ * )
+ *
+ * -- Show only pending operations (equivalent to old show_clean_plans)
+ * CALL show_cleans(
+ * table => 'my_table',
+ * filter => "state IN ('REQUESTED', 'INFLIGHT')"
Review Comment:
Are these states case sensitive?
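If they are (which seems likely, since the `state` column is built from `getState.name()` and the filter compares strings), it would be worth calling that out in the doc. A tiny self-contained illustration of the mismatch risk, assuming standard case-sensitive string comparison in the filter:
```scala
object StateFilterCaseSketch {
  // Stand-in for org.apache.hudi.common.table.timeline.HoodieInstant.State;
  // a Java enum's name() always returns the uppercase constant name.
  object State extends Enumeration { val REQUESTED, INFLIGHT, COMPLETED = Value }

  def main(args: Array[String]): Unit = {
    val stateColumnValue = State.COMPLETED.toString // "COMPLETED", as name() would give
    println(stateColumnValue == "COMPLETED")        // true
    println(stateColumnValue == "completed")        // false: a lowercase filter value would not match
  }
}
```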
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCleansProcedure.scala:
##########
@@ -17,53 +17,80 @@
package org.apache.spark.sql.hudi.command.procedures
-import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline,
TimelineLayout}
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
-import java.util
-import java.util.Collections
import java.util.function.Supplier
import scala.collection.JavaConverters._
/**
- * Spark SQL procedure to show completed clean operations for a Hudi table.
+ * Spark SQL procedure to show all clean operations for a Hudi table.
*
- * This procedure displays information about clean operations that have been
executed.
- * Clean operations remove old file versions to reclaim storage space and
maintain table performance.
+ * This procedure provides a comprehensive view of Hudi clean operations.
+ * It displays completed clean operations with full partition metadata for
both completed and pending operations.
*
* == Parameters ==
* - `table`: Required. The name of the Hudi table to query
- * - `limit`: Optional. Maximum number of clean operations to return (default:
10)
+ * - `path`: Optional. The path of the Hudi table (anyone of `table` or `path`
must be provided)
+ * - `limit`: Optional. Maximum number of clean operations to return (default:
10, ignored if time range specified)
* - `showArchived`: Optional. Whether to include archived clean operations
(default: false)
* - `filter`: Optional. SQL expression to filter results (default: empty
string)
+ * - `startTime`: Optional. Start time for clean operations (format:
yyyyMMddHHmmss, default: empty)
Review Comment:
Interesting. Do we have a way to let a Hudi committer know about this rule?
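One option, rather than relying on convention alone, would be to validate the format inside the procedure and fail fast with a clear message. A rough sketch (the helper name and error text are illustrative only, not an existing Hudi API):
```scala
object TimeRangeValidationSketch {
  private val InstantTimePattern = "^\\d{14}$".r // yyyyMMddHHmmss, e.g. 20240131235959

  // Hypothetical helper: reject start/end times that do not match the documented format.
  def validateTimeRange(startTime: String, endTime: String): Unit =
    Seq("startTime" -> startTime, "endTime" -> endTime).foreach { case (name, value) =>
      if (value.nonEmpty && InstantTimePattern.findFirstIn(value).isEmpty) {
        throw new IllegalArgumentException(
          s"$name must be in yyyyMMddHHmmss format, but got: '$value'")
      }
    }

  def main(args: Array[String]): Unit = {
    validateTimeRange("20240101000000", "20240131235959")                  // passes silently
    println(scala.util.Try(validateTimeRange("2024-01-01", "")).isFailure) // true: wrong format rejected
  }
}
```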
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala:
##########
@@ -53,32 +133,219 @@ class ShowClusteringProcedure extends BaseProcedure with
ProcedureBuilder with S
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
- val showInvolvedPartitions = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val showArchived = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String]
+ val startTime = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
+ val endTime = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
+ validateFilter(filter, outputType)
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
- val clusteringInstants =
metaClient.getActiveTimeline.getInstants.iterator().asScala
+
+ val activeResults =
getCombinedClusteringsWithPartitionMetadata(metaClient.getActiveTimeline,
limit, metaClient, startTime, endTime, "ACTIVE")
+ val finalResults = if (showArchived) {
+ val archivedResults =
getCombinedClusteringsWithPartitionMetadata(metaClient.getArchivedTimeline,
limit, metaClient, startTime, endTime, "ARCHIVED")
+ val combinedResults = (activeResults ++ archivedResults)
+ .sortWith((a, b) => a.getString(0) > b.getString(0))
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ combinedResults
+ } else {
+ combinedResults.take(limit)
+ }
+ } else {
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ activeResults
+ } else {
+ activeResults.take(limit)
+ }
+ }
+
+ applyFilter(finalResults, filter, outputType)
+ }
+
+ private def getCombinedClusteringsWithPartitionMetadata(timeline:
HoodieTimeline,
+ limit: Int,
+ metaClient:
HoodieTableMetaClient,
+ startTime: String,
+ endTime: String,
+ timelineType:
String): Seq[Row] = {
+ val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+ val filteredClusteringInstants = timeline.getInstants.iterator().asScala
.filter(p =>
ClusteringUtils.isClusteringOrReplaceCommitAction(p.getAction))
- .toSeq
- .sortBy(f => f.requestedTime)
- .reverse
- .take(limit)
-
- val clusteringPlans = clusteringInstants.map(instant =>
- ClusteringUtils.getClusteringPlan(metaClient, instant)
- ).filter(clusteringPlan => clusteringPlan.isPresent)
-
- if (showInvolvedPartitions) {
- clusteringPlans.map { p =>
- Row(p.get().getLeft.requestedTime,
p.get().getRight.getInputGroups.size(),
- p.get().getLeft.getState.name(),
HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala.toSeq))
+ .filter { instant =>
+ val instantTime = instant.requestedTime()
+ val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime
else true
+ val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else
true
+ withinStartTime && withinEndTime
}
+ .toSeq
+ val allClusteringInstants = if (startTime.nonEmpty && endTime.nonEmpty) {
+ filteredClusteringInstants.sortBy(_.requestedTime).reverse
} else {
- clusteringPlans.map { p =>
- Row(p.get().getLeft.requestedTime,
p.get().getRight.getInputGroups.size(),
- p.get().getLeft.getState.name(), "*")
+ filteredClusteringInstants.sortBy(_.requestedTime).reverse.take(limit)
+ }
+
+ allClusteringInstants.foreach { clusteringInstant =>
+ if (clusteringInstant.getState == HoodieInstant.State.COMPLETED) {
+ scala.util.Try {
+ val commitMetadata = timeline.readCommitMetadata(clusteringInstant)
+ val planStats = Try {
+ val clusteringPlanOpt =
ClusteringUtils.getClusteringPlan(metaClient, clusteringInstant)
+ if (clusteringPlanOpt.isPresent) {
+
Some(extractClusteringPlanStats(clusteringPlanOpt.get().getRight))
+ } else None
+ }.getOrElse(None)
+
+ commitMetadata.getPartitionToWriteStats.entrySet.asScala.foreach {
partitionEntry =>
+ val partitionPath = partitionEntry.getKey
+ val writeStats = partitionEntry.getValue.asScala
+
+ val totalInputFiles = planStats.flatMap(s =>
s.partitionStats.get(partitionPath).map(_.inputFiles))
+ .getOrElse(0)
+
+ val totalOutputFiles = writeStats.size
+ val totalOutputSizeBytes = writeStats.map(_.getFileSizeInBytes).sum
+ val inputGroupSize =
planStats.map(_.totalInputGroups).getOrElse(writeStats.size)
+
+ val row = Row(
+ clusteringInstant.requestedTime(),
+ clusteringInstant.getCompletionTime,
+ clusteringInstant.getState.name(),
+ clusteringInstant.getAction,
+ timelineType,
+ inputGroupSize,
+ partitionPath,
+ totalInputFiles,
+ totalOutputFiles,
+ totalOutputSizeBytes
+ )
+ allRows += row
+ }
+ if (commitMetadata.getPartitionToWriteStats.isEmpty) {
+ val totalInputFiles =
planStats.map(_.totalInputFiles).map(Integer.valueOf).orNull
+ val totalOutputFiles = commitMetadata.fetchTotalFilesInsert +
commitMetadata.fetchTotalFilesUpdated
+ val totalOutputSizeBytes = commitMetadata.fetchTotalBytesWritten
+ val inputGroupSize =
planStats.map(_.totalInputGroups).map(Integer.valueOf).orNull
+
+ val row = Row(
+ clusteringInstant.requestedTime(),
+ clusteringInstant.getCompletionTime,
+ clusteringInstant.getState.name(),
+ clusteringInstant.getAction,
+ timelineType,
+ inputGroupSize,
+ null,
+ totalInputFiles,
+ totalOutputFiles.toInt,
+ totalOutputSizeBytes
+ )
+ allRows += row
+ }
+ }.recover {
+ case e: Exception =>
+ log.warn(s"Failed to read clustering metadata for instant
${clusteringInstant.requestedTime}: ${e.getMessage}")
+ val row =
createErrorRowForCompletedWithPartition(clusteringInstant, timelineType)
+ allRows += row
+ }
+ } else {
+ scala.util.Try {
+ val clusteringPlanOpt =
ClusteringUtils.getClusteringPlan(metaClient, clusteringInstant)
+ if (clusteringPlanOpt.isPresent) {
+ val clusteringPlan = clusteringPlanOpt.get()
+ val plan = clusteringPlan.getRight
+ val stats = extractClusteringPlanStats(plan)
+ stats.involvedPartitions.foreach { partitionPath =>
+ val partitionStat = stats.partitionStats.get(partitionPath)
+ val row = Row(
+ clusteringInstant.requestedTime(),
+ null, // state_transition_time - not available for pending
+ clusteringInstant.getState.name(),
+ clusteringInstant.getAction,
+ timelineType,
+ stats.totalInputGroups, // input_group_size - total across all
partitions
+ partitionPath,
+ partitionStat.map(_.inputFiles).getOrElse(0), //
input_files_count - per partition
+ partitionStat.map(_.outputGroups).getOrElse(0), //
output_files_count - per partition
+ 0L // output_size_bytes - not available for pending operations
+ )
+ allRows += row
+ }
+ } else {
+ // for inflight requests without a clustering plan
+ val row = createErrorRowForPendingWithPartition(clusteringInstant,
timelineType)
+ allRows += row
+ }
+ }.recover {
+ case e: Exception =>
+ log.warn(s"Failed to read clustering plan for instant
${clusteringInstant.requestedTime}: ${e.getMessage}")
+ val row = createErrorRowForPendingWithPartition(clusteringInstant,
timelineType)
+ allRows += row
+ }
}
}
+ allRows.toSeq
+ }
Review Comment:
Do we have a way to break these ~100 lines of code into smaller functions for better readability?
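For example, something along these lines would keep `getCombinedClusteringsWithPartitionMetadata` as a thin driver (the helper names below are hypothetical, just to show the shape; each body would hold the corresponding block from the diff):
```scala
import scala.collection.mutable.ListBuffer

object ClusteringRowsDecompositionSketch {
  // Stand-ins for the real Hudi/Spark types, to keep the sketch self-contained.
  type Row = String
  final case class Instant(requestedTime: String, completed: Boolean)

  // Hypothetical helpers; in the real code each would take the timeline/metaClient it needs.
  def rowsForCompletedInstant(instant: Instant): Seq[Row] = Seq(s"completed:${instant.requestedTime}")
  def rowsForPendingInstant(instant: Instant): Seq[Row]   = Seq(s"pending:${instant.requestedTime}")

  // Time-range filtering and limit handling pulled out of the main loop.
  def filterAndSort(instants: Seq[Instant], startTime: String, endTime: String, limit: Int): Seq[Instant] = {
    val inRange = instants.filter { i =>
      (startTime.isEmpty || i.requestedTime >= startTime) && (endTime.isEmpty || i.requestedTime <= endTime)
    }.sortBy(_.requestedTime).reverse
    if (startTime.nonEmpty && endTime.nonEmpty) inRange else inRange.take(limit)
  }

  // The ~100-line method then reduces to a short driver.
  def collectRows(instants: Seq[Instant], startTime: String, endTime: String, limit: Int): Seq[Row] = {
    val rows = ListBuffer[Row]()
    filterAndSort(instants, startTime, endTime, limit).foreach { instant =>
      rows ++= (if (instant.completed) rowsForCompletedInstant(instant) else rowsForPendingInstant(instant))
    }
    rows.toSeq
  }

  def main(args: Array[String]): Unit =
    println(collectRows(Seq(Instant("20240101", completed = true), Instant("20240102", completed = false)), "", "", 10))
}
```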
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowClusteringProcedure.scala:
##########
@@ -53,32 +133,219 @@ class ShowClusteringProcedure extends BaseProcedure with
ProcedureBuilder with S
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
- val showInvolvedPartitions = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val showArchived = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[String]
+ val startTime = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
+ val endTime = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
+ validateFilter(filter, outputType)
val basePath: String = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
- val clusteringInstants =
metaClient.getActiveTimeline.getInstants.iterator().asScala
+
+ val activeResults =
getCombinedClusteringsWithPartitionMetadata(metaClient.getActiveTimeline,
limit, metaClient, startTime, endTime, "ACTIVE")
+ val finalResults = if (showArchived) {
+ val archivedResults =
getCombinedClusteringsWithPartitionMetadata(metaClient.getArchivedTimeline,
limit, metaClient, startTime, endTime, "ARCHIVED")
+ val combinedResults = (activeResults ++ archivedResults)
+ .sortWith((a, b) => a.getString(0) > b.getString(0))
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ combinedResults
+ } else {
+ combinedResults.take(limit)
+ }
+ } else {
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ activeResults
+ } else {
+ activeResults.take(limit)
+ }
+ }
+
+ applyFilter(finalResults, filter, outputType)
+ }
+
+ private def getCombinedClusteringsWithPartitionMetadata(timeline:
HoodieTimeline,
+ limit: Int,
+ metaClient:
HoodieTableMetaClient,
+ startTime: String,
+ endTime: String,
+ timelineType:
String): Seq[Row] = {
+ val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+ val filteredClusteringInstants = timeline.getInstants.iterator().asScala
.filter(p =>
ClusteringUtils.isClusteringOrReplaceCommitAction(p.getAction))
Review Comment:
The javadoc of `ClusteringUtils.isClusteringOrReplaceCommitAction` is
outdated. Please update it as well.
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCommitsProcedure.scala:
##########
@@ -17,150 +17,492 @@
package org.apache.spark.sql.hudi.command.procedures
-import org.apache.hudi.HoodieCLIUtils
-import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline,
TimelineLayout}
+import org.apache.hudi.common.model.HoodieWriteStat
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.ClusteringUtils
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
-import java.util
-import java.util.Collections
import java.util.function.Supplier
import scala.collection.JavaConverters._
+import scala.util.Try
-class ShowCommitsProcedure(includeExtraMetadata: Boolean) extends
BaseProcedure with ProcedureBuilder {
- var sortByFieldParameter: ProcedureParameter = _
-
+/**
+ * Spark SQL procedure to show all commit operations for a Hudi table.
+ *
+ * This procedure provides a comprehensive view of Hudi commit operations,
displaying both
+ * completed commits with execution metadata and pending commit plans.
+ * For pending operations, execution-specific fields are gracefully set to
null.
+ *
+ * == Parameters ==
+ * - `table`: Required. The name of the Hudi table to query
+ * - `path`: Optional. The path of the Hudi table (alternative to table name)
+ * - `limit`: Optional. Maximum number of commit operations to return
(default: 20)
+ * - `showArchived`: Optional. Whether to include archived commit operations
(default: false)
+ * - `filter`: Optional. SQL expression to filter results (default: empty
string)
+ * - `startTime`: Optional. Start time for commits (format: yyyyMMddHHmmss,
default: empty)
+ * - `endTime`: Optional. End time for commits (format: yyyyMMddHHmmss,
default: empty)
+ *
+ * == Output Schema ==
+ * - `commit_time`: Timestamp when the commit operation was initiated
+ * - `state_transition_time`: Time when the commit transitioned to completed
state (null for pending)
+ * - `state`: Operation state (COMPLETED, INFLIGHT, REQUESTED)
+ * - `action`: The action type (commit/deltacommit/replacecommit)
+ * - `timeline_type`: Source timeline (ACTIVE, ARCHIVED)
+ * - `partition_path`: Partition path for the commit operation
+ * - `file_id`: ID of the file
+ * - `previous_commit`: Previous commit time for the file
+ * - `num_writes`: Number of records written
+ * - `num_inserts`: Number of records inserted
+ * - `num_deletes`: Number of records deleted
+ * - `num_update_writes`: Number of records updated
+ * - `total_log_blocks`: Number of log blocks
+ * - `total_corrupt_log_blocks`: Number of corrupt log blocks
+ * - `total_rollback_blocks`: Number of rollback blocks
+ * - `total_log_records`: Number of log records
+ * - `total_updated_records_compacted`: Number of updated records compacted
+ * - `total_files_added`: Number of files added
+ * - `total_files_updated`: Number of files updated
+ * - `total_records_written`: Number of records written
+ * - `total_records_updated`: Number of records updated
+ * - `total_bytes_written`: Total bytes written
+ * - `total_errors`: Number of errors encountered
+ * - `file_size`: Size of the file in bytes
+ * - `avg_record_size`: Average size of each record in bytes
+ * - `extra_metadata`: Additional metadata from the commit
+ */
+class ShowCommitsProcedure extends BaseProcedure with ProcedureBuilder with
Logging {
private val PARAMETERS = Array[ProcedureParameter](
- ProcedureParameter.required(0, "table", DataTypes.StringType),
- ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10)
+ ProcedureParameter.optional(0, "table", DataTypes.StringType),
+ ProcedureParameter.optional(1, "path", DataTypes.StringType),
+ ProcedureParameter.optional(2, "limit", DataTypes.IntegerType, 20),
+ ProcedureParameter.optional(3, "showArchived", DataTypes.BooleanType,
false),
+ ProcedureParameter.optional(4, "showFiles", DataTypes.BooleanType, false),
+ ProcedureParameter.optional(5, "filter", DataTypes.StringType, ""),
+ ProcedureParameter.optional(6, "startTime", DataTypes.StringType, ""),
+ ProcedureParameter.optional(7, "endTime", DataTypes.StringType, "")
)
private val OUTPUT_TYPE = new StructType(Array[StructField](
StructField("commit_time", DataTypes.StringType, nullable = true,
Metadata.empty),
StructField("state_transition_time", DataTypes.StringType, nullable =
true, Metadata.empty),
+ StructField("state", DataTypes.StringType, nullable = true,
Metadata.empty),
StructField("action", DataTypes.StringType, nullable = true,
Metadata.empty),
- StructField("total_bytes_written", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_files_added", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_files_updated", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_partitions_written", DataTypes.LongType, nullable =
true, Metadata.empty),
- StructField("total_records_written", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_update_records_written", DataTypes.LongType, nullable =
true, Metadata.empty),
- StructField("total_errors", DataTypes.LongType, nullable = true,
Metadata.empty)
- ))
-
- private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
- StructField("commit_time", DataTypes.StringType, nullable = true,
Metadata.empty),
- StructField("state_transition_time", DataTypes.StringType, nullable =
true, Metadata.empty),
- StructField("action", DataTypes.StringType, nullable = true,
Metadata.empty),
- StructField("partition", DataTypes.StringType, nullable = true,
Metadata.empty),
+ StructField("timeline_type", DataTypes.StringType, nullable = true,
Metadata.empty),
+ StructField("partition_path", DataTypes.StringType, nullable = true,
Metadata.empty),
StructField("file_id", DataTypes.StringType, nullable = true,
Metadata.empty),
StructField("previous_commit", DataTypes.StringType, nullable = true,
Metadata.empty),
StructField("num_writes", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("num_inserts", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("num_deletes", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("num_update_writes", DataTypes.LongType, nullable = true,
Metadata.empty),
- StructField("total_errors", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("total_log_blocks", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("total_corrupt_log_blocks", DataTypes.LongType, nullable =
true, Metadata.empty),
StructField("total_rollback_blocks", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("total_log_records", DataTypes.LongType, nullable = true,
Metadata.empty),
StructField("total_updated_records_compacted", DataTypes.LongType,
nullable = true, Metadata.empty),
- StructField("total_bytes_written", DataTypes.LongType, nullable = true,
Metadata.empty)
+ StructField("total_files_added", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("total_files_updated", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("total_records_written", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("total_records_updated", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("total_bytes_written", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("total_errors", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("file_size", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("avg_record_size", DataTypes.LongType, nullable = true,
Metadata.empty),
+ StructField("extra_metadata", DataTypes.StringType, nullable = true,
Metadata.empty)
))
def parameters: Array[ProcedureParameter] = PARAMETERS
- def outputType: StructType = if (includeExtraMetadata) METADATA_OUTPUT_TYPE
else OUTPUT_TYPE
+ def outputType: StructType = OUTPUT_TYPE
override def call(args: ProcedureArgs): Seq[Row] = {
super.checkArgs(PARAMETERS, args)
- val table = getArgValueOrDefault(args,
PARAMETERS(0)).get.asInstanceOf[String]
- val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+ val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+ val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
+ val limit = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Int]
+ val showArchived = getArgValueOrDefault(args,
PARAMETERS(3)).get.asInstanceOf[Boolean]
+ val showFiles = getArgValueOrDefault(args,
PARAMETERS(4)).get.asInstanceOf[Boolean]
+ val filter = getArgValueOrDefault(args,
PARAMETERS(5)).get.asInstanceOf[String]
+ val startTime = getArgValueOrDefault(args,
PARAMETERS(6)).get.asInstanceOf[String]
+ val endTime = getArgValueOrDefault(args,
PARAMETERS(7)).get.asInstanceOf[String]
+
+ validateFilter(filter, outputType)
- val hoodieCatalogTable =
HoodieCLIUtils.getHoodieCatalogTable(sparkSession, table)
- val basePath = hoodieCatalogTable.tableLocation
+ val basePath: String = getBasePath(tableName, tablePath)
val metaClient = createMetaClient(jsc, basePath)
- val activeTimeline = metaClient.getActiveTimeline
- if (includeExtraMetadata) {
- getCommitsWithMetadata(activeTimeline, limit)
+ val activeResults = getCommitsWithPartitionMetadata(
+ metaClient.getActiveTimeline, limit, showFiles, startTime, endTime,
metaClient, "ACTIVE")
+
+ val finalResults = if (showArchived) {
+ val archivedResults =
getCommitsWithPartitionMetadata(metaClient.getArchivedTimeline, limit,
showFiles, startTime, endTime, metaClient, "ARCHIVED")
+ val combinedResults = (activeResults ++ archivedResults)
+ .sortWith((a, b) => a.getString(0) > b.getString(0))
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ combinedResults
+ } else {
+ combinedResults.take(limit)
+ }
} else {
- getCommits(activeTimeline, limit)
+ if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
+ activeResults
+ } else {
+ activeResults.take(limit)
+ }
}
+ applyFilter(finalResults, filter, outputType)
}
- override def build: Procedure = new
ShowCommitsProcedure(includeExtraMetadata)
-
- private def getCommitsWithMetadata(timeline: HoodieTimeline,
- limit: Int): Seq[Row] = {
- import scala.collection.JavaConverters._
-
- val (rows: util.ArrayList[Row], newCommits: util.ArrayList[HoodieInstant])
= getSortCommits(timeline)
- val layout = TimelineLayout.fromVersion(timeline.getTimelineLayoutVersion)
- for (i <- 0 until newCommits.size) {
- val commit = newCommits.get(i)
- val commitMetadata = timeline.readCommitMetadata(commit)
- for (partitionWriteStat <-
commitMetadata.getPartitionToWriteStats.entrySet.asScala) {
- for (hoodieWriteStat <- partitionWriteStat.getValue.asScala) {
- rows.add(Row(
- commit.requestedTime, commit.getCompletionTime, commit.getAction,
hoodieWriteStat.getPartitionPath,
- hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit,
hoodieWriteStat.getNumWrites,
- hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes,
hoodieWriteStat.getNumUpdateWrites,
- hoodieWriteStat.getTotalWriteErrors,
hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
- hoodieWriteStat.getTotalRollbackBlocks,
hoodieWriteStat.getTotalLogRecords,
- hoodieWriteStat.getTotalUpdatedRecordsCompacted,
hoodieWriteStat.getTotalWriteBytes))
+ private def getCommitsWithPartitionMetadata(timeline: HoodieTimeline,
+ limit: Int,
+ showFiles: Boolean,
+ startTime: String,
+ endTime: String,
+ metaClient:
HoodieTableMetaClient,
+ timelineType: String): Seq[Row]
= {
+
+ val allRows = scala.collection.mutable.ListBuffer[Row]()
+
+ val commitsTimeline = timeline.getCommitsTimeline
+
+ val filteredCommitInstants = commitsTimeline.getInstants.asScala
+ .filter { instant =>
+ val instantTime = instant.requestedTime()
+ val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime
else true
+ val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else
true
+ withinStartTime && withinEndTime
+ }
+ .toSeq
+
+ val allCommitInstants = if (startTime.nonEmpty && endTime.nonEmpty) {
+ filteredCommitInstants.sortBy(_.requestedTime).reverse
+ } else {
+ filteredCommitInstants.sortBy(_.requestedTime).reverse.take(limit)
+ }
+
+ allCommitInstants.foreach { commitInstant =>
+ if (commitInstant.getState == HoodieInstant.State.COMPLETED) {
+ Try {
+ val commitMetadata = if
(ClusteringUtils.isClusteringOrReplaceCommitAction(commitInstant.getAction)) {
+ timeline.readReplaceCommitMetadata(commitInstant)
+ } else {
+ timeline.readCommitMetadata(commitInstant)
+ }
+
+ val extraMetadata = Option(commitMetadata.getExtraMetadata)
+ .map(_.asScala.map { case (k, v) => s"$k=$v" }.mkString(", "))
+ .orNull
+
+ if (showFiles) {
+ for (entry <-
commitMetadata.getPartitionToWriteStats.entrySet.asScala) {
+ val partitionPath = entry.getKey
+ val stats = entry.getValue.asScala
+
+ for (stat <- stats) {
+ val fileId = stat.getFileId
+ val prevCommit = stat.getPrevCommit
+ val fileSize = stat.getFileSizeInBytes
+
+ val isInsert = prevCommit == HoodieWriteStat.NULL_COMMIT
+ val totalFilesAdded = if (isInsert) 1L else 0L
+ val totalFilesUpdated = if (!isInsert) 1L else 0L
+
+ val numWrites = stat.getNumWrites
+ val numInserts = stat.getNumInserts
+ val numDeletes = stat.getNumDeletes
+ val numUpdateWrites = stat.getNumUpdateWrites
+ val totalLogBlocks = stat.getTotalLogBlocks
+ val totalCorruptLogBlocks = stat.getTotalCorruptLogBlock
+ val totalRollbackBlocks = stat.getTotalRollbackBlocks
+ val totalLogRecords = stat.getTotalLogRecords
+ val totalUpdatedRecordsCompacted =
stat.getTotalUpdatedRecordsCompacted
+ val totalBytesWritten = stat.getTotalWriteBytes
+ val totalErrors = stat.getTotalWriteErrors
+ val avgRecordSize = if (numWrites > 0) totalBytesWritten /
numWrites else 0L
+
+ allRows += Row(
+ commitInstant.requestedTime(),
+ commitInstant.getCompletionTime,
+ commitInstant.getState.name(),
+ commitInstant.getAction,
+ timelineType,
+ partitionPath,
+ fileId,
+ prevCommit,
+ numWrites,
+ numInserts,
+ numDeletes,
+ numUpdateWrites,
+ totalLogBlocks,
+ totalCorruptLogBlocks,
+ totalRollbackBlocks,
+ totalLogRecords,
+ totalUpdatedRecordsCompacted,
+ totalFilesAdded,
+ totalFilesUpdated,
+ numWrites,
+ numUpdateWrites,
+ totalBytesWritten,
+ totalErrors,
+ fileSize,
+ avgRecordSize,
+ extraMetadata
+ )
+ }
+ }
+ } else {
+ for (entry <-
commitMetadata.getPartitionToWriteStats.entrySet.asScala) {
+ val partitionPath = entry.getKey
+ val stats = entry.getValue.asScala
+
+ var totalFilesAdded = 0L
+ var totalFilesUpdated = 0L
+ var totalRecordsWritten = 0L
+ var totalRecordsUpdated = 0L
+ var totalBytesWritten = 0L
+ var totalErrors = 0L
+ var totalNumWrites = 0L
+ var totalNumInserts = 0L
+ var totalNumDeletes = 0L
+ var totalNumUpdateWrites = 0L
+ var totalLogBlocks = 0L
+ var totalCorruptLogBlocks = 0L
+ var totalRollbackBlocks = 0L
+ var totalLogRecords = 0L
+ var totalUpdatedRecordsCompacted = 0L
+ var totalFileSize = 0L
+
+ for (stat <- stats) {
+ if (stat.getPrevCommit == HoodieWriteStat.NULL_COMMIT) {
+ totalFilesAdded += 1
+ } else {
+ totalFilesUpdated += 1
+ }
+ totalNumWrites += stat.getNumWrites
+ totalNumInserts += stat.getNumInserts
+ totalNumDeletes += stat.getNumDeletes
+ totalNumUpdateWrites += stat.getNumUpdateWrites
+ totalLogBlocks += stat.getTotalLogBlocks
+ totalCorruptLogBlocks += stat.getTotalCorruptLogBlock
+ totalRollbackBlocks += stat.getTotalRollbackBlocks
+ totalLogRecords += stat.getTotalLogRecords
+ totalUpdatedRecordsCompacted +=
stat.getTotalUpdatedRecordsCompacted
+
+ totalRecordsWritten += stat.getNumWrites
+ totalRecordsUpdated += stat.getNumUpdateWrites
+ totalBytesWritten += stat.getTotalWriteBytes
+ totalErrors += stat.getTotalWriteErrors
+ totalFileSize += stat.getFileSizeInBytes
+ }
+
+ val avgRecordSize = if (totalNumWrites > 0) totalBytesWritten /
totalNumWrites else 0L
+
+ allRows += Row(
+ commitInstant.requestedTime(),
+ commitInstant.getCompletionTime,
+ commitInstant.getState.name(),
+ commitInstant.getAction,
+ timelineType,
+ partitionPath,
+ "*",
+ null,
+ totalNumWrites,
+ totalNumInserts,
+ totalNumDeletes,
+ totalNumUpdateWrites,
+ totalLogBlocks,
+ totalCorruptLogBlocks,
+ totalRollbackBlocks,
+ totalLogRecords,
+ totalUpdatedRecordsCompacted,
+ totalFilesAdded,
+ totalFilesUpdated,
+ totalRecordsWritten,
+ totalRecordsUpdated,
+ totalBytesWritten,
+ totalErrors,
+ totalFileSize,
+ avgRecordSize,
+ extraMetadata
+ )
+ }
+
+ if (commitMetadata.getPartitionToWriteStats.isEmpty) {
+ val totalFilesAdded = commitMetadata.fetchTotalFilesInsert
+ val totalFilesUpdated = commitMetadata.fetchTotalFilesUpdated
+ val totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten
+ val totalRecordsUpdated =
commitMetadata.fetchTotalUpdateRecordsWritten
+ val totalBytesWritten = commitMetadata.fetchTotalBytesWritten
+ val totalErrors = commitMetadata.fetchTotalWriteErrors
+ val avgRecordSize = if (totalRecordsWritten > 0)
totalBytesWritten / totalRecordsWritten else 0L
+
+ allRows += Row(
+ commitInstant.requestedTime(),
+ commitInstant.getCompletionTime,
+ commitInstant.getState.name(),
+ commitInstant.getAction,
+ timelineType,
+ null, // partitionPath
+ "*", // fileId
+ null, // prevCommit
+ totalRecordsWritten, // numWrites
+ totalFilesAdded, // numInserts
+ null, // numDeletes
+ totalRecordsUpdated, // numUpdateWrites
+ null, // totalLogBlocks
+ null, // totalCorruptLogBlocks
+ null, // totalRollbackBlocks
+ null, // totalLogRecords
+ null, // totalUpdatedRecordsCompacted
+ totalFilesAdded,
+ totalFilesUpdated,
+ totalRecordsWritten,
+ totalRecordsUpdated,
+ totalBytesWritten,
+ totalErrors,
+ null, // fileSize
+ avgRecordSize, // avgRecordSize
+ extraMetadata
+ )
+ }
+ }
+ }.recover {
+ case e: Exception =>
+ log.warn(s"Failed to read commit metadata for instant
${commitInstant.requestedTime}: ${e.getMessage}")
+ val row = createErrorRow(commitInstant)
+ allRows += row
+ }
+ } else {
+ val commitStats = extractCommitPlanStats(commitInstant, metaClient)
+ commitStats.involvedPartitions.foreach { partitionPath =>
+ allRows += Row(
+ commitInstant.requestedTime(),
+ null, // state_transition_time
+ commitInstant.getState.name(),
+ commitInstant.getAction,
+ timelineType,
+ partitionPath,
+ "*", // fileId
+ null, // prevCommit
+ 0L, // numWrites
+ 0L, // numInserts
+ 0L, // numDeletes
+ 0L, // numUpdateWrites
+ 0L, // totalLogBlocks
+ 0L, // totalCorruptLogBlocks
+ 0L, // totalRollbackBlocks
+ 0L, // totalLogRecords
+ 0L, // totalUpdatedRecordsCompacted
+ 0L, // totalFilesAdded
+ 0L, // totalFilesUpdated
+ 0L, // totalRecordsWritten
+ 0L, // totalRecordsUpdated
+ 0L, // totalBytesWritten
+ 0L, // totalErrors
+ 0L, // fileSize
+ 0L, // avgRecordSize
+ commitStats.extraMetadata
+ )
Review Comment:
Breaking this into smaller functions would be good for reviewers.
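One pattern that could shrink the three near-identical `Row(...)` call sites is a single row builder with defaulted fields, roughly like this (a hypothetical sketch, not an existing helper in the PR):
```scala
object CommitRowBuilderSketch {
  type Row = Seq[Any] // stand-in for org.apache.spark.sql.Row

  // One builder with nullable / zero defaults; each call site passes only what it knows.
  def buildCommitRow(commitTime: String,
                     state: String,
                     action: String,
                     timelineType: String,
                     completionTime: String = null,
                     partitionPath: String = null,
                     fileId: String = "*",
                     numWrites: Long = 0L,
                     totalBytesWritten: Long = 0L,
                     extraMetadata: String = null): Row =
    Seq(commitTime, completionTime, state, action, timelineType,
      partitionPath, fileId, numWrites, totalBytesWritten, extraMetadata)

  def main(args: Array[String]): Unit = {
    // Pending instant: only plan-level fields are known.
    println(buildCommitRow("20240101000000", "REQUESTED", "commit", "ACTIVE"))
    // Completed instant: execution fields filled in.
    println(buildCommitRow("20240102000000", "COMPLETED", "commit", "ACTIVE",
      completionTime = "20240102000100", partitionPath = "p1",
      numWrites = 42L, totalBytesWritten = 1024L))
  }
}
```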
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]