This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d94b2e2c3162 feat(spark): add show_inflight_commits and
cleanup_stale_inflight_commits (#18709)
d94b2e2c3162 is described below
commit d94b2e2c3162fefd26a7c86301228f6c2c74ceef
Author: Mahsood Ebrahim <[email protected]>
AuthorDate: Mon May 18 23:44:27 2026 -0700
feat(spark): add show_inflight_commits and cleanup_stale_inflight_commits
(#18709)
* feat(spark): add show_inflight_commits and cleanup_stale_inflight_commits
stored procedures
Two new CALL procedures so operators can inspect and remediate stale
inflight commits via SQL instead of using hudi-cli.
show_inflight_commits(table, min_age_minutes?) lists REQUESTED+INFLIGHT
instants from the active timeline.
cleanup_stale_inflight_commits(table, allowed_inflight_interval_minutes?,
include_ingestion_commits?, dry_run?) rolls back stale write-timeline
inflights older than the threshold (default 180 min). COMPACTION,
LOG_COMPACTION, and CLUSTERING route to their dedicated
table.rollbackInflight* methods (the HoodieSparkTable is lazily initialized on
the first such instant); other write actions go through client.rollback().
include_ingestion_commits and dry_run both default to false; dry_run
emits rollback_status=NULL and skips write-client construction.
A single-method utility HoodieTimelineCleanupUtil
(inflightWriteCommitsOlderThan) is added in hudi-spark-common.
Tested with 4 show + 9 cleanup unit tests covering empty/threshold/
ingestion-gating/dry_run paths plus COMPACTION, CLUSTERING, partitioned
COW, and MOR delta_commit. checkstyle + scalastyle clean.
* address review comments on show_inflight_commits /
cleanup_stale_inflight_commits
- Rename HoodieTimelineCleanupUtil.inflightWriteCommitsOlderThan param
`mins` -> `ageMinutes` for clarity.
- Replace Duration.ofMinutes(...).getSeconds() * 1000 with .toMillis()
in HoodieTimelineCleanupUtil and ShowInflightCommitsProcedure.
- Add inflight-state recheck in CleanupStaleInflightCommitsProcedure
before client.rollback(): reloads the active timeline and skips the
rollback if the instant is no longer INFLIGHT/REQUESTED, preventing
destructive rollback of a commit that completed concurrently after
detection.
---------
Co-authored-by: mahsoode <[email protected]>
---
.../org/apache/hudi/HoodieTimelineCleanupUtil.java | 59 +++
.../CleanupStaleInflightCommitsProcedure.scala | 224 +++++++++++
.../hudi/command/procedures/HoodieProcedures.scala | 2 +
.../procedures/ShowInflightCommitsProcedure.scala | 113 ++++++
.../TestCleanupStaleInflightCommitsProcedure.scala | 413 +++++++++++++++++++++
.../TestShowInflightCommitsProcedure.scala | 151 ++++++++
6 files changed, 962 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java
new file mode 100644
index 000000000000..f8583d5d5d2a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieTimelineCleanupUtil.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Date;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Timeline helpers for locating stale pending instants, used by the
+ * {@code cleanup_stale_inflight_commits} Spark SQL procedure.
+ */
+public class HoodieTimelineCleanupUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTimelineCleanupUtil.class);
+
+  /**
+   * Lists the pending (REQUESTED or INFLIGHT) instants on the table's write timeline whose
+   * requested time is earlier than {@code now - ageMinutes}.
+   *
+   * <p>The active timeline is reloaded before filtering, so the result reflects the timeline as
+   * it is at call time rather than when {@code metaClient} was built.
+   *
+   * @param metaClient              meta client of the table to inspect
+   * @param ageMinutes              minimum age, in minutes, an instant must have to be returned;
+   *                                must be non-negative
+   * @param includeIngestionCommits when {@code false}, COMMIT and DELTA_COMMIT instants are
+   *                                filtered out of the result
+   * @return the matching pending instants
+   * @throws IllegalArgumentException if {@code ageMinutes} is negative
+   */
+  public static List<HoodieInstant> inflightWriteCommitsOlderThan(HoodieTableMetaClient metaClient, long ageMinutes,
+                                                                  boolean includeIngestionCommits) {
+    if (ageMinutes < 0) {
+      // A negative age is meaningless (it would move the cutoff into the future); reject it
+      // instead of silently treating it like "everything".
+      throw new IllegalArgumentException("ageMinutes must be non-negative, but was " + ageMinutes);
+    }
+    long goBackMs = Duration.ofMinutes(ageMinutes).toMillis();
+    String oldestAllowedTimestamp = HoodieInstantTimeGenerator.formatDate(new Date(System.currentTimeMillis() - goBackMs));
+
+    Stream<HoodieInstant> inflightInstants = metaClient
+        .reloadActiveTimeline()
+        .getWriteTimeline()
+        .filterInflightsAndRequested()
+        .findInstantsBefore(oldestAllowedTimestamp)
+        .getInstants().stream();
+
+    if (!includeIngestionCommits) {
+      // Ingestion inflights can carry in-progress data, so they are only returned on explicit opt-in.
+      Predicate<HoodieInstant> isIngestionCommit = instant ->
+          HoodieTimeline.COMMIT_ACTION.equals(instant.getAction())
+              || HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction());
+      inflightInstants = inflightInstants.filter(isIngestionCommit.negate());
+    }
+    List<HoodieInstant> inflightInstantsList = inflightInstants.collect(Collectors.toList());
+    LOG.info("Inflight commits older than {} minutes: {}", ageMinutes, inflightInstantsList);
+    return inflightInstantsList;
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..66e31302e45b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/CleanupStaleInflightCommitsProcedure.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.{HoodieCLIUtils, HoodieTimelineCleanupUtil}
+import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.common.HoodiePendingRollbackInfo
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieTimeline
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.table.HoodieSparkTable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+/**
+ * Spark SQL stored procedure to roll back stale inflight write commits older than a configurable
+ * age threshold.
+ *
+ * SAFETY WARNING: Unlike rollback_to_instant_time which targets a single named instant, this
+ * procedure rolls back a SET of instants matched by age. A misconfigured threshold can erase
+ * recent in-progress writes when include_ingestion_commits=true. Operators should call
+ * show_inflight_commits first to preview the pending instants, and use dry_run => true (see
+ * below) when in doubt about which instants will be processed.
+ *
+ * This procedure targets the write timeline (COMMIT, DELTA_COMMIT, COMPACTION, LOG_COMPACTION,
+ * REPLACE_COMMIT, CLUSTERING actions). Stale rollback, clean, or restore inflights visible in
+ * show_inflight_commits are NOT covered by this procedure.
+ *
+ * Compaction, log-compaction, and clustering inflights are handled via the targeted table
+ * methods (table.rollbackInflightCompaction / rollbackInflightLogCompaction /
+ * rollbackInflightClustering) — the supporting state (HoodieSparkTable, table-service client,
+ * pending-rollback lookup) is constructed lazily on the first such instant and reused.
+ * Every other action goes through client.rollback(), but only after the active timeline has been
+ * re-read to confirm the instant is still pending.
+ *
+ * Matched instants are processed newest-first; output rows are still returned in ascending
+ * instant-time order.
+ *
+ * Parameters:
+ *  - table:                             Required. Catalog name of the Hudi table.
+ *  - allowed_inflight_interval_minutes: Optional (default 180). Instants older than this many
+ *                                       minutes are considered stale and eligible for rollback.
+ *                                       Must be >= 0; negative values are rejected.
+ *  - include_ingestion_commits:         Optional (default false). DANGEROUS when true: enabling
+ *                                       this allows the procedure to roll back COMMIT_ACTION and
+ *                                       DELTA_COMMIT_ACTION inflights, which means it can drop
+ *                                       in-progress ingestion data. The default false is the
+ *                                       safe choice; true is for operators recovering from a
+ *                                       known-stuck ingestion job.
+ *  - dry_run:                           Optional (default false). When true, the matched-instant
+ *                                       set is resolved exactly as in normal mode, but no write
+ *                                       client is created and no rollback calls are issued. Each
+ *                                       returned row carries rollback_status = NULL meaning
+ *                                       "matched but not acted upon". Re-run with
+ *                                       dry_run => false to act.
+ *
+ * Output columns (one row per processed instant):
+ *  - instant_time:    The instant's requested timestamp.
+ *  - action:          The action type.
+ *  - rollback_status: true if the rollback succeeded; false if the rollback failed (threw or
+ *                     returned false) or was skipped because the instant was no longer pending
+ *                     when re-checked; NULL if dry_run was true (matched but not actioned).
+ *
+ * Example usage:
+ * {{{
+ *   -- Clean stale table-service inflights (default 180-min threshold)
+ *   CALL cleanup_stale_inflight_commits(table => 'my_table');
+ *
+ *   -- Preview what would be processed without acting
+ *   CALL cleanup_stale_inflight_commits(table => 'my_table', dry_run => true);
+ *
+ *   -- Clean stale inflights older than 1 hour, including ingestion commits (DANGEROUS)
+ *   CALL cleanup_stale_inflight_commits(
+ *     table => 'my_table',
+ *     allowed_inflight_interval_minutes => 60,
+ *     include_ingestion_commits => true
+ *   );
+ * }}}
+ *
+ * For inflight commit types not covered by this procedure (clean, restore, rollback inflights),
+ * use hudi-cli's `repair rollback` command.
+ */
+class CleanupStaleInflightCommitsProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "allowed_inflight_interval_minutes", DataTypes.IntegerType, 180),
+    ProcedureParameter.optional(2, "include_ingestion_commits", DataTypes.BooleanType, false),
+    ProcedureParameter.optional(3, "dry_run", DataTypes.BooleanType, false)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("rollback_status", DataTypes.BooleanType, nullable = true, Metadata.empty)
+  ))
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def build: Procedure = new CleanupStaleInflightCommitsProcedure()
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val allowedMinutes = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+    val includeIngestionCommits = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[Boolean]
+    val dryRun = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[Boolean]
+
+    // A negative age is meaningless for this destructive operation (it would move the cutoff into
+    // the future); fail fast instead of silently treating it like "everything".
+    if (allowedMinutes < 0) {
+      throw new IllegalArgumentException(
+        s"allowed_inflight_interval_minutes must be >= 0, but got $allowedMinutes")
+    }
+
+    val basePath = getBasePath(tableName)
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration))
+      .setBasePath(basePath)
+      .build
+
+    val staleInflights = HoodieTimelineCleanupUtil
+      .inflightWriteCommitsOlderThan(metaClient, allowedMinutes.toLong, includeIngestionCommits)
+
+    if (staleInflights.isEmpty) {
+      Seq.empty[Row]
+    } else if (dryRun) {
+      // Dry-run: do not open the table for write — emit preview rows with NULL rollback_status
+      // to mean "matched but not actioned". Re-run with dry_run => false to act.
+      staleInflights.asScala.map { instant =>
+        Row(instant.requestedTime, instant.getAction, null)
+      }.toSeq
+    } else {
+      // Pass ROLLBACK_USING_MARKERS_ENABLE=false via the createHoodieWriteClient confs Map.
+      // Inflight commits may not have marker files, so timeline-based rollback is required.
+      // The user-specified confs win over defaults / table config / session conf — see
+      // HoodieCLIUtils.scala "Priority: defaults < catalog props < table config < sparkSession conf < specified conf".
+      val confs = Map(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key() -> "false")
+      var client: SparkRDDWriteClient[_] = null
+      try {
+        client = HoodieCLIUtils.createHoodieWriteClient(sparkSession, basePath, confs,
+          tableName.asInstanceOf[scala.Option[String]])
+
+        // Lazy state for compaction / log-compaction / clustering branches.
+        // For matched sets that contain no such instants, none of these are constructed.
+        lazy val tsClient = client.getTableServiceClient
+        lazy val table = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
+        lazy val getPendingRollbackInstantFunc: java.util.function.Function[String, HOption[HoodiePendingRollbackInfo]] =
+          new java.util.function.Function[String, HOption[HoodiePendingRollbackInfo]] {
+            override def apply(commitToRollback: String): HOption[HoodiePendingRollbackInfo] = {
+              tsClient.getPendingRollbackInfo(table.getMetaClient, commitToRollback, false)
+            }
+          }
+
+        // Roll back newest-first. Hudi's rollback sequence validation
+        // (BaseRollbackActionExecutor.validateRollbackCommitSequence) may refuse to roll back an
+        // instant while a newer pending instant still exists, so ascending order could leave every
+        // stale instant except the newest in place. The final `.reverse` restores ascending row order.
+        val rows = staleInflights.asScala.reverse.map { instant =>
+          val status: java.lang.Boolean = try {
+            val result: Boolean = instant.getAction match {
+              case HoodieTimeline.COMPACTION_ACTION =>
+                table.rollbackInflightCompaction(instant, getPendingRollbackInstantFunc, client.getTransactionManager)
+                true
+              case HoodieTimeline.LOG_COMPACTION_ACTION =>
+                table.rollbackInflightLogCompaction(instant, getPendingRollbackInstantFunc, client.getTransactionManager)
+                true
+              case HoodieTimeline.CLUSTERING_ACTION =>
+                table.rollbackInflightClustering(instant, getPendingRollbackInstantFunc, client.getTransactionManager)
+                true
+              case _ =>
+                // Recheck that the instant is still inflight before calling client.rollback(),
+                // which searches getCommitsTimeline() (completed + pending). Without this guard,
+                // a concurrent writer that completes the commit after detection could have its
+                // now-completed commit rolled back destructively. The table.rollbackInflight*
+                // branches above fail loudly via revertInstantFromInflightToRequested if the
+                // inflight is gone; this branch performs an equivalent safety check.
+                val stillInflight = metaClient.reloadActiveTimeline()
+                  .filterInflightsAndRequested()
+                  .containsInstant(instant.requestedTime)
+                if (!stillInflight) {
+                  logWarning(s"Instant ${instant.requestedTime} is no longer inflight; " +
+                    "skipping rollback to avoid rolling back a completed commit")
+                  false
+                } else {
+                  client.rollback(instant.requestedTime)
+                }
+            }
+            java.lang.Boolean.valueOf(result)
+          } catch {
+            // NonFatal (instead of `case e: Exception`) lets InterruptedException and VM errors
+            // abort the run rather than being recorded as a per-instant failure.
+            case NonFatal(e) =>
+              logError(s"Failed to rollback inflight instant ${instant.requestedTime}", e)
+              java.lang.Boolean.FALSE
+          }
+          Row(instant.requestedTime, instant.getAction, status)
+        }.reverse.toSeq
+
+        // Refresh catalog after all rollbacks, inside try — consistent with RollbackToInstantTimeProcedure.
+        // Not placed in finally to avoid refreshing on client-creation failures.
+        if (tableName.isDefined) {
+          spark.catalog.refreshTable(tableName.get.asInstanceOf[String])
+        }
+        rows
+      } finally {
+        if (client != null) client.close()
+      }
+    }
+  }
+}
+
+object CleanupStaleInflightCommitsProcedure {
+  val NAME = "cleanup_stale_inflight_commits"
+
+  /** Registry hook: every `get()` yields a brand-new procedure instance. */
+  def builder: Supplier[ProcedureBuilder] = () => new CleanupStaleInflightCommitsProcedure()
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index 89ada91e839c..f6657eee3fc6 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -94,6 +94,8 @@ object HoodieProcedures {
,(ShowTablePropertiesProcedure.NAME,
ShowTablePropertiesProcedure.builder)
,(HelpProcedure.NAME, HelpProcedure.builder)
,(ArchiveCommitsProcedure.NAME, ArchiveCommitsProcedure.builder)
+ ,(ShowInflightCommitsProcedure.NAME,
ShowInflightCommitsProcedure.builder)
+ ,(CleanupStaleInflightCommitsProcedure.NAME,
CleanupStaleInflightCommitsProcedure.builder)
,(RunTimelineCompactionProcedure.NAME,
RunTimelineCompactionProcedure.builder)
,(RunTTLProcedure.NAME, RunTTLProcedure.builder)
,(DropPartitionProcedure.NAME, DropPartitionProcedure.builder)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..3b501bdb25a1
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInflightCommitsProcedure.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.time.Duration
+import java.util.Date
+import java.util.function.Supplier
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark SQL stored procedure that lists every pending (REQUESTED or INFLIGHT) instant on a Hudi
+ * table's active timeline.
+ *
+ * Its scope is wider than cleanup_stale_inflight_commits, which only looks at the write timeline.
+ * This procedure reads the whole active timeline, so pending rollback, clean, restore and indexing
+ * instants are reported alongside write actions. Such non-write inflights can appear here even
+ * though cleanup_stale_inflight_commits will never roll them back.
+ *
+ * Parameters:
+ *  - table:           Required. Catalog name of the Hudi table.
+ *  - min_age_minutes: Optional (default 0). When > 0, only instants whose requested time lies more
+ *                     than this many minutes in the past are returned. 0 (or any non-positive
+ *                     value) disables the age filter and every pending instant is returned.
+ *
+ * Output columns (one row per pending instant):
+ *  - instant_time: The instant's requested timestamp.
+ *  - action:       The timeline action name (for example commit, deltacommit, compaction,
+ *                  replacecommit, clean, rollback).
+ *  - state:        The instant's state name (REQUESTED or INFLIGHT).
+ *
+ * Example usage:
+ * {{{
+ *   -- Show all pending inflights
+ *   CALL show_inflight_commits(table => 'my_table');
+ *
+ *   -- Show only inflights older than 2 hours
+ *   CALL show_inflight_commits(table => 'my_table', min_age_minutes => 120);
+ * }}}
+ */
+class ShowInflightCommitsProcedure extends BaseProcedure with ProcedureBuilder {
+
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType),
+    ProcedureParameter.optional(1, "min_age_minutes", DataTypes.IntegerType, 0)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("instant_time", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
+    StructField("state", DataTypes.StringType, nullable = true, Metadata.empty)
+  ))
+
+  override def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  override def outputType: StructType = OUTPUT_TYPE
+
+  override def build: Procedure = new ShowInflightCommitsProcedure()
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val table = getArgValueOrDefault(args, PARAMETERS(0))
+    val minAge = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
+
+    val basePath = getBasePath(table)
+    val pendingInstants = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration))
+      .setBasePath(basePath)
+      .build
+      .reloadActiveTimeline()
+      .filterInflightsAndRequested()
+
+    // The age filter only applies to a strictly positive threshold.
+    val reported =
+      if (minAge <= 0) {
+        pendingInstants
+      } else {
+        val ageMillis = Duration.ofMinutes(minAge).toMillis
+        val cutoffTime = new Date(System.currentTimeMillis() - ageMillis)
+        pendingInstants.findInstantsBefore(HoodieInstantTimeGenerator.formatDate(cutoffTime))
+      }
+
+    reported.getInstants.asScala.toSeq.map { instant =>
+      Row(instant.requestedTime, instant.getAction, instant.getState.name())
+    }
+  }
+}
+
+object ShowInflightCommitsProcedure {
+  val NAME = "show_inflight_commits"
+
+  /** Registry hook: every `get()` yields a brand-new procedure instance. */
+  def builder: Supplier[ProcedureBuilder] = () => new ShowInflightCommitsProcedure()
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupStaleInflightCommitsProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupStaleInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..f18a173e29ad
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCleanupStaleInflightCommitsProcedure.scala
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+import scala.collection.JavaConverters._
+
+class TestCleanupStaleInflightCommitsProcedure extends
HoodieSparkProcedureTestBase {
+
+ /**
+ * Creates an empty, non-partitioned COW table (primary key `id`, preCombine field `ts`, metadata
+ * table disabled via hoodie.metadata.enable = "false") at `tablePath`, without inserting data.
+ *
+ * Tests that manipulate inflight instants must NOT insert data before injecting the inflight,
+ * because BaseRollbackActionExecutor.validateRollbackCommitSequence throws HoodieRollbackException
+ * when committed instants exist after the injected (old) timestamp and no heartbeat exists for the
+ * injected instant. With no prior inserts, commitTimeline.empty() = true and the guard is bypassed.
+ */
+ private def createEmptyTable(tableName: String, tablePath: String): Unit = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long
+ | ) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.metadata.enable = "false"
+ | )
+ |""".stripMargin)
+ }
+
+ /**
+ * Variant of createEmptyTable that partitions by the int column `part` and takes the table type
+ * (e.g. 'cow' or 'mor') from `tableType`. No data is inserted, for the same reason as
+ * createEmptyTable; the metadata table is likewise disabled.
+ */
+ private def createEmptyPartitionedTable(tableName: String, tablePath:
String, tableType: String): Unit = {
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | ts long,
+ | part int
+ | ) using hudi
+ | partitioned by (part)
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | type = '$tableType',
+ | preCombineField = 'ts',
+ | hoodie.metadata.enable = "false"
+ | )
+ |""".stripMargin)
+ }
+
+ // Baseline: after two completed inserts there should be no pending write instant older than the
+ // default 180-minute threshold, so the procedure is expected to return no rows.
+ test("Test cleanup_stale_inflight_commits returns empty when no stale
inflights exist") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ createEmptyTable(tableName, tmp.getCanonicalPath)
+ spark.sql(s"insert into $tableName values(1, 'a1', 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 2000)")
+
+ val result = spark.sql(s"call cleanup_stale_inflight_commits(table =>
'$tableName')").collect()
+ assertResult(0)(result.length)
+ }
+ }
+
+ // Happy path: a single stale REPLACE_COMMIT inflight (timestamp in 2020) is matched by a 1-minute
+ // threshold, rolled back with rollback_status = true, and is no longer pending afterwards.
+ test("Test cleanup_stale_inflight_commits rolls back stale
REPLACE_COMMIT_ACTION inflight") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath
+ // No inserts before injecting — see createEmptyTable docstring
+ createEmptyTable(tableName, tablePath)
+
+ val staleTs = "20200101120000"
+ // Must use REPLACE_COMMIT_ACTION: inflightWriteCommitsOlderThan with include_ingestion_commits=false
+ // (default) filters out COMMIT_ACTION and DELTA_COMMIT_ACTION. REPLACE_COMMIT_ACTION is included in
+ // both getWriteTimeline() and getCommitsTimeline(), so client.rollback() finds it and returns true.
+ injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION,
staleTs)
+
+ val result = spark.sql(
+ s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+ s"allowed_inflight_interval_minutes => 1)").collect()
+
+ // Expect exactly one row: (instant_time, action, rollback_status)
+ assertResult(1)(result.length)
+ assertResult(staleTs)(result(0).getString(0))
+
assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(result(0).getString(1))
+ assertResult(true)(result(0).getBoolean(2))
+
+ // Verify the instant is no longer pending on the reloaded active timeline
+ val metaClient = HoodieTableMetaClient.builder
+
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build
+ val remaining =
metaClient.reloadActiveTimeline().filterInflightsAndRequested()
+ .getInstants.asScala
+ assert(!remaining.exists(_.requestedTime == staleTs),
+ s"Stale instant $staleTs should have been removed from the timeline
after rollback")
+ }
+ }
+
+  // Threshold gating: with a 60-minute threshold, only the 2020-dated inflight is selected. The
+  // inflight stamped "now" must be left untouched on the timeline.
+  test("Test cleanup_stale_inflight_commits respects allowed_inflight_interval_minutes threshold") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      // No inserts before injecting — see createEmptyTable docstring
+      createEmptyTable(tableName, tablePath)
+
+      val staleTs = "20200101120000"
+      val freshTs = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
+
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, staleTs)
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, freshTs)
+
+      // 60-minute threshold: only the stale instant qualifies; fresh instant is too recent
+      val result = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 60)").collect()
+
+      assertResult(1)(result.length)
+      assertResult(staleTs)(result(0).getString(0))
+      assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(result(0).getString(1))
+      // rollback_status of the stale instant is deliberately not asserted. With a newer pending
+      // instant (freshTs) on the timeline, Hudi's rollback sequence validation may reject it.
+
+      // The property under test: the in-threshold instant must still be pending afterwards.
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+      val remaining = metaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .getInstants.asScala.map(_.requestedTime)
+      assert(remaining.contains(freshTs),
+        s"Fresh instant $freshTs is within the threshold and must not have been rolled back")
+    }
+  }
+
+ // Ingestion gating: a stale COMMIT_ACTION inflight is ignored by default and is only rolled back
+ // once include_ingestion_commits => true is passed.
+ test("Test cleanup_stale_inflight_commits cleans COMMIT_ACTION with
include_ingestion_commits=true") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath
+ // No inserts before injecting — see createEmptyTable docstring
+ createEmptyTable(tableName, tablePath)
+
+ val staleTs = "20200101120000"
+ // COMMIT_ACTION is filtered out with the default include_ingestion_commits=false,
+ // but included when include_ingestion_commits=true.
+ // client.rollback returns true for COMMIT_ACTION since getCommitsTimeline() includes it.
+ injectInflightInstant(tablePath, HoodieTimeline.COMMIT_ACTION, staleTs)
+
+ // Default (include_ingestion_commits=false): should not find COMMIT_ACTION inflight
+ val defaultResult = spark.sql(
+ s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+ s"allowed_inflight_interval_minutes => 1)").collect()
+ assertResult(0)(defaultResult.length)
+
+ // With include_ingestion_commits=true: should find and process COMMIT_ACTION inflight
+ val result = spark.sql(
+ s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+ s"allowed_inflight_interval_minutes => 1, " +
+ s"include_ingestion_commits => true)").collect()
+
+ assertResult(1)(result.length)
+ assertResult(staleTs)(result(0).getString(0))
+ assertResult(HoodieTimeline.COMMIT_ACTION)(result(0).getString(1))
+ assertResult(true)(result(0).getBoolean(2))
+ }
+ }
+
+ // dry_run: the stale instant is reported with rollback_status = NULL and must remain pending,
+ // proving that no rollback was issued.
+ test("Test cleanup_stale_inflight_commits dry_run lists matched instants
without rolling back") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = tmp.getCanonicalPath
+ createEmptyTable(tableName, tablePath)
+
+ val staleTs = "20200101120000"
+ injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION,
staleTs)
+
+ val result = spark.sql(
+ s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+ s"allowed_inflight_interval_minutes => 1, " +
+ s"dry_run => true)").collect()
+
+ assertResult(1)(result.length)
+ assertResult(staleTs)(result(0).getString(0))
+
assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(result(0).getString(1))
+ // dry_run: rollback_status is NULL meaning "matched but not actioned"
+ assert(result(0).isNullAt(2),
+ "Expected rollback_status=NULL in dry_run mode, but got non-null
value")
+
+ // Verify the instant is STILL pending on the active timeline (dry_run did not act)
+ val metaClient = HoodieTableMetaClient.builder
+
.setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+ .setBasePath(tablePath)
+ .build
+ val remaining =
metaClient.reloadActiveTimeline().filterInflightsAndRequested()
+ .getInstants.asScala
+ assert(remaining.exists(_.requestedTime == staleTs),
+ s"dry_run should not have rolled back $staleTs; expected to find it
still on the timeline")
+ }
+ }
+
+  test("Test cleanup_stale_inflight_commits rolls back stale COMPACTION_ACTION inflight via table.rollbackInflightCompaction") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Compaction only applies to MOR tables. A valid compaction inflight is produced the same way
+      // TestRunRollbackInflightTableServiceProcedure does it: schedule, run, then delete the
+      // completed commit file.
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | tblproperties (
+           | primaryKey = 'id',
+           | type = 'mor',
+           | preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$tablePath'
+           """.stripMargin)
+      withSQLConf(
+        "hoodie.parquet.max.file.size" -> "10000",
+        "hoodie.compact.inline" -> "false",
+        "hoodie.compact.schedule.inline" -> "false",
+        // With auto-clean on, a clean instant would land after the compaction commit and the
+        // newest-instant lookup (getReverseOrderedInstants.findFirst) below would pick the clean.
+        "hoodie.clean.automatic" -> "false"
+      ) {
+        val writeStatements = Seq(
+          s"insert into $tableName values(1, 'a1', 10, 1000)",
+          s"insert into $tableName values(2, 'a2', 10, 1000)",
+          s"insert into $tableName values(3, 'a3', 10, 1000)",
+          s"insert into $tableName values(4, 'a4', 10, 1000)",
+          s"update $tableName set price = 11 where id = 1")
+        writeStatements.foreach(statement => spark.sql(statement))
+
+        Seq("schedule", "run").foreach { op =>
+          spark.sql(s"call run_compaction(op => '$op', table => '$tableName')")
+        }
+
+        // Remove the completed compaction commit so only its inflight is left behind.
+        val metaClient = HoodieTableMetaClient.builder
+          .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+          .setBasePath(tablePath)
+          .build
+        val compactionInstant = metaClient.getActiveTimeline.getReverseOrderedInstants.findFirst().get()
+        metaClient.getActiveTimeline.deleteInstantFileIfExists(compactionInstant)
+        val compactionInstantTime = compactionInstant.requestedTime
+
+        // Sanity-check the setup before exercising the procedure. If the compaction inflight is
+        // missing, report the whole timeline instead of failing later on an empty result.
+        val reloadedTimeline = metaClient.reloadActiveTimeline()
+        val pendingWriteInstants =
+          reloadedTimeline.getWriteTimeline.filterInflightsAndRequested.getInstants.asScala
+        val compactionInflightPresent = pendingWriteInstants.exists { instant =>
+          instant.getAction == HoodieTimeline.COMPACTION_ACTION &&
+            instant.requestedTime == compactionInstantTime
+        }
+        val timelineSummary = reloadedTimeline.getInstants.asScala
+          .map(i => s"${i.requestedTime}/${i.getAction}/${i.getState}")
+          .mkString(", ")
+        assert(compactionInflightPresent,
+          s"Setup failure: compaction inflight at $compactionInstantTime not present after deleting completed commit. " +
+            s"Active timeline: $timelineSummary")
+
+        // The cutoff has second precision, so wait until it is strictly newer than the inflight.
+        Thread.sleep(2000)
+
+        val rows = spark.sql(
+          s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+            "allowed_inflight_interval_minutes => 0)").collect()
+
+        val rowSummary = rows.map(r => s"${r.getString(0)}/${r.getString(1)}").mkString(",")
+        val compactionRow = rows.find(_.getString(0) == compactionInstantTime)
+        assert(compactionRow.isDefined,
+          s"Expected compaction inflight $compactionInstantTime in result; got $rowSummary")
+        val row = compactionRow.get
+        assertResult(HoodieTimeline.COMPACTION_ACTION)(row.getString(1))
+        assertResult(true)(row.getBoolean(2))
+
+        // table.rollbackInflightCompaction is expected to have removed the inflight instant.
+        val expectedInflight = metaClient.getTimelineLayout.getInstantGenerator.createNewInstant(
+          HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionInstantTime)
+        assert(!metaClient.reloadActiveTimeline().getInstants.contains(expectedInflight),
+          s"Compaction inflight $compactionInstantTime should be gone after rollback")
+      }
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits rolls back stale CLUSTERING_ACTION inflight via table.rollbackInflightClustering") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // Build a genuine clustering inflight the way TestRunRollbackInflightTableServiceProcedure
+      // does: schedule and execute clustering, then delete the completed replace commit file.
+      spark.sql(
+        s"""
+           |create table $tableName (
+           | id int,
+           | name string,
+           | price double,
+           | ts long
+           |) using hudi
+           | tblproperties (
+           | primaryKey = 'id',
+           | type = 'cow',
+           | preCombineField = 'ts'
+           | )
+           | partitioned by(ts)
+           | location '$tablePath'
+           """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
+
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'schedule')")
+      spark.sql(s"call run_clustering(table => '$tableName', op => 'execute')")
+
+      val metaClient = HoodieTableMetaClient.builder
+        .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+        .setBasePath(tablePath)
+        .build
+
+      // Everything below depends on the clustering having completed. Fail with a clear message
+      // instead of an IndexOutOfBoundsException from get(0) if it did not.
+      val completedReplaceInstants = metaClient.getActiveTimeline.getCompletedReplaceTimeline.getInstants
+      assert(!completedReplaceInstants.isEmpty,
+        s"Setup failure: no completed replace commit found after executing clustering on $tableName")
+      val clusteringInstant = completedReplaceInstants.get(0)
+      metaClient.getActiveTimeline.deleteInstantFileIfExists(clusteringInstant)
+      val clusteringInstantTime = clusteringInstant.requestedTime
+
+      // Confirm the clustering inflight is actually present before calling cleanup, mirroring the
+      // compaction test. A broken setup then fails here with the full timeline, not later with a
+      // misleading "not in result" message.
+      val reloadedTimeline = metaClient.reloadActiveTimeline()
+      val clusteringInflightPresent = reloadedTimeline.filterInflightsAndRequested()
+        .getInstants.asScala
+        .exists(i => i.getAction == HoodieTimeline.CLUSTERING_ACTION && i.requestedTime == clusteringInstantTime)
+      assert(clusteringInflightPresent,
+        s"Setup failure: clustering inflight at $clusteringInstantTime not present after deleting completed commit. " +
+          s"Active timeline: ${reloadedTimeline.getInstants.asScala.map(i => s"${i.requestedTime}/${i.getAction}/${i.getState}").mkString(", ")}")
+
+      // Sleep so the second-precision cutoff timestamp is strictly newer than the inflight's timestamp.
+      Thread.sleep(2000)
+
+      val result = spark.sql(
+        s"call cleanup_stale_inflight_commits(table => '$tableName', " +
+          s"allowed_inflight_interval_minutes => 0)").collect()
+
+      val clusteringRow = result.find(r => r.getString(0) == clusteringInstantTime)
+      assert(clusteringRow.isDefined,
+        s"Expected clustering inflight $clusteringInstantTime in result; got ${result.map(r => s"${r.getString(0)}/${r.getString(1)}").mkString(",")}")
+      assertResult(HoodieTimeline.CLUSTERING_ACTION)(clusteringRow.get.getString(1))
+      assertResult(true)(clusteringRow.get.getBoolean(2))
+
+      // table.rollbackInflightClustering should have removed the clustering inflight.
+      val instantGenerator = metaClient.getTimelineLayout.getInstantGenerator
+      val expectedInflight = instantGenerator.createNewInstant(
+        HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, clusteringInstantTime)
+      assert(!metaClient.reloadActiveTimeline().getInstants.contains(expectedInflight),
+        s"Clustering inflight $clusteringInstantTime should be gone after rollback")
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits handles partitioned COW table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createEmptyPartitionedTable(tableName, tablePath, "cow")
+
+      val staleTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.REPLACE_COMMIT_ACTION, staleTs)
+
+      // Named CALL arguments joined into:
+      // call cleanup_stale_inflight_commits(table => '...', allowed_inflight_interval_minutes => 1)
+      val callSql = Seq(
+        s"table => '$tableName'",
+        "allowed_inflight_interval_minutes => 1"
+      ).mkString("call cleanup_stale_inflight_commits(", ", ", ")")
+
+      val rows = spark.sql(callSql).collect()
+      assertResult(1)(rows.length)
+      val row = rows.head
+      assertResult(staleTs)(row.getString(0))
+      assertResult(HoodieTimeline.REPLACE_COMMIT_ACTION)(row.getString(1))
+      assertResult(true)(row.getBoolean(2))
+    }
+  }
+
+  test("Test cleanup_stale_inflight_commits handles MOR table DELTA_COMMIT_ACTION with include_ingestion_commits") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createEmptyPartitionedTable(tableName, tablePath, "mor")
+
+      val staleTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.DELTA_COMMIT_ACTION, staleTs)
+
+      // Issues the cleanup CALL with a 1-minute threshold followed by any extra named arguments.
+      def runCleanup(extraArgs: String*) = {
+        val namedArgs = Seq(s"table => '$tableName'", "allowed_inflight_interval_minutes => 1") ++ extraArgs
+        spark.sql(namedArgs.mkString("call cleanup_stale_inflight_commits(", ", ", ")")).collect()
+      }
+
+      // Default (include_ingestion_commits=false): DELTA_COMMIT_ACTION is filtered out.
+      assertResult(0)(runCleanup().length)
+
+      // With include_ingestion_commits=true the same DELTA_COMMIT_ACTION inflight is processed.
+      val rows = runCleanup("include_ingestion_commits => true")
+      assertResult(1)(rows.length)
+      val row = rows.head
+      assertResult(staleTs)(row.getString(0))
+      assertResult(HoodieTimeline.DELTA_COMMIT_ACTION)(row.getString(1))
+      assertResult(true)(row.getBoolean(2))
+    }
+  }
+
+  /**
+   * Simulates a stalled operation: writes a REQUESTED instant for `action` at `instantTime` and
+   * immediately transitions it to INFLIGHT, never completing it.
+   *
+   * @param tablePath   base path of the Hudi table whose active timeline is modified
+   * @param action      timeline action of the instant (e.g. HoodieTimeline.COMMIT_ACTION)
+   * @param instantTime requested time of the injected instant
+   */
+  private def injectInflightInstant(tablePath: String, action: String, instantTime: String): Unit = {
+    val storageConf = HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration)
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(storageConf)
+      .setBasePath(tablePath)
+      .build
+    val activeTimeline = metaClient.getActiveTimeline
+    val requestedInstant = metaClient.getTimelineLayout.getInstantGenerator
+      .createNewInstant(HoodieInstant.State.REQUESTED, action, instantTime)
+    activeTimeline.createNewInstant(requestedInstant)
+    // No metadata payload is attached to the inflight transition.
+    activeTimeline.transitionRequestedToInflight(requestedInstant, HOption.empty[Array[Byte]]())
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInflightCommitsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInflightCommitsProcedure.scala
new file mode 100644
index 000000000000..ca77172bd607
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInflightCommitsProcedure.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+
+import java.text.SimpleDateFormat
+import java.util.Date
+
+class TestShowInflightCommitsProcedure extends HoodieSparkProcedureTestBase {
+
+  test("Test show_inflight_commits returns empty for a fully committed table") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      createUnpartitionedCowTable(tableName, tmp.getCanonicalPath)
+      spark.sql(s"insert into $tableName values(1, 'a1', 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 2000)")
+
+      val result = spark.sql(s"call show_inflight_commits(table => '$tableName')").collect()
+      assertResult(0)(result.length)
+    }
+  }
+
+  test("Test show_inflight_commits returns injected inflight instant") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createUnpartitionedCowTable(tableName, tablePath)
+
+      val injectedTs = "20200101120000"
+      injectInflightInstant(tablePath, HoodieTimeline.COMMIT_ACTION, injectedTs)
+
+      val result = spark.sql(s"call show_inflight_commits(table => '$tableName')").collect()
+      // Looking up the specific row already implies a non-empty result; report what was returned.
+      val row = result.find(r => r.getString(0) == injectedTs)
+      assert(row.isDefined,
+        s"Expected inflight instant $injectedTs not found in results; got ${result.map(_.getString(0)).mkString(",")}")
+      assertResult(HoodieTimeline.COMMIT_ACTION)(row.get.getString(1))
+      assertResult("INFLIGHT")(row.get.getString(2))
+    }
+  }
+
+  test("Test show_inflight_commits min_age_minutes filter includes old and excludes recent") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = tmp.getCanonicalPath
+      createUnpartitionedCowTable(tableName, tablePath)
+
+      val oldTs = "20200101120000"
+      // Current wall-clock time in the same yyyyMMddHHmmss form as oldTs, so this instant is brand new.
+      val freshTs = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())
+
+      injectInflightInstant(tablePath, HoodieTimeline.COMMIT_ACTION, oldTs)
+      injectInflightInstant(tablePath, HoodieTimeline.DELTA_COMMIT_ACTION, freshTs)
+
+      // No filter: both should appear
+      val allTimes = listInflightInstantTimes(tableName, minAgeMinutes = 0)
+      assert(allTimes.contains(oldTs),
+        s"Expected old instant $oldTs with min_age_minutes=0; got ${allTimes.mkString(",")}")
+      assert(allTimes.contains(freshTs),
+        s"Expected fresh instant $freshTs with min_age_minutes=0; got ${allTimes.mkString(",")}")
+
+      // 60-minute filter: only the old instant should appear
+      val filteredTimes = listInflightInstantTimes(tableName, minAgeMinutes = 60)
+      assert(filteredTimes.contains(oldTs),
+        s"Expected old instant $oldTs to appear with min_age_minutes=60")
+      assert(!filteredTimes.contains(freshTs),
+        s"Fresh instant $freshTs should not appear with min_age_minutes=60")
+    }
+  }
+
+  test("Test show_inflight_commits requires table parameter") {
+    checkExceptionContain(
+      "call show_inflight_commits()")(
+      "Argument: table is required")
+  }
+
+  /**
+   * Creates the empty table shared by the tests above: non-partitioned COW, primary key `id`,
+   * precombine field `ts`, metadata table disabled.
+   */
+  private def createUnpartitionedCowTable(tableName: String, tablePath: String): Unit = {
+    spark.sql(
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | ts long
+         | ) using hudi
+         | location '$tablePath'
+         | tblproperties (
+         | primaryKey = 'id',
+         | type = 'cow',
+         | preCombineField = 'ts',
+         | hoodie.metadata.enable = "false"
+         | )
+         |""".stripMargin)
+  }
+
+  /** Runs show_inflight_commits with the given min_age_minutes and returns the first (instant time) column. */
+  private def listInflightInstantTimes(tableName: String, minAgeMinutes: Int): Seq[String] =
+    spark.sql(s"call show_inflight_commits(table => '$tableName', min_age_minutes => $minAgeMinutes)")
+      .collect()
+      .map(_.getString(0))
+      .toSeq
+
+  /**
+   * Injects a REQUESTED→INFLIGHT instant into the active timeline without completing it.
+   * Used to simulate stale inflight operations for testing.
+   */
+  private def injectInflightInstant(tablePath: String, action: String, instantTime: String): Unit = {
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(HadoopFSUtils.getStorageConfWithCopy(spark.sparkContext.hadoopConfiguration))
+      .setBasePath(tablePath)
+      .build
+    val timeline = metaClient.getActiveTimeline
+    val instantGenerator = metaClient.getTimelineLayout.getInstantGenerator
+    val requested = instantGenerator.createNewInstant(HoodieInstant.State.REQUESTED, action, instantTime)
+    timeline.createNewInstant(requested)
+    timeline.transitionRequestedToInflight(requested, HOption.empty[Array[Byte]]())
+  }
+}