This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit ec5e29c11ebdeb14a353dc58f640dbafc4d9aea7
Author: fanng <[email protected]>
AuthorDate: Thu Mar 5 09:55:50 2026 +0800

    Refine update-stats submission flow and keep Java 8 compatibility
    
    - Simplify submit-update-stats-job CLI by removing limit and file-based 
JSON options, and centralize job options in command context
    
    - Drop legacy updater option aliases for stats/all validation and tighten 
spark config requirements
    
    - Restore Java 8 bytecode compatibility for maintenance jobs and related 
optimizer modules by relying on root compatibility rules
    
    - Replace Java 9+ collection factories in optimizer-api/updaters code paths 
to preserve Java 8 compatibility
    
    - Strengthen Spark update-stats integration test to assert exact 
table/partition metrics coverage and reorder the main env test for readability
---
 build.gradle.kts                                   |   3 +
 .../updater/metrics/GravitinoMetricsUpdater.java   |   6 +-
 .../storage/jdbc/JdbcMetricsRepository.java        |  14 +-
 .../statistics/GravitinoStatisticsUpdater.java     |   5 +-
 maintenance/jobs/build.gradle.kts                  |   5 -
 .../jobs/iceberg/IcebergUpdateStatsJob.java        |   2 +-
 .../TestIcebergUpdateStatsJobWithSpark.java        | 253 +++++++++++++++------
 .../optimizer/api/common/PartitionPath.java        |   4 +-
 .../api/common/TableAndPartitionStatistics.java    |  12 +-
 .../optimizer/api/monitor/EvaluationResult.java    |   8 +-
 .../optimizer/api/recommender/StrategyHandler.java |   3 +-
 .../api/recommender/StrategyHandlerContext.java    |  11 +-
 .../optimizer/common/conf/OptimizerConfig.java     |   3 +-
 .../maintenance/optimizer/OptimizerCmd.java        |  38 +---
 .../optimizer/command/OptimizerCommandContext.java |  75 +++---
 .../command/SubmitUpdateStatsJobCommand.java       |  84 +------
 .../maintenance/optimizer/TestOptimizerCmd.java    |  49 +---
 .../job/TestBuiltinIcebergRewriteDataFiles.java    |  52 -----
 18 files changed, 309 insertions(+), 318 deletions(-)

diff --git a/build.gradle.kts b/build.gradle.kts
index 4bd2e1e8e0..d1b53e093e 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -335,6 +335,9 @@ subprojects {
     val name = project.name.lowercase()
     val path = project.path.lowercase()
     if (path.startsWith(":maintenance:jobs") ||
+      path.startsWith(":maintenance:optimizer-api") ||
+      path.startsWith(":maintenance:gravitino-updaters") ||
+      path.startsWith(":clients:client-java") ||
       name == "api" ||
       name == "common" ||
       name == "catalog-common" ||
diff --git 
a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
index 7834efa6e2..3a28de2ed0 100644
--- 
a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
+++ 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
@@ -21,6 +21,8 @@ package 
org.apache.gravitino.maintenance.optimizer.updater.metrics;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
 import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
@@ -51,14 +53,14 @@ public class GravitinoMetricsUpdater implements 
MetricsUpdater {
   public void updateTableAndPartitionMetrics(List<MetricPoint> metrics) {
     ensureInitialized();
     validateScopes(
-        metrics, List.of(DataScope.Type.TABLE, DataScope.Type.PARTITION), 
"table/partition");
+        metrics, Arrays.asList(DataScope.Type.TABLE, 
DataScope.Type.PARTITION), "table/partition");
     metricsStorage.storeTableAndPartitionMetrics(metrics);
   }
 
   @Override
   public void updateJobMetrics(List<MetricPoint> metrics) {
     ensureInitialized();
-    validateScopes(metrics, List.of(DataScope.Type.JOB), "job");
+    validateScopes(metrics, Collections.singletonList(DataScope.Type.JOB), 
"job");
     metricsStorage.storeJobMetrics(metrics);
   }
 
diff --git 
a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
index e138e18fd7..e47c8bdb5c 100644
--- 
a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
+++ 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
@@ -31,6 +31,8 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
@@ -70,9 +72,10 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
   private static final String TABLE_METRICS_TABLE = "table_metrics";
   private static final String JOB_METRICS_TABLE = "job_metrics";
   private static final Set<String> REQUIRED_TABLE_METRICS_COLUMNS =
-      Set.of("table_identifier", "metric_name", "table_partition", 
"metric_ts", "metric_value");
+      buildRequiredColumnsSet(
+          "table_identifier", "metric_name", "table_partition", "metric_ts", 
"metric_value");
   private static final Set<String> REQUIRED_JOB_METRICS_COLUMNS =
-      Set.of("job_identifier", "metric_name", "metric_ts", "metric_value");
+      buildRequiredColumnsSet("job_identifier", "metric_name", "metric_ts", 
"metric_value");
 
   protected static final String DEFAULT_USER = "sa";
   protected static final String DEFAULT_PASSWORD = "";
@@ -81,6 +84,10 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
   private JdbcMetricsDialect dialect;
   private volatile boolean initialized = false;
 
+  private static Set<String> buildRequiredColumnsSet(String... columns) {
+    return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(columns)));
+  }
+
   protected final void initializeStorage(DataSourceJdbcConnectionProvider 
connectionProvider) {
     Preconditions.checkState(!initialized, "JdbcMetricsRepository has already 
been initialized.");
     Preconditions.checkArgument(connectionProvider != null, 
"connectionProvider must not be null");
@@ -220,7 +227,8 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
         Files.exists(scriptPath),
         "H2 metrics schema script not found at %s. Please ensure distribution 
scripts are present.",
         scriptPath);
-    executeSchemaSql(connection, Files.readString(scriptPath, 
StandardCharsets.UTF_8));
+    executeSchemaSql(
+        connection, new String(Files.readAllBytes(scriptPath), 
StandardCharsets.UTF_8));
   }
 
   private void executeSchemaSql(Connection connection, String sqlContent) 
throws SQLException {
diff --git 
a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
index a3635cf2c7..2f10d95792 100644
--- 
a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
+++ 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
@@ -21,6 +21,7 @@ package 
org.apache.gravitino.maintenance.optimizer.updater.statistics;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,7 +87,7 @@ public class GravitinoStatisticsUpdater implements 
StatisticsUpdater {
 
   private Map<String, StatisticValue<?>> 
getTableStatisticsMap(List<StatisticEntry<?>> statistics) {
     if (statistics == null || statistics.isEmpty()) {
-      return Map.of();
+      return Collections.emptyMap();
     }
     return toStatisticValueMap(statistics, "table statistics");
   }
@@ -94,7 +95,7 @@ public class GravitinoStatisticsUpdater implements 
StatisticsUpdater {
   private List<PartitionStatisticsUpdate> getPartitionStatisticsUpdates(
       Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
     if (partitionStatistics == null || partitionStatistics.isEmpty()) {
-      return List.of();
+      return Collections.emptyList();
     }
     return partitionStatistics.entrySet().stream()
         .map(
diff --git a/maintenance/jobs/build.gradle.kts 
b/maintenance/jobs/build.gradle.kts
index 5b42074c32..8c82e9c040 100644
--- a/maintenance/jobs/build.gradle.kts
+++ b/maintenance/jobs/build.gradle.kts
@@ -25,11 +25,6 @@ plugins {
   alias(libs.plugins.shadow)
 }
 
-java {
-  sourceCompatibility = JavaVersion.VERSION_17
-  targetCompatibility = JavaVersion.VERSION_17
-}
-
 repositories {
   mavenCentral()
 }
diff --git 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
index 6cb08ba04a..9f03fc68f2 100644
--- 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
+++ 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
@@ -249,7 +249,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
       String sql = buildTableStatsSql(catalogName, tableIdentifier, 
targetFileSizeBytes);
       Row[] rows = (Row[]) spark.sql(sql).collect();
       List<StatisticEntry<?>> tableStatistics =
-          rows.length == 0 ? List.of() : toStatistics(rows[0]);
+          rows.length == 0 ? Collections.emptyList() : toStatistics(rows[0]);
 
       if (updateMode.updateStats) {
         statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, 
tableStatistics);
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
index 736e35db03..291ab11b70 100644
--- 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -31,7 +31,9 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,18 @@ public class TestIcebergUpdateStatsJobWithSpark {
   private static final String JOB_TEMPLATE_NAME = 
"builtin-iceberg-update-stats";
   private static final String SPARK_CATALOG_NAME = "rest_catalog";
   private static final String METALAKE_NAME = "test";
+  private static final int EXPECTED_METRIC_COUNT_PER_SCOPE = 8;
+  private static final Set<String> EXPECTED_METRIC_NAMES =
+      new HashSet<>(
+          Arrays.asList(
+              "custom-file_count",
+              "custom-data_files",
+              "custom-position_delete_files",
+              "custom-equality_delete_files",
+              "custom-small_files",
+              "custom-datafile_mse",
+              "custom-avg_size",
+              "custom-total_size"));
 
   @TempDir static File tempDir;
 
@@ -145,6 +159,100 @@ public class TestIcebergUpdateStatsJobWithSpark {
     }
   }
 
+  // Requires a running deploy-mode Gravitino server and Spark environment.
+  @Test
+  @Tag("gravitino-docker-test")
+  @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
+  public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws 
Exception {
+    String suffix = UUID.randomUUID().toString().replace("-", "");
+    String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix;
+    String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix;
+    String partitionMetricsTableName = 
"jobs_it_update_stats_mode_partition_metrics_" + suffix;
+    String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix;
+    String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName;
+    String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + 
metricsTableName;
+    String partitionMetricsFullTableName = SPARK_CATALOG_NAME + ".db." + 
partitionMetricsTableName;
+    String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + 
allModeTableName;
+
+    SparkSession restSpark = createRestSparkSession();
+    try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
+        GravitinoAdminClient adminClient = 
GravitinoAdminClient.builder(SERVER_URI).build()) {
+      mysql.start();
+      initializeMySqlMetricsSchema(mysql);
+
+      GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, 
METALAKE_NAME);
+      recreateRestCatalog(metalake);
+      createTableAndInsertData(restSpark, statsFullTableName);
+      createTableAndInsertData(restSpark, metricsFullTableName);
+      createPartitionedTableAndInsertData(restSpark, 
partitionMetricsFullTableName);
+      createTableAndInsertData(restSpark, allModeFullTableName);
+
+      NameIdentifier statsTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName);
+      NameIdentifier metricsTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName);
+      NameIdentifier partitionMetricsTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", 
partitionMetricsTableName);
+      NameIdentifier allModeTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName);
+
+      PartitionPath dsPartition1 =
+          PartitionPath.of(List.of(new PartitionEntryImpl("ds", 
"2026-03-01")));
+      PartitionPath dsPartition2 =
+          PartitionPath.of(List.of(new PartitionEntryImpl("ds", 
"2026-03-02")));
+
+      try (GenericJdbcMetricsRepository repository = new 
GenericJdbcMetricsRepository()) {
+        repository.initialize(buildJdbcMetricsConfigs(mysql));
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                statsTableName, "stats", 
buildUpdaterOptionsForStatsUpdaterOnly()));
+        awaitCustomStatisticsVisible(statsTableName);
+        assertEquals(0, getTableMetricsCount(repository, 
statsTableIdentifier));
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                metricsTableName, "metrics", 
buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
+        assertFalse(containsCustomStatistics(metricsTableName));
+        awaitTableMetricsExactly(
+            repository, metricsTableIdentifier, 
EXPECTED_METRIC_COUNT_PER_SCOPE);
+        assertTableMetricsMatch(repository, metricsTableIdentifier);
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                partitionMetricsTableName,
+                "metrics",
+                buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
+        assertFalse(containsCustomStatistics(partitionMetricsTableName));
+        assertEquals(0, getTableMetricsCount(repository, 
partitionMetricsTableIdentifier));
+        awaitPartitionMetricsExactly(
+            repository,
+            partitionMetricsTableIdentifier,
+            dsPartition1,
+            EXPECTED_METRIC_COUNT_PER_SCOPE);
+        awaitPartitionMetricsExactly(
+            repository,
+            partitionMetricsTableIdentifier,
+            dsPartition2,
+            EXPECTED_METRIC_COUNT_PER_SCOPE);
+        assertPartitionMetricsMatch(repository, 
partitionMetricsTableIdentifier, dsPartition1);
+        assertPartitionMetricsMatch(repository, 
partitionMetricsTableIdentifier, dsPartition2);
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                allModeTableName, "all", 
buildUpdaterOptionsForAllMode(mysql)));
+        awaitCustomStatisticsVisible(allModeTableName);
+        awaitTableMetricsExactly(
+            repository, allModeTableIdentifier, 
EXPECTED_METRIC_COUNT_PER_SCOPE);
+        assertTableMetricsMatch(repository, allModeTableIdentifier);
+      }
+    }
+  }
+
   @Test
   public void testUpdateNonPartitionedTableStatistics() {
     RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();
@@ -414,65 +522,6 @@ public class TestIcebergUpdateStatsJobWithSpark {
         parsedPartitions);
   }
 
-  // Requires a running deploy-mode Gravitino server and Spark environment.
-  @Test
-  @Tag("gravitino-docker-test")
-  @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
-  public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws 
Exception {
-    String suffix = UUID.randomUUID().toString().replace("-", "");
-    String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix;
-    String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix;
-    String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix;
-    String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName;
-    String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + 
metricsTableName;
-    String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + 
allModeTableName;
-
-    SparkSession restSpark = createRestSparkSession();
-    try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
-        GravitinoAdminClient adminClient = 
GravitinoAdminClient.builder(SERVER_URI).build()) {
-      mysql.start();
-      initializeMySqlMetricsSchema(mysql);
-
-      GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, 
METALAKE_NAME);
-      recreateRestCatalog(metalake);
-      createTableAndInsertData(restSpark, statsFullTableName);
-      createTableAndInsertData(restSpark, metricsFullTableName);
-      createTableAndInsertData(restSpark, allModeFullTableName);
-
-      NameIdentifier statsTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName);
-      NameIdentifier metricsTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName);
-      NameIdentifier allModeTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName);
-
-      try (GenericJdbcMetricsRepository repository = new 
GenericJdbcMetricsRepository()) {
-        repository.initialize(buildJdbcMetricsConfigs(mysql));
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                statsTableName, "stats", 
buildUpdaterOptionsForStatsUpdaterOnly()));
-        awaitCustomStatisticsVisible(statsTableName);
-        assertEquals(0, getTableMetricsCount(repository, 
statsTableIdentifier));
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                metricsTableName, "metrics", 
buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
-        assertFalse(containsCustomStatistics(metricsTableName));
-        awaitTableMetricsAtLeast(repository, metricsTableIdentifier, 1);
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                allModeTableName, "all", 
buildUpdaterOptionsForAllMode(mysql)));
-        awaitCustomStatisticsVisible(allModeTableName);
-        awaitTableMetricsAtLeast(repository, allModeTableIdentifier, 1);
-      }
-    }
-  }
-
   private void awaitCustomStatisticsVisible(String tableName) throws Exception 
{
     Awaitility.await()
         .atMost(Duration.ofSeconds(30))
@@ -498,22 +547,77 @@ public class TestIcebergUpdateStatsJobWithSpark {
     }
   }
 
-  private static void awaitTableMetricsAtLeast(
+  private static void awaitTableMetricsExactly(
+      GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier, 
int expectedCount) {
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(2))
+        .until(() -> getTableMetricsCount(repository, tableIdentifier) == 
expectedCount);
+  }
+
+  private static void awaitPartitionMetricsExactly(
       GenericJdbcMetricsRepository repository,
       NameIdentifier tableIdentifier,
-      int expectedLowerBound) {
+      PartitionPath partitionPath,
+      int expectedCount) {
     Awaitility.await()
         .atMost(Duration.ofSeconds(30))
         .pollInterval(Duration.ofSeconds(2))
-        .until(() -> getTableMetricsCount(repository, tableIdentifier) >= 
expectedLowerBound);
+        .until(
+            () ->
+                getPartitionMetricsCount(repository, tableIdentifier, 
partitionPath)
+                    == expectedCount);
+  }
+
+  private static void assertTableMetricsMatch(
+      GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) 
{
+    List<MetricPoint> metrics = getTableMetrics(repository, tableIdentifier);
+    assertEquals(EXPECTED_METRIC_COUNT_PER_SCOPE, metrics.size());
+    assertTrue(metrics.stream().allMatch(metric -> metric.scope() == 
DataScope.Type.TABLE));
+    assertTrue(metrics.stream().allMatch(metric -> 
metric.partitionPath().isEmpty()));
+    assertEquals(
+        EXPECTED_METRIC_NAMES,
+        
metrics.stream().map(MetricPoint::metricName).collect(Collectors.toSet()));
+  }
+
+  private static void assertPartitionMetricsMatch(
+      GenericJdbcMetricsRepository repository,
+      NameIdentifier tableIdentifier,
+      PartitionPath partitionPath) {
+    List<MetricPoint> metrics = getPartitionMetrics(repository, 
tableIdentifier, partitionPath);
+    assertEquals(EXPECTED_METRIC_COUNT_PER_SCOPE, metrics.size());
+    assertTrue(metrics.stream().allMatch(metric -> metric.scope() == 
DataScope.Type.PARTITION));
+    assertTrue(metrics.stream().allMatch(metric -> 
metric.partitionPath().isPresent()));
+    assertEquals(
+        EXPECTED_METRIC_NAMES,
+        
metrics.stream().map(MetricPoint::metricName).collect(Collectors.toSet()));
   }
 
   private static int getTableMetricsCount(
       GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) 
{
+    return getTableMetrics(repository, tableIdentifier).size();
+  }
+
+  private static List<MetricPoint> getTableMetrics(
+      GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) 
{
     long now = Instant.now().getEpochSecond();
-    return repository
-        .getMetrics(DataScope.forTable(tableIdentifier), now - 1800, now + 
1800)
-        .size();
+    return repository.getMetrics(DataScope.forTable(tableIdentifier), now - 
1800, now + 1800);
+  }
+
+  private static int getPartitionMetricsCount(
+      GenericJdbcMetricsRepository repository,
+      NameIdentifier tableIdentifier,
+      PartitionPath partitionPath) {
+    return getPartitionMetrics(repository, tableIdentifier, 
partitionPath).size();
+  }
+
+  private static List<MetricPoint> getPartitionMetrics(
+      GenericJdbcMetricsRepository repository,
+      NameIdentifier tableIdentifier,
+      PartitionPath partitionPath) {
+    long now = Instant.now().getEpochSecond();
+    return repository.getMetrics(
+        DataScope.forPartition(tableIdentifier, partitionPath), now - 1800, 
now + 1800);
   }
 
   private static Map<String, String> buildJdbcMetricsConfigs(MySQLContainer<?> 
mysql) {
@@ -598,6 +702,25 @@ public class TestIcebergUpdateStatsJobWithSpark {
     }
   }
 
+  private static void createPartitionedTableAndInsertData(
+      SparkSession sparkSession, String fullTableName) {
+    sparkSession.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + 
".db");
+    sparkSession.sql("DROP TABLE IF EXISTS " + fullTableName);
+    sparkSession.sql(
+        "CREATE TABLE "
+            + fullTableName
+            + " (id INT, data STRING, ds STRING) USING iceberg PARTITIONED BY 
(ds)");
+    sparkSession.sql(
+        "ALTER TABLE "
+            + fullTableName
+            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
+    for (int i = 0; i < 10; i++) {
+      String ds = i < 5 ? "2026-03-01" : "2026-03-02";
+      sparkSession.sql(
+          "INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + 
"', '" + ds + "')");
+    }
+  }
+
   private static Map<String, String> buildUpdateStatsJobConfig(
       String tableName, String updateMode, String updaterOptions) {
     Map<String, String> jobConf = new HashMap<>();
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
index 11f4e80ad6..cbc6c4cdb6 100644
--- 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
@@ -20,6 +20,8 @@
 package org.apache.gravitino.maintenance.optimizer.api.common;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -42,7 +44,7 @@ public final class PartitionPath {
   public static PartitionPath of(List<PartitionEntry> entries) {
     Preconditions.checkArgument(
         entries != null && !entries.isEmpty(), "partition entries must not be 
empty");
-    return new PartitionPath(List.copyOf(entries));
+    return new PartitionPath(Collections.unmodifiableList(new 
ArrayList<>(entries)));
   }
 
   /**
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
index 2e7642c00e..6ae96ce9b9 100644
--- 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
@@ -19,6 +19,9 @@
 
 package org.apache.gravitino.maintenance.optimizer.api.common;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.annotation.DeveloperApi;
@@ -32,9 +35,14 @@ public class TableAndPartitionStatistics {
   public TableAndPartitionStatistics(
       List<StatisticEntry<?>> tableStatistics,
       Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
-    this.tableStatistics = tableStatistics != null ? 
List.copyOf(tableStatistics) : List.of();
+    this.tableStatistics =
+        tableStatistics != null
+            ? Collections.unmodifiableList(new ArrayList<>(tableStatistics))
+            : Collections.emptyList();
     this.partitionStatistics =
-        partitionStatistics != null ? Map.copyOf(partitionStatistics) : 
Map.of();
+        partitionStatistics != null
+            ? Collections.unmodifiableMap(new 
LinkedHashMap<>(partitionStatistics))
+            : Collections.emptyMap();
   }
 
   public List<StatisticEntry<?>> tableStatistics() {
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
index 30456064dc..b843f89d33 100644
--- 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
@@ -20,6 +20,7 @@
 package org.apache.gravitino.maintenance.optimizer.api.monitor;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -124,7 +125,7 @@ public class EvaluationResult {
   private static Map<String, List<MetricSample>> immutableCopy(
       Map<String, List<MetricSample>> metrics) {
     if (metrics.isEmpty()) {
-      return Map.of();
+      return Collections.emptyMap();
     }
 
     Map<String, List<MetricSample>> copied = new LinkedHashMap<>();
@@ -132,7 +133,10 @@ public class EvaluationResult {
       Preconditions.checkArgument(entry.getKey() != null, "metric name must 
not be null");
       List<MetricSample> values = entry.getValue();
       Preconditions.checkArgument(values != null, "metric values must not be 
null");
-      copied.put(entry.getKey(), 
Collections.unmodifiableList(List.copyOf(values)));
+      for (MetricSample value : values) {
+        Preconditions.checkArgument(value != null, "metric sample must not be 
null");
+      }
+      copied.put(entry.getKey(), Collections.unmodifiableList(new 
ArrayList<>(values)));
     }
     return Collections.unmodifiableMap(copied);
   }
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
index 879b2ecb73..79da7e01cd 100644
--- 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.api.recommender;
 
+import java.util.Collections;
 import java.util.Set;
 import org.apache.gravitino.annotation.DeveloperApi;
 
@@ -46,7 +47,7 @@ public interface StrategyHandler {
    * @return set of requested data items; empty means no additional data needed
    */
   default Set<DataRequirement> dataRequirements() {
-    return Set.of();
+    return Collections.emptySet();
   }
 
   /**
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
index 82f6c9307b..5b3fd7fcd7 100644
--- 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
@@ -19,7 +19,9 @@
 
 package org.apache.gravitino.maintenance.optimizer.api.recommender;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -139,7 +141,10 @@ public final class StrategyHandlerContext {
      * @return builder for chaining
      */
     public Builder withTableStatistics(List<StatisticEntry<?>> statistics) {
-      this.tableStatistics = statistics == null ? Collections.emptyList() : 
List.copyOf(statistics);
+      this.tableStatistics =
+          statistics == null
+              ? Collections.emptyList()
+              : Collections.unmodifiableList(new ArrayList<>(statistics));
       return this;
     }
 
@@ -152,7 +157,9 @@ public final class StrategyHandlerContext {
     public Builder withPartitionStatistics(
         Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
       this.partitionStatistics =
-          partitionStatistics == null ? Collections.emptyMap() : 
Map.copyOf(partitionStatistics);
+          partitionStatistics == null
+              ? Collections.emptyMap()
+              : Collections.unmodifiableMap(new 
LinkedHashMap<>(partitionStatistics));
       return this;
     }
 
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index f915f2cb07..0cb063f304 100644
--- 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.common.conf;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
@@ -164,7 +165,7 @@ public class OptimizerConfig extends Config {
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
           .toSequence()
-          .createWithDefault(List.of());
+          .createWithDefault(Collections.emptyList());
 
   public static final ConfigEntry<String> GRAVITINO_URI_CONFIG =
       new ConfigBuilder(GRAVITINO_URI)
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
index 6dc6fa01fb..6800e5203e 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
@@ -121,13 +121,10 @@ public class OptimizerCmd {
               EnumSet.of(CliOption.IDENTIFIERS),
               EnumSet.of(
                   CliOption.DRY_RUN,
-                  CliOption.LIMIT,
                   CliOption.UPDATE_MODE,
                   CliOption.TARGET_FILE_SIZE_BYTES,
                   CliOption.UPDATER_OPTIONS,
-                  CliOption.UPDATER_OPTIONS_FILE,
-                  CliOption.SPARK_CONF,
-                  CliOption.SPARK_CONF_FILE),
+                  CliOption.SPARK_CONF),
               "Submit built-in Iceberg update stats jobs for given table 
identifiers.",
               "./bin/gravitino-optimizer.sh --type submit-update-stats-job 
--identifiers "
                   + "rest.ab.t1,rest.ab.t2 --update-mode stats --dry-run",
@@ -185,9 +182,10 @@ public class OptimizerCmd {
       String updateMode = cmd.getOptionValue(CliOption.UPDATE_MODE.longOpt());
       String targetFileSizeBytes = 
cmd.getOptionValue(CliOption.TARGET_FILE_SIZE_BYTES.longOpt());
       String updaterOptions = 
cmd.getOptionValue(CliOption.UPDATER_OPTIONS.longOpt());
-      String updaterOptionsFile = 
cmd.getOptionValue(CliOption.UPDATER_OPTIONS_FILE.longOpt());
       String sparkConf = cmd.getOptionValue(CliOption.SPARK_CONF.longOpt());
-      String sparkConfFile = 
cmd.getOptionValue(CliOption.SPARK_CONF_FILE.longOpt());
+      OptimizerCommandContext.UpdateStatsJobOptions updateStatsJobOptions =
+          new OptimizerCommandContext.UpdateStatsJobOptions(
+              updateMode, targetFileSizeBytes, updaterOptions, sparkConf);
       String statisticsPayload = 
cmd.getOptionValue(CliOption.STATISTICS_PAYLOAD.longOpt());
       String filePath = cmd.getOptionValue(CliOption.FILE_PATH.longOpt());
       Optional<StatisticsInputContent> statisticsInputContent =
@@ -203,12 +201,7 @@ public class OptimizerCmd {
               actionTime,
               rangeSeconds,
               partitionPathRaw,
-              updateMode,
-              targetFileSizeBytes,
-              updaterOptions,
-              updaterOptionsFile,
-              sparkConf,
-              sparkConfFile,
+              updateStatsJobOptions,
               statisticsInputContent,
               out);
       executeCommand(optimizerType, context);
@@ -340,14 +333,7 @@ public class OptimizerCmd {
   }
 
   private static CommandRules.ValidationPlan updateStatsJobInputRules() {
-    return CommandRules.newBuilder()
-        .addMutuallyExclusive(
-            List.of(CliOption.UPDATER_OPTIONS.longOpt(), 
CliOption.UPDATER_OPTIONS_FILE.longOpt()),
-            "--updater-options and --updater-options-file cannot be used 
together")
-        .addMutuallyExclusive(
-            List.of(CliOption.SPARK_CONF.longOpt(), 
CliOption.SPARK_CONF_FILE.longOpt()),
-            "--spark-conf and --spark-conf-file cannot be used together")
-        .build();
+    return CommandRules.emptyPlan();
   }
 
   private static void printGlobalHelp(Options options, PrintStream out) {
@@ -518,21 +504,11 @@ public class OptimizerCmd {
         CliOptionArgType.SINGLE,
         null,
         "JSON map for updater options in submit-update-stats-job"),
-    UPDATER_OPTIONS_FILE(
-        "updater-options-file",
-        CliOptionArgType.SINGLE,
-        null,
-        "Path to JSON file for updater options in submit-update-stats-job"),
     SPARK_CONF(
         "spark-conf",
         CliOptionArgType.SINGLE,
         null,
-        "JSON map for Spark configs in submit-update-stats-job"),
-    SPARK_CONF_FILE(
-        "spark-conf-file",
-        CliOptionArgType.SINGLE,
-        null,
-        "Path to JSON file for Spark configs in submit-update-stats-job");
+        "JSON map for Spark configs in submit-update-stats-job");
 
     private final String longOpt;
     private final CliOptionArgType argType;
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
index b29ba0d09d..88d11046fa 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.maintenance.optimizer.command;
 
 import java.io.PrintStream;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
@@ -37,12 +38,7 @@ public final class OptimizerCommandContext {
   private final String actionTime;
   private final String rangeSeconds;
   private final String partitionPathRaw;
-  private final String updateMode;
-  private final String targetFileSizeBytes;
-  private final String updaterOptions;
-  private final String updaterOptionsFile;
-  private final String sparkConf;
-  private final String sparkConfFile;
+  private final UpdateStatsJobOptions updateStatsJobOptions;
   private final Optional<StatisticsInputContent> statisticsInputContent;
   private final PrintStream output;
 
@@ -56,12 +52,7 @@ public final class OptimizerCommandContext {
       String actionTime,
       String rangeSeconds,
       String partitionPathRaw,
-      String updateMode,
-      String targetFileSizeBytes,
-      String updaterOptions,
-      String updaterOptionsFile,
-      String sparkConf,
-      String sparkConfFile,
+      UpdateStatsJobOptions updateStatsJobOptions,
       Optional<StatisticsInputContent> statisticsInputContent,
       PrintStream output) {
     this.optimizerEnv = optimizerEnv;
@@ -73,12 +64,8 @@ public final class OptimizerCommandContext {
     this.actionTime = actionTime;
     this.rangeSeconds = rangeSeconds;
     this.partitionPathRaw = partitionPathRaw;
-    this.updateMode = updateMode;
-    this.targetFileSizeBytes = targetFileSizeBytes;
-    this.updaterOptions = updaterOptions;
-    this.updaterOptionsFile = updaterOptionsFile;
-    this.sparkConf = sparkConf;
-    this.sparkConfFile = sparkConfFile;
+    this.updateStatsJobOptions =
+        Objects.requireNonNull(updateStatsJobOptions, "updateStatsJobOptions 
must not be null");
     this.statisticsInputContent = statisticsInputContent;
     this.output = output;
   }
@@ -127,28 +114,24 @@ public final class OptimizerCommandContext {
     return partitionPathRaw;
   }
 
+  public UpdateStatsJobOptions updateStatsJobOptions() {
+    return updateStatsJobOptions;
+  }
+
   public String updateMode() {
-    return updateMode;
+    return updateStatsJobOptions.updateMode();
   }
 
   public String targetFileSizeBytes() {
-    return targetFileSizeBytes;
+    return updateStatsJobOptions.targetFileSizeBytes();
   }
 
   public String updaterOptions() {
-    return updaterOptions;
-  }
-
-  public String updaterOptionsFile() {
-    return updaterOptionsFile;
+    return updateStatsJobOptions.updaterOptions();
   }
 
   public String sparkConf() {
-    return sparkConf;
-  }
-
-  public String sparkConfFile() {
-    return sparkConfFile;
+    return updateStatsJobOptions.sparkConf();
   }
 
   public Optional<StatisticsInputContent> statisticsInputContent() {
@@ -158,4 +141,36 @@ public final class OptimizerCommandContext {
   public PrintStream output() {
     return output;
   }
+
+  /** Submit-update-stats-job specific command options. */
+  public static final class UpdateStatsJobOptions {
+    private final String updateMode;
+    private final String targetFileSizeBytes;
+    private final String updaterOptions;
+    private final String sparkConf;
+
+    public UpdateStatsJobOptions(
+        String updateMode, String targetFileSizeBytes, String updaterOptions, 
String sparkConf) {
+      this.updateMode = updateMode;
+      this.targetFileSizeBytes = targetFileSizeBytes;
+      this.updaterOptions = updaterOptions;
+      this.sparkConf = sparkConf;
+    }
+
+    public String updateMode() {
+      return updateMode;
+    }
+
+    public String targetFileSizeBytes() {
+      return targetFileSizeBytes;
+    }
+
+    public String updaterOptions() {
+      return updaterOptions;
+    }
+
+    public String sparkConf() {
+      return sparkConf;
+    }
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
index c06b024b47..00e62d615a 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
@@ -22,9 +22,6 @@ package org.apache.gravitino.maintenance.optimizer.command;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -47,22 +44,18 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
   private static final String DEFAULT_UPDATE_MODE = "stats";
   private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L;
   private static final String OPTION_UPDATER_OPTIONS = "updater-options";
-  private static final String OPTION_UPDATER_OPTIONS_FILE = 
"updater-options-file";
   private static final String OPTION_SPARK_CONF = "spark-conf";
-  private static final String OPTION_SPARK_CONF_FILE = "spark-conf-file";
 
   private static final ObjectMapper MAPPER = new ObjectMapper();
 
   @Override
   public void execute(OptimizerCommandContext context) throws Exception {
     Map<String, String> submitterConfigs = 
context.optimizerEnv().config().jobSubmitterConfigs();
-    int limit = parseLimit(context.limit());
 
     List<TableTarget> tableTargets =
         parseTableTargets(
             context.identifiers(),
-            
context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG),
-            limit);
+            
context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG));
 
     String updateMode =
         parseUpdateMode(
@@ -73,19 +66,9 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
                 context.targetFileSizeBytes(), 
submitterConfigs.get("target_file_size_bytes")));
 
     String updaterOptionsJson =
-        resolveJsonOption(
-            OPTION_UPDATER_OPTIONS,
-            context.updaterOptions(),
-            OPTION_UPDATER_OPTIONS_FILE,
-            context.updaterOptionsFile(),
-            submitterConfigs.get("updater_options"));
+        resolveJsonOption(context.updaterOptions(), 
submitterConfigs.get("updater_options"));
     String sparkConfJson =
-        resolveJsonOption(
-            OPTION_SPARK_CONF,
-            context.sparkConf(),
-            OPTION_SPARK_CONF_FILE,
-            context.sparkConfFile(),
-            submitterConfigs.get("spark_conf"));
+        resolveJsonOption(context.sparkConf(), 
submitterConfigs.get("spark_conf"));
 
     Map<String, String> updaterOptions =
         parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS);
@@ -156,32 +139,10 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
     return StringUtils.isBlank(confValue) ? null : confValue.trim();
   }
 
-  private static String resolveJsonOption(
-      String cliOptionName,
-      String cliValue,
-      String fileOptionName,
-      String filePath,
-      String confValue) {
+  private static String resolveJsonOption(String cliValue, String confValue) {
     if (StringUtils.isNotBlank(cliValue)) {
       return cliValue.trim();
     }
-    if (StringUtils.isNotBlank(filePath)) {
-      try {
-        Path path = Path.of(filePath.trim());
-        Preconditions.checkArgument(
-            Files.exists(path), "Option %s file does not exist: %s", 
fileOptionName, filePath);
-        return Files.readString(path, StandardCharsets.UTF_8).trim();
-      } catch (Exception e) {
-        throw new IllegalArgumentException(
-            String.format(
-                Locale.ROOT,
-                "Failed to read option %s from %s: %s",
-                cliOptionName,
-                filePath,
-                e.getMessage()),
-            e);
-      }
-    }
     return StringUtils.isBlank(confValue) ? null : confValue.trim();
   }
 
@@ -202,27 +163,13 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
     return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", 
value.trim(), false);
   }
 
-  private static int parseLimit(String limit) {
-    if (StringUtils.isBlank(limit)) {
-      return Integer.MAX_VALUE;
-    }
-    long parsed = OptimizerCommandUtils.parseLongOption("limit", limit.trim(), 
false);
-    Preconditions.checkArgument(
-        parsed <= Integer.MAX_VALUE, "Option limit must be <= %s", 
Integer.MAX_VALUE);
-    return (int) parsed;
-  }
-
-  private static List<TableTarget> parseTableTargets(
-      String[] identifiers, String defaultCatalog, int limit) {
+  private static List<TableTarget> parseTableTargets(String[] identifiers, 
String defaultCatalog) {
     Preconditions.checkArgument(
         identifiers != null && identifiers.length > 0,
         "Missing required option --identifiers for command 
'submit-update-stats-job'");
 
     List<TableTarget> tableTargets = new ArrayList<>();
     for (String rawIdentifier : identifiers) {
-      if (tableTargets.size() >= limit) {
-        break;
-      }
       Preconditions.checkArgument(
           StringUtils.isNotBlank(rawIdentifier), "--identifiers contains blank 
identifier");
       String[] levels = rawIdentifier.trim().split("\\.");
@@ -265,14 +212,8 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
     if (!"stats".equals(updateMode) && !"all".equals(updateMode)) {
       return;
     }
-    String gravitinoUri =
-        firstNonBlank(
-            updaterOptions.get("gravitino_uri"),
-            updaterOptions.get("gravitino-uri"),
-            updaterOptions.get(OptimizerConfig.GRAVITINO_URI));
-    String metalake =
-        firstNonBlank(
-            updaterOptions.get("metalake"), 
updaterOptions.get(OptimizerConfig.GRAVITINO_METALAKE));
+    String gravitinoUri = 
StringUtils.trimToNull(updaterOptions.get("gravitino_uri"));
+    String metalake = StringUtils.trimToNull(updaterOptions.get("metalake"));
     Preconditions.checkArgument(
         StringUtils.isNotBlank(gravitinoUri),
         "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "
@@ -287,7 +228,7 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
       List<TableTarget> tableTargets, Map<String, String> sparkConfigs) {
     Preconditions.checkArgument(
         !sparkConfigs.isEmpty(),
-        "Missing spark config. Set --spark-conf/--spark-conf-file or "
+        "Missing spark config. Set --spark-conf or "
             + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config 
file");
     for (TableTarget tableTarget : tableTargets) {
       String requiredKey = "spark.sql.catalog." + tableTarget.catalogName;
@@ -333,15 +274,6 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
     }
   }
 
-  private static String firstNonBlank(String... values) {
-    for (String value : values) {
-      if (StringUtils.isNotBlank(value)) {
-        return value.trim();
-      }
-    }
-    return null;
-  }
-
   private static final class TableTarget {
     private final String fullIdentifier;
     private final String catalogName;
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
index 2be5785a87..b18d73be1f 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
@@ -421,22 +421,8 @@ class TestOptimizerCmd {
   }
 
   @Test
-  void testSubmitUpdateStatsJobSupportsJsonFileOverrides() throws Exception {
+  void testSubmitUpdateStatsJobSupportsJsonOverrides() throws Exception {
     Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
-    Path updaterOptionsFile = 
Files.createTempFile("optimizer-updater-options-", ".json");
-    Path sparkConfFile = Files.createTempFile("optimizer-spark-conf-", 
".json");
-    Files.writeString(
-        updaterOptionsFile,
-        "{\"metrics_updater\":\"custom-metrics-updater\"}",
-        StandardCharsets.UTF_8);
-    Files.writeString(
-        sparkConfFile,
-        
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
-            + "\"spark.sql.catalog.rest.type\":\"rest\","
-            + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}";,
-        StandardCharsets.UTF_8);
-    updaterOptionsFile.toFile().deleteOnExit();
-    sparkConfFile.toFile().deleteOnExit();
 
     String[] output =
         runCommand(
@@ -446,10 +432,12 @@ class TestOptimizerCmd {
             "rest.ab.t1",
             "--update-mode",
             "metrics",
-            "--updater-options-file",
-            updaterOptionsFile.toString(),
-            "--spark-conf-file",
-            sparkConfFile.toString(),
+            "--updater-options",
+            "{\"metrics_updater\":\"custom-metrics-updater\"}",
+            "--spark-conf",
+            
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+                + "\"spark.sql.catalog.rest.type\":\"rest\","
+                + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}",
             "--dry-run",
             "--conf-path",
             confPath.toString());
@@ -458,29 +446,6 @@ class TestOptimizerCmd {
     Assertions.assertTrue(output[0].contains("spark.sql.catalog.rest"));
   }
 
-  @Test
-  void testSubmitUpdateStatsJobRejectsMutuallyExclusiveUpdaterInputs() throws 
Exception {
-    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
-    Path updaterOptionsFile = 
Files.createTempFile("optimizer-updater-options-", ".json");
-    Files.writeString(updaterOptionsFile, "{\"metrics_updater\":\"m\"}", 
StandardCharsets.UTF_8);
-    updaterOptionsFile.toFile().deleteOnExit();
-    String[] output =
-        runCommand(
-            "--type",
-            "submit-update-stats-job",
-            "--identifiers",
-            "rest.ab.t1",
-            "--updater-options",
-            "{\"metrics_updater\":\"m1\"}",
-            "--updater-options-file",
-            updaterOptionsFile.toString(),
-            "--dry-run",
-            "--conf-path",
-            confPath.toString());
-    Assertions.assertTrue(
-        output[1].contains("--updater-options and --updater-options-file 
cannot be used together"));
-  }
-
   private Path createOptimizerConfForMetricsProvider() throws Exception {
     Path confPath = Files.createTempFile("optimizer-test-", ".conf");
     // Route command reads to deterministic in-memory fixtures from 
MetricsProviderForTest.
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
index 6514ac755e..444a11d55f 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.client.GravitinoAdminClient;
-import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.dto.rel.ColumnDTO;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -38,11 +37,9 @@ import 
org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
 import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
 import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.types.Types;
-import org.apache.gravitino.stats.Statistic;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.awaitility.Awaitility;
@@ -58,7 +55,6 @@ public class TestBuiltinIcebergRewriteDataFiles {
   private static final String METALAKE_NAME = "test";
   private static final String ICEBERG_REST_URI = 
"http://localhost:9001/iceberg";
   private static final String JOB_TEMPLATE_NAME = 
"builtin-iceberg-rewrite-data-files";
-  private static final String UPDATE_STATS_JOB_TEMPLATE_NAME = 
"builtin-iceberg-update-stats";
   private static final String SPARK_CATALOG_NAME = "rest_catalog";
   private static final String WAREHOUSE_LOCATION = "";
 
@@ -214,38 +210,6 @@ public class TestBuiltinIcebergRewriteDataFiles {
         });
   }
 
-  @Test
-  void testSubmitBuiltinIcebergUpdateStatsJobAndPersistStatistics() throws 
Exception {
-    String tableName = "update_stats_table";
-    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
-    runWithSparkAndMetalake(
-        (spark, metalake) -> {
-          createTableAndInsertData(spark, fullTableName);
-
-          Map<String, String> jobConf = buildUpdateStatsJobConfig(tableName);
-          submitJob(metalake, UPDATE_STATS_JOB_TEMPLATE_NAME, jobConf);
-
-          try (GravitinoClient client =
-              
GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) {
-            Table table =
-                client
-                    .loadCatalog(SPARK_CATALOG_NAME)
-                    .asTableCatalog()
-                    .loadTable(NameIdentifier.of("db", tableName));
-            List<Statistic> stats = 
table.supportsStatistics().listStatistics();
-            Assertions.assertFalse(stats.isEmpty(), "Expected table statistics 
to be updated");
-
-            Map<String, Statistic> statsMap = new HashMap<>();
-            for (Statistic stat : stats) {
-              statsMap.put(stat.name(), stat);
-            }
-            Assertions.assertTrue(statsMap.containsKey("custom-file_count"));
-            Assertions.assertTrue(statsMap.containsKey("custom-datafile_mse"));
-            Assertions.assertTrue(statsMap.containsKey("custom-total_size"));
-          }
-        });
-  }
-
   private static GravitinoMetalake loadOrCreateMetalake(
       GravitinoAdminClient client, String metalakeName) {
     try {
@@ -306,22 +270,6 @@ public class TestBuiltinIcebergRewriteDataFiles {
     return jobConf;
   }
 
-  private static Map<String, String> buildUpdateStatsJobConfig(String 
tableName) {
-    Map<String, String> jobConf = new HashMap<>();
-    jobConf.put("table_identifier", "db." + tableName);
-    jobConf.put("update_mode", "all");
-    jobConf.put("target_file_size_bytes", "100000");
-    jobConf.put(
-        "updater_options",
-        "{\"gravitino_uri\":\""
-            + SERVER_URI
-            + "\",\"metalake\":\""
-            + METALAKE_NAME
-            + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}");
-    jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs());
-    return jobConf;
-  }
-
   private static Map<String, String> buildCompactionJobConfig(
       OptimizerConfig optimizerConfig,
       String tableName,

Reply via email to