vamsikarnika commented on code in PR #17463: URL: https://github.com/apache/hudi/pull/17463#discussion_r2627804851
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: 
Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val 
writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } Review Comment: Can this logic be simplified? ``` val filteredInstants = allInstants.filter { instant => val t = instant.requestedTime() (startTime.isEmpty || t >= startTime) && (endTime.isEmpty || t <= endTime) } ``` ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileGroupHistoryProcedure.scala: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, StructType} + +import java.util +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL procedure to show the complete history of a specific file group in a Hudi table. + * + * This procedure displays comprehensive information about all operations performed on a specific file group, + * including commits, updates, deletions, replacements, and metadata changes in a detailed partition specific view. It tracks the lifecycle of + * files from creation through various modifications and eventual deletion or replacement. + * + * == Parameters == + * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`) + * - `path`: Optional. The base path of the Hudi table (mutually exclusive with `table`) + * - `fileGroupId`: Required. The unique identifier of the file group to track + * - `partition`: Optional. Specific partition to filter results (default: all partitions) + * - `showArchived`: Optional. Whether to include archived timeline data (default: false) + * - `limit`: Optional. Maximum number of history entries to return (default: 20) + * - `filter`: Optional. SQL expression to filter results (default: empty string) + * - `startTime`: Optional. Start timestamp for filtering results (supported formats: yyyyMMddHHMMssmm, yyyyMMddHHmmss, yyyy-MM-dd, yyyy/MM/dd, yyyyMMdd) + * - `endTime`: Optional. End timestamp for filtering results (supported formats: yyyyMMddHHMMssmm, yyyyMMddHHmmss, yyyy-MM-dd, yyyy/MM/dd, yyyyMMdd) + * - `verbose` : Optional. Whether to include detailed statistics (default: false) Review Comment: Do we support verbose? 
I don't see this flag among the input parameters. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala: ########## @@ -23,7 +23,10 @@ import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.hudi.command.procedures.ShowFileHistoryProcedureUtils.log Review Comment: This import is not used. ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if 
(ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + 
processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, 
+ completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletions(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, DeletionInfo] = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + + checkDeletionsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions) + Review Comment: I see that we're using ACTIVE and ARCHIVED strings across multiple procedures. 
Can we define constants for them and use them here? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedureUtils.scala: ########## @@ -128,4 +131,27 @@ object HoodieProcedureUtils { } instants.sortBy(f => f).toSeq } + + def normalizeTimeFormat(timeInput: String, isEndTime: Boolean = false): String = { + if (timeInput.isEmpty) { + timeInput + } + else if (timeInput.matches("^\\d{17}$")) { + timeInput + } else if (timeInput.matches("^\\d{14}$")) { + timeInput + "000" + } else if (timeInput.matches("^\\d{4}-\\d{2}-\\d{2}$")) { + val date = LocalDate.parse(timeInput, DateTimeFormatter.ofPattern("yyyy-MM-dd")) + val datePrefix = date.format(DateTimeFormatter.ofPattern("yyyyMMdd")) + if (isEndTime) datePrefix + "235959999" else datePrefix + "000000000" + } else if (timeInput.matches("^\\d{4}/\\d{2}/\\d{2}$")) { + val date = LocalDate.parse(timeInput, DateTimeFormatter.ofPattern("yyyy/MM/dd")) + val datePrefix = date.format(DateTimeFormatter.ofPattern("yyyyMMdd")) + if (isEndTime) datePrefix + "235959999" else datePrefix + "000000000" + } else if (timeInput.matches("^\\d{8}$")) { + if (isEndTime) timeInput + "235959999" else timeInput + "000000000" + } else { + throw new DateTimeParseException(s"Unsupported time format: $timeInput", timeInput, 0) Review Comment: Could we add a unit test (UT) for this method? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileGroupHistoryProcedure.scala: ########## @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, StructType} + +import java.util +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL procedure to show the complete history of a specific file group in a Hudi table. + * + * This procedure displays comprehensive information about all operations performed on a specific file group, + * including commits, updates, deletions, replacements, and metadata changes in a detailed partition specific view. It tracks the lifecycle of + * files from creation through various modifications and eventual deletion or replacement. + * + * == Parameters == + * - `table`: Optional. The name of the Hudi table to query (mutually exclusive with `path`) + * - `path`: Optional. The base path of the Hudi table (mutually exclusive with `table`) + * - `fileGroupId`: Required. The unique identifier of the file group to track + * - `partition`: Optional. Specific partition to filter results (default: all partitions) + * - `showArchived`: Optional. Whether to include archived timeline data (default: false) + * - `limit`: Optional. Maximum number of history entries to return (default: 20) + * - `filter`: Optional. SQL expression to filter results (default: empty string) + * - `startTime`: Optional. 
Start timestamp for filtering results (supported formats: yyyyMMddHHMMssmm, yyyyMMddHHmmss, yyyy-MM-dd, yyyy/MM/dd, yyyyMMdd) + * - `endTime`: Optional. End timestamp for filtering results (supported formats: yyyyMMddHHMMssmm, yyyyMMddHHmmss, yyyy-MM-dd, yyyy/MM/dd, yyyyMMdd) + * - `verbose` : Optional. Whether to include detailed statistics (default: false) + * + * == Output Schema == + * - `instant_time`: Timestamp when the operation was performed + * - `completion_time`: Time when the operation completed (null for pending operations) + * - `action`: The action type (commit, deltacommit, compaction, clustering, etc.) + * - `timeline_type`: Whether the data is from ACTIVE or ARCHIVED timeline + * - `state`: Current state of the operation (REQUESTED, INFLIGHT, COMPLETED) + * - `partition_path`: Partition path where the file group resides + * - `file_name`: Name of the file in the file group + * - `operation_type`: Type of write operation (INSERT, UPDATE, UPSERT, DELETE) + * - `num_writes`: Total number of records written in this operation + * - `num_inserts`: Number of new records inserted + * - `num_updates`: Number of existing records updated + * - `num_deletes`: Number of records deleted + * - `file_size_bytes`: Size of the file in bytes + * - `total_write_bytes`: Total bytes written during the operation + * - `prev_commit`: Previous commit timestamp that this operation builds upon + * - `was_deleted`: Whether the file was deleted in a subsequent operation + * - `delete_action`: Action that caused the deletion (clean, rollback, etc.) + * - `delete_instant`: Timestamp when the deletion occurred + * - `delete_status`: Status of the deletion attempt (SUCCESS, FAILED) + * - `is_replaced`: Whether the file was replaced in a subsequent operation + * - `replace_action`: Action that caused the replacement (compaction, clustering, etc.) 
+ * - `replace_instant`: Timestamp when the replacement occurred + * - `total_write_errors`: Number of write errors encountered + * - `total_scan_time_ms`: Total time spent scanning during the operation + * - `total_upsert_time_ms`: Total time spent in upsert processing + * - `total_create_time_ms`: Total time spent in file creation + * - `prev_base_file`: Previous base file that was replaced (for compaction/clustering) + * - `column_stats_available`: Whether column statistics are available for this file + * + * == Error Handling == + * - Throws `IllegalArgumentException` for invalid filter expressions or missing fileGroupId + * - Throws `HoodieException` for table access issues or invalid file group identifiers + * - Returns empty result set if no file group history matches the criteria + * - Gracefully handles archived timeline access failures with warning logs + * + * == Filter Support == + * The `filter` parameter supports SQL expressions for filtering results on any output column. + * The filter uses Spark SQL syntax and supports various data types and operations. 
+ * + * == Usage Examples == + * {{{ + * -- Basic usage: Show file group history + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123' + * ) + * + * -- Show history with custom limit + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * limit => 50 + * ) + * + * -- Show history for specific partition (partitioned to datetime column here) + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * partition => '2025/08/28' + * ) + * + * -- Include archived timeline data + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * showArchived => true + * ) + * + * -- Filter for specific operation types + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * filter => "operation_type = 'INSERT'" + * ) + * + * -- Filter by date range using yyyy-MM-dd format + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * startTime => '2024-01-01', + * endTime => '2024-12-31' + * ) + * + * -- Filter by date range using yyyy/MM/dd format + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * startTime => '2024/01/01', + * endTime => '2024/12/31' + * ) + * + * -- Filter by date range using yyyyMMdd format + * CALL show_file_group_history( + * table => 'hudi_table_1', + * fileGroupId => 'abc123', + * startTime => '20240101', + * endTime => '20241231' + * ) + * }}} + * + * @see [[ShowFileHistoryProcedureUtils]] for underlying utility methods + * @see [[HoodieProcedureFilterUtils]] for detailed filter expression syntax + */ +class ShowFileGroupHistoryProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.optional(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "path", DataTypes.StringType), + 
ProcedureParameter.required(2, "fileGroupId", DataTypes.StringType), + ProcedureParameter.optional(3, "partition", DataTypes.StringType), + ProcedureParameter.optional(4, "showArchived", DataTypes.BooleanType, false), + ProcedureParameter.optional(5, "limit", DataTypes.IntegerType, 20), + ProcedureParameter.optional(6, "filter", DataTypes.StringType, ""), + ProcedureParameter.optional(7, "startTime", DataTypes.StringType, ""), + ProcedureParameter.optional(8, "endTime", DataTypes.StringType, "") Review Comment: Can we make these parameter names snake case, to match the other procedures? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if 
(ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + 
processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) Review Comment: Do we need to show these requested/inflight instants, since we're not sure if this instant actually touches the provided file group? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if 
(ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { Review Comment: I see that the logic here only 
applies the limit when startTime and endTime are not both set. What happens when startTime, endTime, and limit are all provided in the params? If that combination is not allowed, should we throw an exception or log a warning? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if 
(ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + 
processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, 
+ completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletions(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, DeletionInfo] = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + + checkDeletionsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + 
archivedTimeline.loadCompletedInstantDetailsInMemory() + checkDeletionsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + deletions.toMap + } + + def checkForReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, ReplacementInfo] = { + + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + checkReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check replacements in archived timeline: ${e.getMessage}") + } + } + + replacements.toMap + } + + private def checkDeletionsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = 
extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "FAILED") Review Comment: same here, can we define static variables for FAILED and reuse them? 
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: 
Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if (ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val 
writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = 
Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = 
instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletions(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, DeletionInfo] = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + + checkDeletionsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + checkDeletionsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + deletions.toMap + } + + def checkForReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, ReplacementInfo] = { + + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + checkReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check replacements in 
archived timeline: ${e.getMessage}") + } + } + + replacements.toMap + } + + private def checkDeletionsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + 
partitionPath <- partitionsToCheck + partitionMetadata <- Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process rollback instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + + private def checkReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val replacementInstants = timeline.getCommitsAndCompactionTimeline.getInstants.iterator().asScala + for (instant <- replacementInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + + val isCompaction = (metadata.getCompacted != null && metadata.getCompacted.booleanValue()) || + (metadata.getOperationType != null && metadata.getOperationType.toString.toLowerCase.equals(WriteOperationType.COMPACT.value())) + val isClustering = instant.getAction.equals(HoodieTimeline.REPLACE_COMMIT_ACTION) + + if (isCompaction || isClustering) { + val operationType = if (isCompaction) HoodieTimeline.COMPACTION_ACTION else HoodieTimeline.CLUSTERING_ACTION + + if 
(isClustering) { + metadata match { + case replaceMetadata: org.apache.hudi.common.model.HoodieReplaceCommitMetadata => + val partitionToReplaceFileIds = replaceMetadata.getPartitionToReplaceFileIds + if (partitionToReplaceFileIds != null) { + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(partitionToReplaceFileIds.keySet().asScala.toSet) + for (partitionPath <- partitionsToCheck) { + val replacedFileIds = Option(partitionToReplaceFileIds.get(partitionPath)) + if (replacedFileIds.isDefined) { + replacedFileIds.get.asScala.foreach { replacedFileId => + if (fileGroupId.isEmpty || fileGroupId.contains(replacedFileId)) { + replacements(replacedFileId) = ReplacementInfo(operationType, instant.requestedTime, timelineType) + } + } + } + } + } + } + } else { + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getPrevCommit != null && writeStat.getFileId == fileGroupId) { + val prevBaseFile = writeStat.getPrevBaseFile + if (prevBaseFile != null && prevBaseFile.nonEmpty) { + val replacedFileName = prevBaseFile.split("/").last + replacements(replacedFileName) = ReplacementInfo(operationType, instant.requestedTime, timelineType) Review Comment: Can we simplify this logic here? 
ChatGPT suggested the following:
```scala
for {
  partitionPath <- partitionsToCheck
  writeStats <- Option(metadata.getPartitionToWriteStats.get(partitionPath)).toSeq
  writeStat <- writeStats.asScala
  if writeStat.getPrevCommit != null
  if writeStat.getFileId == fileGroupId
  prevBaseFile = writeStat.getPrevBaseFile
  if prevBaseFile != null && prevBaseFile.nonEmpty
} {
  val replacedFileName = prevBaseFile.split("/").last
  replacements(replacedFileName) = ReplacementInfo(operationType, instant.requestedTime, timelineType)
}
```
########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if 
(ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + 
processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, 
+ completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletions(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, DeletionInfo] = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + + checkDeletionsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + 
archivedTimeline.loadCompletedInstantDetailsInMemory() + checkDeletionsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + deletions.toMap + } + + def checkForReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, ReplacementInfo] = { + + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + checkReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check replacements in archived timeline: ${e.getMessage}") + } + } + + replacements.toMap + } + + private def checkDeletionsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = 
extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process rollback instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + + private def 
checkReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val replacementInstants = timeline.getCommitsAndCompactionTimeline.getInstants.iterator().asScala + for (instant <- replacementInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + + val isCompaction = (metadata.getCompacted != null && metadata.getCompacted.booleanValue()) || + (metadata.getOperationType != null && metadata.getOperationType.toString.toLowerCase.equals(WriteOperationType.COMPACT.value())) + val isClustering = instant.getAction.equals(HoodieTimeline.REPLACE_COMMIT_ACTION) + + if (isCompaction || isClustering) { + val operationType = if (isCompaction) HoodieTimeline.COMPACTION_ACTION else HoodieTimeline.CLUSTERING_ACTION + + if (isClustering) { + metadata match { + case replaceMetadata: org.apache.hudi.common.model.HoodieReplaceCommitMetadata => + val partitionToReplaceFileIds = replaceMetadata.getPartitionToReplaceFileIds + if (partitionToReplaceFileIds != null) { + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(partitionToReplaceFileIds.keySet().asScala.toSet) + for (partitionPath <- partitionsToCheck) { + val replacedFileIds = Option(partitionToReplaceFileIds.get(partitionPath)) + if (replacedFileIds.isDefined) { + replacedFileIds.get.asScala.foreach { replacedFileId => + if (fileGroupId.isEmpty || fileGroupId.contains(replacedFileId)) { + replacements(replacedFileId) = ReplacementInfo(operationType, instant.requestedTime, timelineType) + } + } + } + } + } + } + } else { + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = 
Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getPrevCommit != null && writeStat.getFileId == fileGroupId) { + val prevBaseFile = writeStat.getPrevBaseFile + if (prevBaseFile != null && prevBaseFile.nonEmpty) { + val replacedFileName = prevBaseFile.split("/").last + replacements(replacedFileName) = ReplacementInfo(operationType, instant.requestedTime, timelineType) Review Comment: Also, I see similar logic in multiple places. Can we reduce the duplicated code by extracting this logic into a method? ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowFileHistoryProcedureUtils.scala: ########## @@ -0,0 +1,481 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieWriteStat, WriteOperationType} +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline} +import org.apache.hudi.common.util.ClusteringUtils + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util + +import scala.collection.JavaConverters._ + +case class HistoryEntry(instantTime: String, + completionTime: String, + action: String, + timelineType: String, + state: String, + partitionPath: String, + fileName: String, + operationType: String, + numWrites: Long, + numInserts: Long, + numUpdates: Long, + numDeletes: Long, + fileSizeBytes: Long, + totalWriteBytes: Long, + prevCommit: String, + totalWriteErrors: Long, + totalScanTimeMs: Long, + totalUpsertTimeMs: Long, + totalCreateTimeMs: Long, + prevBaseFile: Option[String], + columnStatsAvailable: Boolean + ) + +case class DeletionInfo(action: String, instant: String, timelineType: String, deleteStatus: String = "SUCCESS") + +case class ReplacementInfo(action: String, instant: String, timelineType: String) + +object ShowFileHistoryProcedureUtils extends Logging { + + val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("completion_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("timeline_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("partition_path", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("file_name", DataTypes.StringType, nullable = true, Metadata.empty), + 
StructField("operation_type", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_updates", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("file_size_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_write_bytes", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_commit", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_deleted", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("delete_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("delete_status", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("was_replaced", DataTypes.BooleanType, nullable = true, Metadata.empty), + StructField("replace_action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("replace_instant", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("total_write_errors", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_scan_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_upsert_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("total_create_time_ms", DataTypes.LongType, nullable = true, Metadata.empty), + StructField("prev_base_file", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("column_stats_available", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + def readCommitMetadata(timeline: HoodieTimeline, instant: HoodieInstant): Option[HoodieCommitMetadata] = { + try { + if 
(ClusteringUtils.isClusteringOrReplaceCommitAction(instant.getAction)) { + Option(timeline.readReplaceCommitMetadata(instant)) + } else { + Option(timeline.readCommitMetadata(instant)) + } + } catch { + case e: Exception => + log.warn(s"Failed to read commit metadata for ${instant.requestedTime}: ${e.getMessage}") + None + } + } + + def determineOperationType(writeStat: HoodieWriteStat): String = { + val hasInserts = writeStat.getNumInserts > 0 + val hasUpdates = writeStat.getNumUpdateWrites > 0 + val hasDeletes = writeStat.getNumDeletes > 0 + + val operations = Seq( + if (hasInserts) Some("INSERT") else None, + if (hasUpdates) Some("UPDATE") else None, + if (hasDeletes) Some("DELETE") else None + ).flatten + + if (operations.nonEmpty) { + operations.mkString("_AND_") + } else { + "UNKNOWN" + } + } + + def processWriteTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry], + limit: Int, + startTime: String, + endTime: String): Unit = { + + val writeTimeline = timeline.getWriteTimeline + val allInstants = writeTimeline.getInstants.asScala.toSeq + + val filteredInstants = if (startTime.nonEmpty && endTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime && instantTime <= endTime + } + } else if (startTime.nonEmpty) { + allInstants.filter { instant => + val instantTime = instant.requestedTime() + instantTime >= startTime + } + } else { + allInstants + } + + if (startTime.nonEmpty && endTime.nonEmpty) { + for (instant <- filteredInstants) { + try { + processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } else { + var foundCount = 0 + for (instant <- filteredInstants if foundCount < limit) { + try { + val initialSize = entries.size() + 
processInstant(timeline, instant, fileGroupId, targetPartition, timelineType, entries) + if (entries.size() > initialSize) { + foundCount += 1 + } + } catch { + case e: Exception => + log.warn(s"Failed to process instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + } + + private def processInstant(timeline: HoodieTimeline, + instant: HoodieInstant, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + entries: util.ArrayList[HistoryEntry]): Unit = { + + if (instant.getState == HoodieInstant.State.INFLIGHT || instant.getState == HoodieInstant.State.REQUESTED) { + val entry = createInflightRequestedEntry(instant, targetPartition, timelineType) + entries.add(entry) + } + + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(metadata.getPartitionToWriteStats.keySet().asScala.toSet) + + for (partitionPath <- partitionsToCheck) { + val writeStatsOpt = Option(metadata.getPartitionToWriteStats.get(partitionPath)) + if (writeStatsOpt.isDefined) { + val writeStats = writeStatsOpt.get.asScala + for (writeStat <- writeStats) { + if (writeStat.getFileId == fileGroupId) { + val entry = createHistoryEntry(instant, writeStat, partitionPath, timelineType) + entries.add(entry) + } + } + } + } + } + } + + def createHistoryEntry(instant: HoodieInstant, + writeStat: HoodieWriteStat, + targetPartition: String, + timelineType: String): HistoryEntry = { + + val fileName = Option(writeStat.getPath).map(_.split("/").last).getOrElse("") + val operationType = determineOperationType(writeStat) + + val runtimeStats = Option(writeStat.getRuntimeStats) + val scanTime = runtimeStats.map(_.getTotalScanTime).getOrElse(0L) + val upsertTime = runtimeStats.map(_.getTotalUpsertTime).getOrElse(0L) + val createTime = runtimeStats.map(_.getTotalCreateTime).getOrElse(0L) + + HistoryEntry( + instantTime = instant.requestedTime, 
+ completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition, + fileName = fileName, + operationType = operationType, + numWrites = writeStat.getNumWrites, + numInserts = writeStat.getNumInserts, + numUpdates = writeStat.getNumUpdateWrites, + numDeletes = writeStat.getNumDeletes, + fileSizeBytes = writeStat.getFileSizeInBytes, + totalWriteBytes = writeStat.getTotalWriteBytes, + prevCommit = writeStat.getPrevCommit, + totalWriteErrors = writeStat.getTotalWriteErrors, + totalScanTimeMs = scanTime, + totalUpsertTimeMs = upsertTime, + totalCreateTimeMs = createTime, + prevBaseFile = Option(writeStat.getPrevBaseFile), + columnStatsAvailable = writeStat.getColumnStats.isPresent + ) + } + + def createInflightRequestedEntry(instant: HoodieInstant, + targetPartition: Option[String], + timelineType: String): HistoryEntry = { + + HistoryEntry( + instantTime = instant.requestedTime, + completionTime = instant.getCompletionTime, + action = instant.getAction, + timelineType = timelineType, + state = instant.getState.toString, + partitionPath = targetPartition.getOrElse(""), + fileName = "", + operationType = "UNKNOWN", + numWrites = -1, + numInserts = -1, + numUpdates = -1, + numDeletes = -1, + fileSizeBytes = -1, + totalWriteBytes = -1, + prevCommit = "", + totalWriteErrors = -1, + totalScanTimeMs = -1, + totalUpsertTimeMs = -1, + totalCreateTimeMs = -1, + prevBaseFile = None, + columnStatsAvailable = false + ) + } + + def checkForDeletions(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, DeletionInfo] = { + + val deletions = scala.collection.mutable.Map[String, DeletionInfo]() + + checkDeletionsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", deletions) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + 
archivedTimeline.loadCompletedInstantDetailsInMemory() + checkDeletionsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", deletions) + } catch { + case e: Exception => + log.warn(s"Failed to check deletions in archived timeline: ${e.getMessage}") + } + } + + deletions.toMap + } + + def checkForReplacements(metaClient: HoodieTableMetaClient, + fileGroupId: String, + targetPartition: Option[String], + showArchived: Boolean): Map[String, ReplacementInfo] = { + + val replacements = scala.collection.mutable.Map[String, ReplacementInfo]() + + checkReplacementsInTimeline(metaClient.getActiveTimeline, fileGroupId, targetPartition, "ACTIVE", replacements) + + if (showArchived) { + try { + val archivedTimeline = metaClient.getArchivedTimeline.reload() + archivedTimeline.loadCompletedInstantDetailsInMemory() + checkReplacementsInTimeline(archivedTimeline, fileGroupId, targetPartition, "ARCHIVED", replacements) + } catch { + case e: Exception => + log.warn(s"Failed to check replacements in archived timeline: ${e.getMessage}") + } + } + + replacements.toMap + } + + private def checkDeletionsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + deletions: scala.collection.mutable.Map[String, DeletionInfo]): Unit = { + + val cleanInstants = timeline.getCleanerTimeline.getInstants.iterator().asScala + for (instant <- cleanInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val cleanMetadata = timeline.readCleanMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(cleanMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(cleanMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = 
extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.CLEAN_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process clean instant ${instant.requestedTime}: ${e.getMessage}") + } + } + + val rollbackInstants = timeline.getRollbackTimeline.getInstants.iterator().asScala + for (instant <- rollbackInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val rollbackMetadata = timeline.readRollbackMetadata(instant) + val partitionsToCheck = targetPartition.map(Set(_)).getOrElse(rollbackMetadata.getPartitionMetadata.keySet().asScala.toSet) + + for { + partitionPath <- partitionsToCheck + partitionMetadata <- Option(rollbackMetadata.getPartitionMetadata.get(partitionPath)) + } { + for { + deletedFile <- partitionMetadata.getSuccessDeleteFiles.asScala + if matchesDeletedFileGroup(deletedFile, fileGroupId) + } { + val deletedFileName = extractActualFileName(deletedFile) + deletions(deletedFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "SUCCESS") + } + for { + failedDeleteFile <- partitionMetadata.getFailedDeleteFiles.asScala + if matchesDeletedFileGroup(failedDeleteFile, fileGroupId) + } { + val failedDeleteFileName = extractActualFileName(failedDeleteFile) + deletions(failedDeleteFileName) = DeletionInfo(HoodieTimeline.ROLLBACK_ACTION, instant.requestedTime, timelineType, "FAILED") + } + } + } + } catch { + case e: Exception => + log.warn(s"Failed to process rollback instant ${instant.requestedTime}: ${e.getMessage}") + } + } + } + + private def 
checkReplacementsInTimeline(timeline: HoodieTimeline, + fileGroupId: String, + targetPartition: Option[String], + timelineType: String, + replacements: scala.collection.mutable.Map[String, ReplacementInfo]): Unit = { + + val replacementInstants = timeline.getCommitsAndCompactionTimeline.getInstants.iterator().asScala + for (instant <- replacementInstants) { + try { + if (instant.getState == HoodieInstant.State.COMPLETED) { + val commitMetadata = readCommitMetadata(timeline, instant) + if (commitMetadata.isDefined) { + val metadata = commitMetadata.get Review Comment: We're only looking for compaction and clustering instants, so why not filter for them directly? Why call readCommitMetadata for all instants when we're not using them? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
