nsivabalan commented on code in PR #13745: URL: https://github.com/apache/hudi/pull/13745#discussion_r2330432997
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: 
Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", 
DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" Review Comment: it may not be easy to reverse engineer the operation type. we could have deletes for an upsert operation as well. we could just curate an output string depending on which ones are non 0. "INSERT" "UPDATE" "INSERT_AND_UPDATE" "UPDATE_AND_DELETE" something like this. So, we will let the user interpret more w/ what we can surface. 
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileGroupHistoryProcedure.scala: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, StructType} + +import java.util +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL procedure to show the complete history of a specific file group in a Hudi table. + * + * This procedure displays comprehensive information about all operations performed on a specific file group, + * including commits, updates, deletions, replacements, and metadata changes in a detailed partition specific view. It tracks the lifecycle of + * files from creation through various modifications and eventual deletion or replacement. + * + * == Parameters == + * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`) + * - `path`: Optional. 
The base path of the Hudi table (mutually exclusive with `table`) + * - `fileGroupId`: Required. The unique identifier of the file group to track + * - `partition`: Optional. Specific partition to filter results (default: all partitions) + * - `showArchived`: Optional. Whether to include archived timeline data (default: false) + * - `limit`: Optional. Maximum number of history entries to return (default: 20) + * - `filter`: Optional. SQL expression to filter results (default: empty string) + * - `startTime`: Optional. Start timestamp for filtering results (format: yyyyMMddHHmmss) + * - `endTime`: Optional. End timestamp for filtering results (format: yyyyMMddHHmmss) + * + * == Output Schema == + * - `instant_time`: Timestamp when the operation was performed + * - `completion_time`: Time when the operation completed (null for pending operations) + * - `action`: The action type (commit, deltacommit, compaction, clustering, etc.) + * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline + * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED) + * - `partition_path`: Partition path where the file group resides + * - `file_name`: Name of the file in the file group + * - `operation_type`: Type of write operation (INSERT, UPDATE, UPSERT, DELETE) + * - `num_writes`: Total number of records written in this operation + * - `num_inserts`: Number of new records inserted + * - `num_updates`: Number of existing records updated + * - `num_deletes`: Number of records deleted + * - `file_size_bytes`: Size of the file in bytes + * - `total_write_bytes`: Total bytes written during the operation + * - `prev_commit`: Previous commit timestamp that this operation builds upon + * - `was_deleted`: Whether the file was deleted in a subsequent operation Review Comment: can we call out, whether this will be honored even w/ start and end filters not really covering some of the future instants (where the file was actually replaced) or will this only work
within the start and end range. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { Review Comment: Can you try running this procedure against v6 table created using 0.14.1 hudi. 
Especially w/ clustering operation, insert_overwrite operations in the timeline and validate the tool works as expected. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + 
Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") 
+ } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = 
writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletionsAndReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): (Map[String, DeletionInfo], Map[String, ReplacementInfo]) = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkDeletionsAndReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions, replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + 
checkDeletionsAndReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions, replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + (deletions.toMap, replacements.toMap) + } + + def checkDeletionsAndReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo], + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType) + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- 
Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala Review Comment: same comment as above ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + 
Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") 
+ } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = 
writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletionsAndReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): (Map[String, DeletionInfo], Map[String, ReplacementInfo]) = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkDeletionsAndReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions, replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + 
checkDeletionsAndReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions, replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + (deletions.toMap, replacements.toMap) + } + + def checkDeletionsAndReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo], + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType) + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala Review Comment: not required right away. 
can you file a follow up to add `restore` instants processing as well as part of these procedures ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + 
Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") 
+ } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = 
writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletionsAndReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): (Map[String, DeletionInfo], Map[String, ReplacementInfo]) = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkDeletionsAndReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions, replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + 
checkDeletionsAndReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions, replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + (deletions.toMap, replacements.toMap) + } + + def checkDeletionsAndReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo], + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType) + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- 
Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType) + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process rollback instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val replacementInstants = timeline.getCommitsAndCompactionTimeline.getInstants.iterator().asScala Review Comment: I guess I get what you are looking to do here. we are looking for any action that could have replaced a given data file. only replace commits will have information about the replacements. wrt other operations like compaction, we may not have clear indication as to which file was replaced. and we should not even call those as replaced. by `replaced`, we should only interpret the ones replaced by `replace commit` actions and not any other operations. we can sync up f2f if you have any further questions/clarifications. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = 
true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, 
Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) Review Comment: can we revisit the control flow and ensure we don't keep sorting everywhere. I see we sort here, and then I also saw we sort at the caller end. Can we keep it to one place. 
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileGroupHistoryProcedure.scala: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, StructType} + +import java.util +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL procedure to show the complete history of a specific file group in a Hudi table. + * + * This procedure displays comprehensive information about all operations performed on a specific file group, + * including commits, updates, deletions, replacements, and metadata changes in a detailed partition specific view. It tracks the lifecycle of + * files from creation through various modifications and eventual deletion or replacement. + * + * == Parameters == + * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`) + * - `path`: Optional. 
The base path of the Hudi table (mutually exclusive with `table`) + * - `fileGroupId`: Required. The unique identifier of the file group to track + * - `partition`: Optional. Specific partition to filter results (default: all partitions) + * - `showArchived`: Optional. Whether to include archived timeline data (default: false) + * - `limit`: Optional. Maximum number of history entries to return (default: 20) + * - `filter`: Optional. SQL expression to filter results (default: empty string) + * - `startTime`: Optional. Start timestamp for filtering results (format: yyyyMMddHHmmss) + * - `endTime`: Optional. End timestamp for filtering results (format: yyyyMMddHHmmss) + * + * == Output Schema == + * - `instant_time`: Timestamp when the operation was performed + * - `completion_time`: Time when the operation completed (null for pending operations) + * - `action`: The action type (commit, deltacommit, compaction, clustering, etc.) + * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline + * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED) + * - `partition_path`: Partition path where the file group resides + * - `file_name`: Name of the file in the file group + * - `operation_type`: Type of write operation (INSERT, UPDATE, UPSERT, DELETE) + * - `num_writes`: Total number of records written in this operation + * - `num_inserts`: Number of new records inserted + * - `num_updates`: Number of existing records updated + * - `num_deletes`: Number of records deleted + * - `file_size_bytes`: Size of the file in bytes + * - `total_write_bytes`: Total bytes written during the operation + * - `prev_commit`: Previous commit timestamp that this operation builds upon + * - `was_deleted`: Whether the file was deleted in a subsequent operation + * - `delete_action`: Action that caused the deletion (clean, rollback, etc.) 
+ * - `delete_instant`: Timestamp when the deletion occurred + * - `is_replaced`: Whether the file was replaced in a subsequent operation Review Comment: same as above. Can we call out whether this will be honored even w/ start and end filters not really covering some of the future instants (where the file was actually replaced), or will this only work within the start and end range? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileGroupHistoryProcedure.scala: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, StructType} + +import java.util +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL procedure to show the complete history of a specific file group in a Hudi table.
+ * + * This procedure displays comprehensive information about all operations performed on a specific file group, + * including commits, updates, deletions, replacements, and metadata changes in a detailed, partition-specific view. It tracks the lifecycle of + * files from creation through various modifications and eventual deletion or replacement. + * + * == Parameters == + * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`) + * - `path`: Optional. The base path of the Hudi table (mutually exclusive with `table`) + * - `fileGroupId`: Required. The unique identifier of the file group to track + * - `partition`: Optional. Specific partition to filter results (default: all partitions) + * - `showArchived`: Optional. Whether to include archived timeline data (default: false) + * - `limit`: Optional. Maximum number of history entries to return (default: 20) + * - `filter`: Optional. SQL expression to filter results (default: empty string) + * - `startTime`: Optional. Start timestamp for filtering results (format: yyyyMMddHHmmss) + * - `endTime`: Optional. End timestamp for filtering results (format: yyyyMMddHHmmss) + * + * == Output Schema == + * - `instant_time`: Timestamp when the operation was performed + * - `completion_time`: Time when the operation completed (null for pending operations) + * - `action`: The action type (commit, deltacommit, compaction, clustering, etc.)
+ * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline + * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED) + * - `partition_path`: Partition path where the file group resides + * - `file_name`: Name of the file in the file group + * - `operation_type`: Type of write operation (INSERT, UPDATE, UPSERT, DELETE) + * - `num_writes`: Total number of records written in this operation + * - `num_inserts`: Number of new records inserted + * - `num_updates`: Number of existing records updated + * - `num_deletes`: Number of records deleted + * - `file_size_bytes`: Size of the file in bytes + * - `total_write_bytes`: Total bytes written during the operation + * - `prev_commit`: Previous commit timestamp that this operation builds upon + * - `was_deleted`: Whether the file was deleted in a subsequent operation + * - `delete_action`: Action that caused the deletion (clean, rollback, etc.) + * - `delete_instant`: Timestamp when the deletion occurred + * - `was_replaced`: Whether the file was replaced in a subsequent operation + * - `replace_action`: Action that caused the replacement (compaction, clustering, etc.)
+ * - `replace_instant`: Timestamp when the replacement occurred + * - `total_write_errors`: Number of write errors encountered + * - `total_scan_time_ms`: Total time spent scanning during the operation + * - `total_upsert_time_ms`: Total time spent in upsert processing + * - `total_create_time_ms`: Total time spent in file creation + * - `prev_base_file`: Previous base file that was replaced (for compaction/clustering) + * - `column_stats_available`: Whether column statistics are available for this file + * + * == Error Handling == + * - Throws `IllegalArgumentException` for invalid filter expressions or missing fileGroupId + * - Throws `HoodieException` for table access issues or invalid file group identifiers + * - Returns empty result set if no file group history matches the criteria + * - Gracefully handles archived timeline access failures with warning logs + * + * == Filter Support == + * The `filter` parameter supports SQL expressions for filtering results on any output column. + * The filter uses Spark SQL syntax and supports various data types and operations. 
+ * + * == Usage Examples == + * {{{ + * -- Basic usage: Show file group history + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123' + * ) + * + * -- Show history with custom limit + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * limit => 50 + * ) + * + * -- Show history for specific partition (partitioned to datetime column here) + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * partition => '2025/08/28' + * ) + * + * -- Include archived timeline data + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * showArchived => true + * ) + * + * -- Filter for specific operation types + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * filter => "operation_type = 'INSERT'" + * ) + * }}} + * + * @see [[ShowFileHistoryProcedureUtils]] for underlying utility methods + * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax + */ +class ShowFileGroupHistoryProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "fileGroupId", DataTypes.StringType), + ProcedureParameter.optional(3, "partition", DataTypes.StringType), + ProcedureParameter.optional(4, "showArchived", DataTypes.BooleanType, false), + ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 20), + ProcedureParameter.optional(6, "filter", DataTypes.StringType, ""), + ProcedureParameter.optional(7, "startTime", DataTypes.StringType, ""), + ProcedureParameter.optional(8, "endTime", DataTypes.StringType, "") + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = ShowFileHistoryProcedureUtils.OUTPUT_TYPE + + 
override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val fileGroupId = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String] + val partition = getArgValueOrDefault(args, PARAMETERS(3)).asInstanceOf[Option[String]] + val showArchived = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean] + val limit = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val filter = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String] + val startTime = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String] + val endTime = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String] + + if (filter != null && filter.trim.nonEmpty) { + HoodieProcedureFilterUtils.validateFilterExpression(filter, outputType, sparkSession) match { + case Left(errorMessage) => + throw new IllegalArgumentException(s"Invalid filter expression: $errorMessage") + case Right(_) => // Validation passed, continue + } + } + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + + val fileGroupHistory = collectFileGroupHistory(metaClient, fileGroupId, partition, showArchived, limit, startTime, endTime) + + if (filter != null && filter.trim.nonEmpty) { + HoodieProcedureFilterUtils.evaluateFilter(fileGroupHistory, filter, outputType, sparkSession) + } else { + fileGroupHistory + } + } + + private def collectFileGroupHistory(metaClient: HoodieTableMetaClient, + fileGroupId: String, + partition: Option[String], + showArchived: Boolean, + limit: Int, + startTime: String, + endTime: String): Seq[Row] = { + + import ShowFileHistoryProcedureUtils._ + + val activeEntries = new util.ArrayList[HistoryEntry]() + val activeTimeline = metaClient.getActiveTimeline + ShowFileHistoryProcedureUtils.processTimeline(activeTimeline, fileGroupId, partition, 
"ACTIVE", activeEntries, limit, startTime, endTime) + + val archivedEntries = new util.ArrayList[HistoryEntry]() + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + ShowFileHistoryProcedureUtils.processTimeline(archivedTimeline, fileGroupId, partition, "ARCHIVED", archivedEntries, limit, startTime, endTime) + } catch { + case e: Exception => + log.warn(s"Failed to process archived timeline: ${e.getMessage}") + } + } + + val allEntries = (activeEntries.asScala ++ archivedEntries.asScala).toList + val sortedEntries = allEntries Review Comment: archive could contain requested, inflight and completed right. so, in addition to "instant time" based ordering, should we also include the state while sorting? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + 
Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") 
+ } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = 
writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletionsAndReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): (Map[String, DeletionInfo], Map[String, ReplacementInfo]) = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkDeletionsAndReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions, replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + 
checkDeletionsAndReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions, replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + (deletions.toMap, replacements.toMap) + } + + def checkDeletionsAndReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo], + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala Review Comment: lets check even failed deleted files as well. we should somehow surface this to the user when using this cli. bcoz, w/ metadata table enabled, even failed deletes will be accounted for. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = 
true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, 
Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var 
foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val 
createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletionsAndReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): (Map[String, DeletionInfo], Map[String, ReplacementInfo]) = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + val replacements = 
scala.collection.mutable.Map[String, ReplacementInfo]() + + checkDeletionsAndReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions, replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() Review Comment: same comment as above. lets load w/ required filters ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + 
Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, Review Comment: `processWriteTimeline` ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String) + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, 
nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + 
Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites == 0) "INSERT" + else if (writeStat.getNumInserts == 0 && writeStat.getNumUpdateWrites > 0) "UPDATE" + else if (writeStat.getNumInserts > 0 && writeStat.getNumUpdateWrites > 0) "UPSERT" + else if (writeStat.getNumDeletes > 0) "DELETE" + else "UNKNOWN" + } + + def processTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val instants = writeTimeline.getInstants.asScala.toSeq + .filter { instant => + val instantTime = instant.requestedTime() + val withinStartTime = if (startTime.nonEmpty) instantTime >= startTime else true + val withinEndTime = if (endTime.nonEmpty) instantTime <= endTime else true + withinStartTime && withinEndTime + } + val filteredInstants = instants.sortWith((a, b) => a.requestedTime() > b.requestedTime()) + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") 
+ } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = 
writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletionsAndReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): (Map[String, DeletionInfo], Map[String, ReplacementInfo]) = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkDeletionsAndReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions, replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + 
checkDeletionsAndReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions, replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + (deletions.toMap, replacements.toMap) + } + + def checkDeletionsAndReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo], + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType) + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- 
Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType) + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process rollback instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val replacementInstants = timeline.getCommitsAndCompactionTimeline.getInstants.iterator().asScala Review Comment: why are we polling `getCommitsAndCompactionTimeline`. Lets use `getCommitAndReplaceTimeline` and then filter based on action. once we read the commit metadata, we can fetch the write operation type and filter out other replace commits and only process `clustering` write operations. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileGroupHistoryProcedure.scala: ########## @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, StructType} + +import java.util +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL procedure to show the complete history of a specific file group in a Hudi table. + * + * This procedure displays comprehensive information about all operations performed on a specific file group, + * including commits, updates, deletions, replacements, and metadata changes in a detailed partition specific view. It tracks the lifecycle of + * files from creation through various modifications and eventual deletion or replacement. + * + * == Parameters == + * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`) + * - `path`: Optional. The base path of the Hudi table (mutually exclusive with `table`) + * - `fileGroupId`: Required. The unique identifier of the file group to track + * - `partition`: Optional. Specific partition to filter results (default: all partitions) + * - `showArchived`: Optional. Whether to include archived timeline data (default: false) + * - `limit`: Optional. Maximum number of history entries to return (default: 20) + * - `filter`: Optional. SQL expression to filter results (default: empty string) + * - `startTime`: Optional. Start timestamp for filtering results (format: yyyyMMddHHmmss) + * - `endTime`: Optional. End timestamp for filtering results (format: yyyyMMddHHmmss) + * + * == Output Schema == + * - `instant_time`: Timestamp when the operation was performed + * - `completion_time`: Time when the operation completed (null for pending operations) + * - `action`: The action type (commit, deltacommit, compaction, clustering, etc.) 
+ * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline + * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED) + * - `partition_path`: Partition path where the file group resides + * - `file_name`: Name of the file in the file group + * - `operation_type`: Type of write operation (INSERT, UPDATE, UPSERT, DELETE) + * - `num_writes`: Total number of records written in this operation + * - `num_inserts`: Number of new records inserted + * - `num_updates`: Number of existing records updated + * - `num_deletes`: Number of records deleted + * - `file_size_bytes`: Size of the file in bytes + * - `total_write_bytes`: Total bytes written during the operation + * - `prev_commit`: Previous commit timestamp that this operation builds upon + * - `was_deleted`: Whether the file was deleted in a subsequent operation + * - `delete_action`: Action that caused the deletion (clean, rollback, etc.) + * - `delete_instant`: Timestamp when the deletion occurred + * - `is_replaced`: Whether the file was replaced in a subsequent operation + * - `replace_action`: Action that caused the replacement (compaction, clustering, etc.) 
+ * - `replace_instant`: Timestamp when the replacement occurred + * - `total_write_errors`: Number of write errors encountered + * - `total_scan_time_ms`: Total time spent scanning during the operation + * - `total_upsert_time_ms`: Total time spent in upsert processing + * - `total_create_time_ms`: Total time spent in file creation + * - `prev_base_file`: Previous base file that was replaced (for compaction/clustering) + * - `column_stats_available`: Whether column statistics are available for this file + * + * == Error Handling == + * - Throws `IllegalArgumentException` for invalid filter expressions or missing fileGroupId + * - Throws `HoodieException` for table access issues or invalid file group identifiers + * - Returns empty result set if no file group history matches the criteria + * - Gracefully handles archived timeline access failures with warning logs + * + * == Filter Support == + * The `filter` parameter supports SQL expressions for filtering results on any output column. + * The filter uses Spark SQL syntax and supports various data types and operations. 
+ * + * == Usage Examples == + * {{{ + * -- Basic usage: Show file group history + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123' + * ) + * + * -- Show history with custom limit + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * limit => 50 + * ) + * + * -- Show history for specific partition (partitioned to datetime column here) + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * partition => '2025/08/28' + * ) + * + * -- Include archived timeline data + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * showArchived => true + * ) + * + * -- Filter for specific operation types + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * filter => "operation_type = 'INSERT'" + * ) + * }}} + * + * @see [[ShowFileHistoryProcedureUtils]] for underlying utility methods + * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax + */ +class ShowFileGroupHistoryProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + ProcedureParameter.required(2, "fileGroupId", DataTypes.StringType), + ProcedureParameter.optional(3, "partition", DataTypes.StringType), + ProcedureParameter.optional(4, "showArchived", DataTypes.BooleanType, false), + ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 20), + ProcedureParameter.optional(6, "filter", DataTypes.StringType, ""), + ProcedureParameter.optional(7, "startTime", DataTypes.StringType, ""), + ProcedureParameter.optional(8, "endTime", DataTypes.StringType, "") + ) + + def parameters: Array[ProcedureParameter] = PARAMETERS + + def outputType: StructType = ShowFileHistoryProcedureUtils.OUTPUT_TYPE + + 
override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val tablePath = getArgValueOrDefault(args, PARAMETERS(1)) + val fileGroupId = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String] + val partition = getArgValueOrDefault(args, PARAMETERS(3)).asInstanceOf[Option[String]] + val showArchived = getArgValueOrDefault(args, PARAMETERS(4)).get.asInstanceOf[Boolean] + val limit = getArgValueOrDefault(args, PARAMETERS(5)).get.asInstanceOf[Int] + val filter = getArgValueOrDefault(args, PARAMETERS(6)).get.asInstanceOf[String] + val startTime = getArgValueOrDefault(args, PARAMETERS(7)).get.asInstanceOf[String] + val endTime = getArgValueOrDefault(args, PARAMETERS(8)).get.asInstanceOf[String] + + if (filter != null && filter.trim.nonEmpty) { + HoodieProcedureFilterUtils.validateFilterExpression(filter, outputType, sparkSession) match { + case Left(errorMessage) => + throw new IllegalArgumentException(s"Invalid filter expression: $errorMessage") + case Right(_) => // Validation passed, continue + } + } + + val basePath = getBasePath(tableName, tablePath) + val metaClient = createMetaClient(jsc, basePath) + + val fileGroupHistory = collectFileGroupHistory(metaClient, fileGroupId, partition, showArchived, limit, startTime, endTime) + + if (filter != null && filter.trim.nonEmpty) { + HoodieProcedureFilterUtils.evaluateFilter(fileGroupHistory, filter, outputType, sparkSession) + } else { + fileGroupHistory + } + } + + private def collectFileGroupHistory(metaClient: HoodieTableMetaClient, + fileGroupId: String, + partition: Option[String], + showArchived: Boolean, + limit: Int, + startTime: String, + endTime: String): Seq[Row] = { + + import ShowFileHistoryProcedureUtils._ + + val activeEntries = new util.ArrayList[HistoryEntry]() + val activeTimeline = metaClient.getActiveTimeline + ShowFileHistoryProcedureUtils.processTimeline(activeTimeline, fileGroupId, partition, 
"ACTIVE", activeEntries, limit, startTime, endTime) + + val archivedEntries = new util.ArrayList[HistoryEntry]() + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() Review Comment: I see we have overloaded methods to load with filters ``` void loadCompactionDetailsInMemory(String startTs, String endTs); ``` Can we leverage that based on the filters set by the user? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
