hudi-agent commented on code in PR #18709:
URL: https://github.com/apache/hudi/pull/18709#discussion_r3211517961
########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Date; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class HoodieTimelineCleanupUtil { + private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineCleanupUtil.class); + + public static List<HoodieInstant> inflightWriteCommitsOlderThan(HoodieTableMetaClient metaClient, long mins, boolean includeIngestionCommits) { Review Comment: 🤖 nit: could you rename `mins` to `ageMinutes`? `mins` is a common abbreviation for both "minutes" and "minimum", so `ageMinutes` makes the intent unambiguous at every call site. 
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.{HoodieCLIUtils, HoodieTimelineCleanupUtil} +import org.apache.hudi.client.SparkRDDWriteClient +import org.apache.hudi.common.HoodiePendingRollbackInfo +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieTimeline +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.hadoop.fs.HadoopFSUtils +import org.apache.hudi.table.HoodieSparkTable + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL stored procedure to roll back stale inflight write commits older than a configurable + * age threshold. 
+ * + * SAFETY WARNING: Unlike rollback_to_instant_time which targets a single named instant, this + * procedure rolls back a SET of instants matched by age. A misconfigured threshold can erase + * recent in-progress writes when include_ingestion_commits=true. Operators should call + * show_inflight_commits first to preview the pending instants, and use dry_run => true (see + * below) when in doubt about which instants will be processed. + * + * This procedure targets the write timeline (COMMIT, DELTA_COMMIT, COMPACTION, LOG_COMPACTION, + * REPLACE_COMMIT, CLUSTERING actions). Stale rollback, clean, or restore inflights visible in + * show_inflight_commits are NOT covered by this procedure. + * + * Compaction, log-compaction, and clustering inflights are handled via the targeted table + * methods (table.rollbackInflightCompaction / rollbackInflightLogCompaction / + * rollbackInflightClustering) — the supporting state (HoodieSparkTable, table-service client, + * pending-rollback lookup) is constructed lazily on the first such instant and reused. + * + * Parameters: + * - table: Required. Catalog name of the Hudi table. + * - allowed_inflight_interval_minutes: Optional (default 180). Instants older than this many + * minutes are considered stale and eligible for rollback. + * - include_ingestion_commits: Optional (default false). DANGEROUS when true: enabling + * this allows the procedure to roll back COMMIT_ACTION and + * DELTA_COMMIT_ACTION inflights, which means it can drop + * in-progress ingestion data. The default false is the + * safe choice; true is for operators recovering from a + * known-stuck ingestion job. + * - dry_run: Optional (default false). When true, the matched-instant + * set is resolved exactly as in normal mode but no rollback + * calls are issued. Each returned row carries + * rollback_status = NULL meaning "matched but not acted + * upon". Re-run with dry_run => false to act. 
+ * + * Output columns (one row per processed instant): + * - instant_time: The instant's requested timestamp. + * - action: The action type. + * - rollback_status: true if the rollback succeeded, false if the rollback failed, + * NULL if dry_run was true (matched but not actioned). + * + * Example usage: + * {{{ + * -- Clean stale table-service inflights (default 180-min threshold) + * CALL cleanup_stale_inflight_commits(table => 'my_table'); + * + * -- Preview what would be processed without acting + * CALL cleanup_stale_inflight_commits(table => 'my_table', dry_run => true); + * + * -- Clean stale inflights older than 1 hour, including ingestion commits (DANGEROUS) + * CALL cleanup_stale_inflight_commits( + * table => 'my_table', + * allowed_inflight_interval_minutes => 60, + * include_ingestion_commits => true + * ); + * }}} + * + * For inflight commit types not covered by this procedure (clean, restore, rollback inflights), + * use hudi-cli's `repair rollback` command. + */ +class CleanupStaleInflightCommitsProcedure extends BaseProcedure with ProcedureBuilder with Logging { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "allowed_inflight_interval_minutes", DataTypes.IntegerType, 180), + ProcedureParameter.optional(2, "include_ingestion_commits", DataTypes.BooleanType, false), + ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, false) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("rollback_status", DataTypes.BooleanType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def build: Procedure = new 
CleanupStaleInflightCommitsProcedure() + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val allowedMinutes = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int] + val includeIngestionCommits = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean] + val dryRun = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean] + + val basePath = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder + .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration)) + .setBasePath(basePath) + .build + + val staleInflights = HoodieTimelineCleanupUtil + .inflightWriteCommitsOlderThan(metaClient, allowedMinutes.toLong, includeIngestionCommits) + + if (staleInflights.isEmpty) { + Seq.empty[Row] + } else if (dryRun) { + // Dry-run: do not open the table for write — emit preview rows with NULL rollback_status + // to mean "matched but not actioned". Re-run with dry_run => false to act. + staleInflights.asScala.map { instant => + Row(instant.requestedTime, instant.getAction, null) + }.toSeq + } else { + // Pass ROLLBACK_USING_MARKERS_ENABLE=false via the createHoodieWriteClient confs Map. + // Inflight commits may not have marker files, so timeline-based rollback is required. + // The user-specified confs win over defaults / table config / session conf — see + // HoodieCLIUtils.scala "Priority: defaults < catalog props < table config < sparkSession conf < specified conf". + val confs = Map(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false") + var client: SparkRDDWriteClient[_] = null + try { + client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs, + tableName.asInstanceOf[scala.Option[String]]) + + // Lazy state for compaction / log-compaction / clustering branches. + // For matched sets that contain no such instants, none of these are constructed. 
+ lazy val tsClient = client.getTableServiceClient + lazy val table = HoodieSparkTable.create(client.getConfig, client.getEngineContext) + lazy val getPendingRollbackInstantFunc: java.util.function.Function[String, HOption[HoodiePendingRollbackInfo]] = + new java.util.function.Function[String, HOption[HoodiePendingRollbackInfo]] { + override def apply(commitToRollback: String): HOption[HoodiePendingRollbackInfo] = { + tsClient.getPendingRollbackInfo(table.getMetaClient, commitToRollback, false) + } + } + + val rows = staleInflights.asScala.map { instant => + val status: java.lang.Boolean = try { + val result: Boolean = instant.getAction match { + case HoodieTimeline.COMPACTION_ACTION => Review Comment: 🤖 Have you considered the case where a concurrent writer completes the inflight between detection in `inflightWriteCommitsOlderThan` and this `client.rollback(...)` call? `BaseHoodieTableServiceClient.rollback` looks up `commitInstantOpt` from `getCommitsTimeline()` (completed + pending), so if T just completed and is the latest commit, `validateRollbackCommitSequence` passes (no instants after T) and the now-completed commit gets rolled back destructively. The `table.rollbackInflight*` branches above fail loudly via `revertInstantFromInflightToRequested` if the inflight is gone, but this branch doesn't have an inflight-state recheck. With the 180-min default it's a narrow window, but `allowed_inflight_interval_minutes => 1` widens it significantly. Worth either re-reading the timeline inside the loop and skipping if the instant is no longer in `INFLIGHT`/`REQUESTED`, or documenting the assumption. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.command.procedures + +import org.apache.hudi.common.table.HoodieTableMetaClient +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator +import org.apache.hudi.hadoop.fs.HadoopFSUtils + +import org.apache.spark.sql.Row +import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} + +import java.time.Duration +import java.util.Date +import java.util.function.Supplier + +import scala.collection.JavaConverters._ + +/** + * Spark SQL stored procedure to list all pending inflight and requested instants on a Hudi table. + * + * Unlike cleanup_stale_inflight_commits (which targets the write timeline only), this procedure + * queries the full active timeline and therefore includes inflight rollback, clean, restore, and + * indexing instants in addition to write-action instants. A stale rollback or clean inflight + * visible here cannot be cleaned by cleanup_stale_inflight_commits. + * + * Parameters: + * - table: Required. Catalog name of the Hudi table. + * - min_age_minutes: Optional (default 0). When > 0, only instants older than this many minutes + * are returned. When 0 (default), all pending instants are returned. 
+ * + * Output columns (one row per pending instant): + * - instant_time: The instant's requested timestamp. + * - action: The action type (commit, delta_commit, compaction, replace, rollback, clean, etc.). + * - state: The instant's state (REQUESTED or INFLIGHT). + * + * Example usage: + * {{{ + * -- Show all pending inflights + * CALL show_inflight_commits(table => 'my_table'); + * + * -- Show only inflights older than 2 hours + * CALL show_inflight_commits(table => 'my_table', min_age_minutes => 120); + * }}} + */ +class ShowInflightCommitsProcedure extends BaseProcedure with ProcedureBuilder { + + private val PARAMETERS = Array[ProcedureParameter]( + ProcedureParameter.required(0, "table", DataTypes.StringType), + ProcedureParameter.optional(1, "min_age_minutes", DataTypes.IntegerType, 0) + ) + + private val OUTPUT_TYPE = new StructType(Array[StructField]( + StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("action", DataTypes.StringType, nullable = true, Metadata.empty), + StructField("state", DataTypes.StringType, nullable = true, Metadata.empty) + )) + + override def parameters: Array[ProcedureParameter] = PARAMETERS + + override def outputType: StructType = OUTPUT_TYPE + + override def build: Procedure = new ShowInflightCommitsProcedure() + + override def call(args: ProcedureArgs): Seq[Row] = { + super.checkArgs(PARAMETERS, args) + + val tableName = getArgValueOrDefault(args, PARAMETERS(0)) + val minAgeMinutes = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int] + + val basePath = getBasePath(tableName) + val metaClient = HoodieTableMetaClient.builder + .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration)) + .setBasePath(basePath) + .build + + val baseTimeline = metaClient.reloadActiveTimeline().filterInflightsAndRequested() + + val timeline = if (minAgeMinutes > 0) { + val goBackMs = Duration.ofMinutes(minAgeMinutes).getSeconds * 1000L Review Comment: 🤖 nit: same as in 
`HoodieTimelineCleanupUtil` — `Duration.ofMinutes(minAgeMinutes).toMillis()` would be simpler than `.getSeconds * 1000L`. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi; + +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import org.apache.hudi.common.table.timeline.HoodieTimeline; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.Date; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class HoodieTimelineCleanupUtil { + private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineCleanupUtil.class); + + public static List<HoodieInstant> inflightWriteCommitsOlderThan(HoodieTableMetaClient metaClient, long mins, boolean includeIngestionCommits) { + long goBackMs = Duration.ofMinutes(mins).getSeconds() * 1000; Review Comment: 🤖 nit: `Duration.ofMinutes(mins).toMillis()` is more direct than `.getSeconds() * 1000` — avoids the manual unit conversion and is harder to get wrong. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
