This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b5c5801bf7d5 fix(spark): Add options for archive procedure (#18437)
b5c5801bf7d5 is described below

commit b5c5801bf7d5e0b53c60ef5d46cea1e2d22cc6d0
Author: fhan <[email protected]>
AuthorDate: Tue May 26 11:18:46 2026 +0800

    fix(spark): Add options for archive procedure (#18437)
    
    * fix(spark): Add options for archive procedure
    * set 'enable_metadata' default value to true
    * fix args in SparkMain
    * fix options in ArchiveCommitsProcedure
    * fix(spark): set named parameters with higher priority and improve 
extractOptions()
    * optimize entire impl and add UTs for HoodieCLIUtils
    * optimize ArchiveCommitsProcedure.
    * optimize ArchiveExecutorUtils and HoodieCLIUtils according to hudi-agent 
review results.
    
    ---------
    
    Co-authored-by: fhan <[email protected]>
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |   2 +-
 .../scala/org/apache/hudi/HoodieCLIUtils.scala     |  60 ++++++++-
 .../scala/org/apache/hudi/TestHoodieCLIUtils.scala | 104 +++++++++++++++
 .../org/apache/hudi/cli/ArchiveExecutorUtils.java  |  17 ++-
 .../procedures/ArchiveCommitsProcedure.scala       |  99 ++++++++++++--
 .../procedure/TestArchiveCommitsProcedure.scala    | 147 +++++++++++++++------
 6 files changed, 372 insertions(+), 57 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 097ba984dc92..a7955032497e 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -599,7 +599,7 @@ public class SparkMain {
 
   private static int archive(JavaSparkContext jsc, int minCommits, int 
maxCommits, int commitsRetained, boolean enableMetadata, String basePath) {
     try {
-      return ArchiveExecutorUtils.archive(jsc, minCommits, maxCommits, 
commitsRetained, enableMetadata, basePath);
+      return ArchiveExecutorUtils.archive(jsc, minCommits, maxCommits, 
commitsRetained, enableMetadata, basePath, new HashMap<>());
     } catch (IOException ex) {
       return -1;
     }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index 1b40b0fe5701..c6e37f5bb6dd 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -107,11 +107,63 @@ object HoodieCLIUtils extends Logging {
     }
   }
 
+  /**
+   * Parse a comma-separated string of key=value pairs into a Map.
+   *
+   * Notes:
+   *  - Whitespace surrounding keys/values is trimmed; empty tokens (e.g. from 
a
+   *    trailing comma or `", ,"`) are silently ignored.
+   *  - The delimiter is the first `=` in a token, so values may themselves
+   *    contain `=` (e.g. `k=a=b` parses to `k -> "a=b"`).
+   *  - Values cannot contain literal commas; the parser does not support
+   *    escaping. Configs that need commas should be set via Spark conf 
instead.
+   *  - If the same key appears more than once, a WARN is logged and the last
+   *    occurrence wins (consistent with `toMap`'s last-write-wins semantics).
+   *
+   * @throws IllegalArgumentException if a non-empty token does not contain `=`
+   *                                  or has an empty key.
+   */
   def extractOptions(s: String): Map[String, String] = {
-    StringUtils.split(s, ",").asScala
-      .map(split => StringUtils.split(split, "="))
-      .map(pair => pair.get(0) -> pair.get(1))
-      .toMap
+    if (s == null) {
+      Map.empty
+    } else {
+      // Single pass: build the result Map and collect duplicate keys at the
+      // same time, avoiding an intermediate Seq + groupBy + toMap chain.
+      val (result, duplicates) = StringUtils.split(s, ",").asScala
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(parseOptionToken)
+        .foldLeft((Map.empty[String, String], Set.empty[String])) {
+          case ((acc, dups), (key, value)) =>
+            val newDups = if (acc.contains(key)) dups + key else dups
+            (acc + (key -> value), newDups)
+        }
+
+      if (duplicates.nonEmpty) {
+        logWarning(s"Duplicate option keys detected: ${duplicates.mkString(", 
")}. "
+          + "The last occurrence will take effect.")
+      }
+      result
+    }
+  }
+
+  private def parseOptionToken(token: String): (String, String) = {
+    val delimiterIndex = token.indexOf('=')
+    if (delimiterIndex <= 0) {
+      throw new IllegalArgumentException(
+        s"Invalid options format: '$token'. Expected 'key=value' pairs 
separated by commas, "
+          + "for example: 'k1=v1,k2=v2'.")
+    }
+
+    val key = token.substring(0, delimiterIndex).trim
+    if (key.isEmpty) {
+      throw new IllegalArgumentException(
+        s"Invalid options format: '$token'. Option key must not be empty and 
options should "
+          + "follow 'key=value' format.")
+    }
+
+    val value = token.substring(delimiterIndex + 1).trim
+    key -> value
   }
 
   def getLockOptions(tablePath: String, schema: String, lockConfig: 
TypedProperties): Map[String, String] = {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestHoodieCLIUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestHoodieCLIUtils.scala
new file mode 100644
index 000000000000..ca4869286c26
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/hudi/TestHoodieCLIUtils.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestHoodieCLIUtils {
+
+  @Test
+  def testExtractOptionsBasic(): Unit = {
+    val parsed = HoodieCLIUtils.extractOptions("k1=v1,k2=v2")
+    assertEquals(2, parsed.size)
+    assertEquals("v1", parsed("k1"))
+    assertEquals("v2", parsed("k2"))
+  }
+
+  @Test
+  def testExtractOptionsTrimsWhitespace(): Unit = {
+    val parsed = HoodieCLIUtils.extractOptions(" k1 = v1 ,  k2= v 2 ")
+    assertEquals("v1", parsed("k1"))
+    // internal whitespace inside value is preserved, only edges are trimmed
+    assertEquals("v 2", parsed("k2"))
+  }
+
+  @Test
+  def testExtractOptionsIgnoresEmptyTokens(): Unit = {
+    // trailing comma, consecutive commas, leading comma — all silently ignored
+    val parsed = HoodieCLIUtils.extractOptions(",k1=v1,, ,k2=v2,")
+    assertEquals(2, parsed.size)
+    assertEquals("v1", parsed("k1"))
+    assertEquals("v2", parsed("k2"))
+  }
+
+  @Test
+  def testExtractOptionsValueContainsEquals(): Unit = {
+    // only the first `=` should be treated as a delimiter
+    val parsed = HoodieCLIUtils.extractOptions("k=a=b=c")
+    assertEquals(1, parsed.size)
+    assertEquals("a=b=c", parsed("k"))
+  }
+
+  @Test
+  def testExtractOptionsAllowsEmptyValue(): Unit = {
+    val parsed = HoodieCLIUtils.extractOptions("k=")
+    assertEquals(1, parsed.size)
+    assertEquals("", parsed("k"))
+  }
+
+  @Test
+  def testExtractOptionsDuplicateKeyLastWins(): Unit = {
+    val parsed = HoodieCLIUtils.extractOptions("k=v1,k=v2,k=v3")
+    assertEquals(1, parsed.size)
+    assertEquals("v3", parsed("k"))
+  }
+
+  @Test
+  def testExtractOptionsNullAndEmpty(): Unit = {
+    assertTrue(HoodieCLIUtils.extractOptions(null).isEmpty)
+    assertTrue(HoodieCLIUtils.extractOptions("").isEmpty)
+    assertTrue(HoodieCLIUtils.extractOptions("   ").isEmpty)
+    assertTrue(HoodieCLIUtils.extractOptions(",,, ").isEmpty)
+  }
+
+  @Test
+  def testExtractOptionsThrowsOnMissingDelimiter(): Unit = {
+    val ex = assertThrows(
+      classOf[IllegalArgumentException],
+      () => HoodieCLIUtils.extractOptions("k1=v1,invalid"))
+    assertTrue(ex.getMessage.contains("invalid"))
+  }
+
+  @Test
+  def testExtractOptionsThrowsOnEmptyKey(): Unit = {
+    val ex = assertThrows(
+      classOf[IllegalArgumentException],
+      () => HoodieCLIUtils.extractOptions("=v"))
+    assertTrue(ex.getMessage.contains("key=value") || 
ex.getMessage.contains("Option key"))
+  }
+
+  @Test
+  def testExtractOptionsThrowsOnWhitespaceKey(): Unit = {
+    assertThrows(
+      classOf[IllegalArgumentException],
+      () => HoodieCLIUtils.extractOptions("   =v"))
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
index 772450903e5f..b52d1965e040 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/ArchiveExecutorUtils.java
@@ -40,6 +40,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Archive Utils.
@@ -53,12 +54,26 @@ public final class ArchiveExecutorUtils {
                             int maxCommits,
                             int commitsRetained,
                             boolean enableMetadata,
-                            String basePath) throws IOException {
+                            String basePath,
+                            Map<String, String> options) throws IOException {
+    // NOTE on builder ordering:
+    //   `withArchivalConfig`/`withCleanConfig`/`withMetadataConfig` each call
+    //   `putAll(subConfig.getProps())` onto `writeConfig.getProps()`, which
+    //   includes every key filled in by `setDefaults` during the sub-config's
+    //   `build()`. If `withProps(options)` ran BEFORE them, those defaults would
+    //   overwrite the user's options (e.g. `hoodie.keep.min.commits`).
+    //
+    //   Therefore `withProps(options)` is intentionally placed LAST so 
user-supplied
+    //   options reliably win over sub-config defaults. Named procedure params
+    //   (min/max/retain/enableMetadata) are forwarded via the dedicated 
builders
+    //   below; if the caller wants those to win over a same-name key in 
`options`,
+    //   the procedure layer must not pass a conflicting value for that key in 
`options`.
     HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath(basePath)
         
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(minCommits,
 maxCommits).build())
         
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(commitsRetained).build())
         .withEmbeddedTimelineServerEnabled(false)
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata).build())
+        .withProps(options)
         .build();
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     HoodieSparkTable<HoodieAvroPayload> table = 
HoodieSparkTable.create(config, context);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
index efc5a0cc5c2a..6dd578c91cf6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ArchiveCommitsProcedure.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieCLIUtils, SparkAdapterSupport}
 import org.apache.hudi.cli.ArchiveExecutorUtils
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.config.{HoodieArchivalConfig, HoodieCleanConfig}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
@@ -26,17 +28,36 @@ import org.apache.spark.sql.types._
 
 import java.util.function.Supplier
 
+import scala.collection.JavaConverters._
+
 class ArchiveCommitsProcedure extends BaseProcedure
   with ProcedureBuilder
   with SparkAdapterSupport
   with Logging {
+  // NOTE: min_commits / max_commits / retain_commits / enable_metadata are
+  // intentionally declared WITHOUT default values. Whether a caller actually
+  // passed them is determined by `isArgDefined`; their effective values fall
+  // back to the corresponding ConfigProperty defaults (see `call`).
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.optional(0, "table", DataTypes.StringType),
     ProcedureParameter.optional(1, "path", DataTypes.StringType),
-    ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType, 20),
-    ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType, 30),
-    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType, 
10),
-    ProcedureParameter.optional(5, "enable_metadata", DataTypes.BooleanType, 
true)
+    ProcedureParameter.optional(2, "min_commits", DataTypes.IntegerType),
+    ProcedureParameter.optional(3, "max_commits", DataTypes.IntegerType),
+    ProcedureParameter.optional(4, "retain_commits", DataTypes.IntegerType),
+    ProcedureParameter.optional(5, "enable_metadata", DataTypes.BooleanType),
+    // free-form hoodie.* config overrides; format: 'k1=v1,k2=v2'
+    ProcedureParameter.optional(6, "options", DataTypes.StringType)
+  )
+
+  // Mapping of (named parameter -> hoodie.* config key) used both to merge
+  // named-parameter overrides on top of `options` and to back-fill scalar
+  // values fed to ArchiveExecutorUtils. Listed once to keep the named-param
+  // <-> ConfigProperty wiring in a single place.
+  private val NAMED_PARAM_TO_CONFIG_KEY: Seq[(ProcedureParameter, String)] = 
Seq(
+    PARAMETERS(2) -> HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
+    PARAMETERS(3) -> HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(),
+    PARAMETERS(4) -> HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
+    PARAMETERS(5) -> HoodieMetadataConfig.ENABLE.key()
   )
 
   private val OUTPUT_TYPE = new StructType(Array[StructField](
@@ -52,20 +73,74 @@ class ArchiveCommitsProcedure extends BaseProcedure
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val tablePath = getArgValueOrDefault(args, PARAMETERS(1))
-
-    val minCommits = getArgValueOrDefault(args, 
PARAMETERS(2)).get.asInstanceOf[Int]
-    val maxCommits = getArgValueOrDefault(args, 
PARAMETERS(3)).get.asInstanceOf[Int]
-    val retainCommits = getArgValueOrDefault(args, 
PARAMETERS(4)).get.asInstanceOf[Int]
-    val enableMetadata = getArgValueOrDefault(args, 
PARAMETERS(5)).get.asInstanceOf[Boolean]
+    val confs = getArchiveConfigs(args)
+
+    val minCommits = parseInt(confs,
+      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(),
+      HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.defaultValue())
+    val maxCommits = parseInt(confs,
+      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(),
+      HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.defaultValue())
+    val retainCommits = parseInt(confs,
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(),
+      HoodieCleanConfig.CLEANER_COMMITS_RETAINED.defaultValue())
+    val enableMetadata = parseBoolean(confs,
+      HoodieMetadataConfig.ENABLE.key(),
+      HoodieMetadataConfig.ENABLE.defaultValue().toString)
 
     val basePath = getBasePath(tableName, tablePath)
-
     Seq(Row(ArchiveExecutorUtils.archive(jsc,
       minCommits,
       maxCommits,
       retainCommits,
       enableMetadata,
-      basePath)))
+      basePath,
+      confs.asJava)))
+  }
+
+  /**
+   * Build the effective hoodie.* config map by overlaying named parameters
+   * (only those the caller explicitly passed) on top of the user `options`
+   * string. Whether a parameter was explicitly passed is decided by
+   * `isArgDefined` rather than by checking the parameter's default, so the
+   * precedence semantics stay correct even if future maintainers add defaults
+   * back to the named parameters.
+   */
+  private def getArchiveConfigs(args: ProcedureArgs): Map[String, String] = {
+    val optionConfs = getArgValueOrDefault(args, PARAMETERS(6))
+      .map(p => HoodieCLIUtils.extractOptions(p.toString))
+      .getOrElse(Map.empty[String, String])
+
+    NAMED_PARAM_TO_CONFIG_KEY.foldLeft(optionConfs) {
+      case (confs, (parameter, configKey)) =>
+        if (isArgDefined(args, parameter)) {
+          confs + (configKey -> getArgValueOrDefault(args, 
parameter).get.toString)
+        } else {
+          confs
+        }
+    }
+  }
+
+  private def parseInt(confs: Map[String, String], key: String, default: 
String): Int = {
+    val raw = confs.getOrElse(key, default)
+    try {
+      raw.toInt
+    } catch {
+      case _: NumberFormatException =>
+        throw new IllegalArgumentException(
+          s"Invalid integer value for '$key': '$raw'. Expected a base-10 
integer.")
+    }
+  }
+
+  private def parseBoolean(confs: Map[String, String], key: String, default: 
String): Boolean = {
+    val raw = confs.getOrElse(key, default).trim.toLowerCase
+    raw match {
+      case "true" => true
+      case "false" => false
+      case _ =>
+        throw new IllegalArgumentException(
+          s"Invalid boolean value for '$key': '$raw'. Expected 'true' or 
'false'.")
+    }
   }
 
   override def build = new ArchiveCommitsProcedure()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
index c81ffcfb59d6..e5be572eb65f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestArchiveCommitsProcedure.scala
@@ -21,52 +21,121 @@ package org.apache.spark.sql.hudi.procedure
 
 class TestArchiveCommitsProcedure extends HoodieSparkProcedureTestBase {
 
-  test("Test Call archive_commits Procedure by Table") {
+  /**
+   * Helper: create a fresh COW table at the given location with `numCommits`
+   * insert commits already written. Returns the table name.
+   */
+  private def createTableWithCommits(location: String, numCommits: Int): 
String = {
+    val tableName = generateTableName
+    spark.sql(
+      s"""
+         |create table $tableName (
+         | id int,
+         | name string,
+         | price double,
+         | ts long
+         | ) using hudi
+         | location '$location'
+         | tblproperties (
+         |   primaryKey = 'id',
+         |   type = 'cow',
+         |   orderingFields = 'ts',
+         |   hoodie.metadata.enable = "false"
+         | )
+         |""".stripMargin)
+
+    (1 to numCommits).foreach { i =>
+      spark.sql(s"insert into $tableName values($i, 'a$i', ${i * 10}, ${i * 
1000})")
+    }
+    tableName
+  }
+
+  test("Test Call archive_commits Procedure with named parameters") {
     withTempDir { tmp =>
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName (
-           | id int,
-           | name string,
-           | price double,
-           | ts long
-           | ) using hudi
-           | location '${tmp.getCanonicalPath}'
-           | tblproperties (
-           |   primaryKey = 'id',
-           |   type = 'cow',
-           |   orderingFields = 'ts',
-           |   hoodie.metadata.enable = "false"
-           | )
-           |""".stripMargin)
-
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-      spark.sql(s"insert into $tableName values(2, 'a2', 20, 2000)")
-      spark.sql(s"insert into $tableName values(3, 'a3', 30, 3000)")
-      spark.sql(s"insert into $tableName values(4, 'a4', 40, 4000)")
-      spark.sql(s"insert into $tableName values(5, 'a5', 50, 5000)")
-      spark.sql(s"insert into $tableName values(6, 'a6', 60, 6000)")
-
-      val result1 = spark.sql(s"call archive_commits(table => '$tableName'" +
-        s", min_commits => 2, max_commits => 3, retain_commits => 1, 
enable_metadata => false)")
+      val tableName = createTableWithCommits(tmp.getCanonicalPath, 6)
+
+      val result = spark.sql(
+        s"call archive_commits(table => '$tableName'," +
+          " min_commits => 2, max_commits => 3, retain_commits => 1, 
enable_metadata => false)")
         .collect()
         .map(row => Seq(row.getInt(0)))
-      assertResult(1)(result1.length)
-      assertResult(0)(result1(0).head)
+      assertResult(1)(result.length)
+      assertResult(0)(result(0).head)
 
-      // collect active commits for table
-      val commits = spark.sql(s"""call show_commits(table => '$tableName', 
limit => 10)""").collect()
-      assertResult(2) {
-        commits.length
-      }
+      val commits = spark.sql(s"call show_commits(table => '$tableName', limit 
=> 10)").collect()
+      assertResult(2)(commits.length)
+
+      val endTs = commits(0).get(0).toString
+      val archived = spark.sql(
+        s"call show_archived_commits(table => '$tableName', end_ts => 
'$endTs')").collect()
+      assertResult(4)(archived.length)
+    }
+  }
+
+  test("Test Call archive_commits Procedure driven only by options") {
+    withTempDir { tmp =>
+      val tableName = createTableWithCommits(tmp.getCanonicalPath, 6)
+
+      // No min/max named params — archival behavior must come from `options` 
alone.
+      // This used to fail (Expected 2, but got 6) because 
withArchivalConfig#putAll
+      // would overwrite hoodie.keep.min.commits/hoodie.keep.max.commits from
+      // user props with the procedure's named-default min=20/max=30.
+      val result = spark.sql(
+        s"call archive_commits(table => '$tableName'," +
+          " retain_commits => 1," +
+          " options => 'hoodie.keep.min.commits=2,hoodie.keep.max.commits=3," +
+          "hoodie.commits.archival.batch=1,hoodie.metadata.enable=false')")
+        .collect()
+        .map(row => Seq(row.getInt(0)))
+      assertResult(1)(result.length)
+      assertResult(0)(result(0).head)
+
+      val commits = spark.sql(s"call show_commits(table => '$tableName', limit 
=> 10)").collect()
+      assertResult(2)(commits.length)
 
-      // collect archived commits for table
       val endTs = commits(0).get(0).toString
-      val archivedCommits = spark.sql(s"""call show_archived_commits(table => 
'$tableName', end_ts => '$endTs')""").collect()
-      assertResult(4) {
-        archivedCommits.length
+      val archived = spark.sql(
+        s"call show_archived_commits(table => '$tableName', end_ts => 
'$endTs')").collect()
+      assertResult(4)(archived.length)
+    }
+  }
+
+  test("Test Call archive_commits Procedure: named parameters override 
options") {
+    withTempDir { tmp =>
+      val tableName = createTableWithCommits(tmp.getCanonicalPath, 6)
+
+      // options requests min=10/max=20 (would archive nothing for 6 commits),
+      // but named min_commits=2/max_commits=3 must take precedence.
+      val result = spark.sql(
+        s"call archive_commits(table => '$tableName'," +
+          " min_commits => 2, max_commits => 3, retain_commits => 1, 
enable_metadata => false," +
+          " options => 
'hoodie.keep.min.commits=10,hoodie.keep.max.commits=20')")
+        .collect()
+        .map(row => Seq(row.getInt(0)))
+      assertResult(1)(result.length)
+      assertResult(0)(result(0).head)
+
+      val commits = spark.sql(s"call show_commits(table => '$tableName', limit 
=> 10)").collect()
+      // named params won → archival happened, only 2 active commits left
+      assertResult(2)(commits.length)
+
+      val endTs = commits(0).get(0).toString
+      val archived = spark.sql(
+        s"call show_archived_commits(table => '$tableName', end_ts => 
'$endTs')").collect()
+      assertResult(4)(archived.length)
+    }
+  }
+
+  test("Test Call archive_commits Procedure: invalid options string fails 
fast") {
+    withTempDir { tmp =>
+      val tableName = createTableWithCommits(tmp.getCanonicalPath, 2)
+
+      val ex = intercept[IllegalArgumentException] {
+        spark.sql(
+          s"call archive_commits(table => '$tableName', options => 
'invalid_token')")
+          .collect()
       }
+      assert(ex.getMessage.contains("Invalid options format"))
     }
   }
 }

Reply via email to