This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit adb57bd3e18d510d390b035b7fbf25733e1cfaf9
Author: fanng <[email protected]>
AuthorDate: Wed Mar 4 13:09:41 2026 +0800

    [#12688] Refine iceberg update stats job modes and options
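    
    The flags --gravitino-uri, --metalake, --statistics-updater,
    --enable-metrics and --metrics-updater are replaced by a single
    --update-mode flag (stats, metrics, or all) plus a JSON
    --updater-options map that is passed through to the updaters'
    OptimizerConfig. As an illustrative sketch of the new invocation
    (the URI, metalake, and catalog values below are examples taken
    from the usage text, not defaults):
    
        IcebergUpdateStatsJob \
          --catalog rest_catalog \
          --table db.tbl \
          --update-mode all \
          --updater-options '{"gravitino_uri":"http://localhost:8090","metalake":"test"}' \
          --spark-conf '{"spark.master":"local[2]"}'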
---
 maintenance/jobs/build.gradle.kts                  |   5 +
 .../jobs/iceberg/IcebergUpdateStatsJob.java        | 286 ++++++++++++-------
 .../jobs/iceberg/TestIcebergUpdateStatsJob.java    |  97 +++++--
 .../TestIcebergUpdateStatsJobWithSpark.java        | 310 ++++++++++++++++++++-
 .../job/TestBuiltinIcebergRewriteDataFiles.java    |  11 +-
 .../job/TestBuiltinIcebergUpdateStatsJob.java      |  45 ++-
 6 files changed, 611 insertions(+), 143 deletions(-)

diff --git a/maintenance/jobs/build.gradle.kts b/maintenance/jobs/build.gradle.kts
index e88f3ebbef..5b42074c32 100644
--- a/maintenance/jobs/build.gradle.kts
+++ b/maintenance/jobs/build.gradle.kts
@@ -68,6 +68,11 @@ dependencies {
     exclude("javax.servlet")
   }
   testImplementation(libs.junit.jupiter.api)
+  testImplementation(libs.awaitility)
+  testImplementation(libs.testcontainers)
+  testImplementation(libs.testcontainers.junit.jupiter)
+  testImplementation(libs.testcontainers.mysql)
+  testRuntimeOnly(libs.mysql.driver)
  testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") {
     exclude("org.slf4j")
     exclude("org.apache.logging.log4j")
diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
index e2689825c3..40b1279bab 100644
--- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
+++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
@@ -26,7 +26,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.job.JobTemplateProvider;
 import org.apache.gravitino.job.SparkJobTemplate;
@@ -61,6 +64,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
   private static final String DEFAULT_STATISTICS_UPDATER = "gravitino-statistics-updater";
   private static final String DEFAULT_METRICS_UPDATER = "gravitino-metrics-updater";
   private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L;
+  private static final String DEFAULT_UPDATE_MODE = UpdateMode.ALL.modeName;
   private static final String CUSTOM_STAT_PREFIX = "custom-";
 
   @Override
@@ -83,25 +87,16 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
     Map<String, String> argMap = parseArguments(args);
     String catalogName = argMap.get("catalog");
     String tableIdentifier = argMap.get("table");
-    String gravitinoUri = argMap.get("gravitino-uri");
-    String metalake = argMap.get("metalake");
-
-    if (catalogName == null
-        || tableIdentifier == null
-        || gravitinoUri == null
-        || metalake == null) {
-      System.err.println(
-          "Error: --catalog, --table, --gravitino-uri and --metalake are required arguments");
+    UpdateMode updateMode = parseUpdateMode(argMap.get("update-mode"));
+
+    if (catalogName == null || tableIdentifier == null) {
+      System.err.println("Error: --catalog and --table are required arguments");
       printUsage();
       System.exit(1);
     }
 
-    String updaterName =
-        argMap.getOrDefault("statistics-updater", DEFAULT_STATISTICS_UPDATER).trim();
-    boolean enableMetrics = parseEnableMetrics(argMap.get("enable-metrics"));
-    String metricsUpdaterName =
-        argMap.getOrDefault("metrics-updater", DEFAULT_METRICS_UPDATER).trim();
     long targetFileSizeBytes = parseTargetFileSize(argMap.get("target-file-size-bytes"));
+    Map<String, String> updaterOptions = parseJsonOptions(argMap.get("updater-options"));
     String sparkConfJson = argMap.get("spark-conf");
 
     SparkSession.Builder sparkBuilder =
@@ -118,19 +113,30 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
     StatisticsUpdater statisticsUpdater = null;
     MetricsUpdater metricsUpdater = null;
     try {
-      statisticsUpdater = createStatisticsUpdater(updaterName, gravitinoUri, metalake);
-      if (enableMetrics) {
-        metricsUpdater = createMetricsUpdater(metricsUpdaterName, gravitinoUri, metalake);
+      Map<String, String> optimizerProperties = buildOptimizerProperties(updaterOptions);
+      if (updateMode.updateStats) {
+        String statisticsUpdaterName =
+            updaterOptions.getOrDefault("statistics_updater", DEFAULT_STATISTICS_UPDATER).trim();
+        statisticsUpdater =
+            createStatisticsUpdater(
+                statisticsUpdaterName, requireGravitinoConfig(optimizerProperties));
+      }
+      if (updateMode.updateMetrics) {
+        String metricsUpdaterName =
+            updaterOptions.getOrDefault("metrics_updater", DEFAULT_METRICS_UPDATER).trim();
+        metricsUpdater = createMetricsUpdater(metricsUpdaterName, optimizerProperties);
       }
+
       updateStatistics(
           spark,
           statisticsUpdater,
           metricsUpdater,
+          updateMode,
           catalogName,
           tableIdentifier,
           targetFileSizeBytes);
     } catch (Exception e) {
-      LOG.error("Failed to update Iceberg statistics", e);
+      LOG.error("Failed to update Iceberg statistics/metrics", e);
       System.exit(1);
     } finally {
       if (statisticsUpdater != null) {
@@ -158,16 +164,52 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
       String tableIdentifier,
       long targetFileSizeBytes) {
     updateStatistics(
-        spark, statisticsUpdater, null, catalogName, tableIdentifier, targetFileSizeBytes);
+        spark,
+        statisticsUpdater,
+        null,
+        UpdateMode.STATS,
+        catalogName,
+        tableIdentifier,
+        targetFileSizeBytes);
+  }
+
+  static void updateStatistics(
+      SparkSession spark,
+      StatisticsUpdater statisticsUpdater,
+      MetricsUpdater metricsUpdater,
+      String catalogName,
+      String tableIdentifier,
+      long targetFileSizeBytes) {
+    updateStatistics(
+        spark,
+        statisticsUpdater,
+        metricsUpdater,
+        UpdateMode.ALL,
+        catalogName,
+        tableIdentifier,
+        targetFileSizeBytes);
   }
 
   static void updateStatistics(
       SparkSession spark,
       StatisticsUpdater statisticsUpdater,
       MetricsUpdater metricsUpdater,
+      UpdateMode updateMode,
       String catalogName,
       String tableIdentifier,
       long targetFileSizeBytes) {
+    Objects.requireNonNull(updateMode, "updateMode must not be null");
+
+    if (updateMode.updateStats && statisticsUpdater == null) {
+      throw new IllegalArgumentException(
+          "Statistics updater must be configured when update_mode is stats or all");
+    }
+
+    if (updateMode.updateMetrics && metricsUpdater == null) {
+      throw new IllegalArgumentException(
+          "Metrics updater must be configured when update_mode is metrics or all");
+    }
+
     NameIdentifier gravitinoTableIdentifier =
         toGravitinoTableIdentifier(catalogName, tableIdentifier);
     long metricTimestamp = System.currentTimeMillis() / 1000L;
@@ -180,33 +222,47 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
       for (Row row : rows) {
         PartitionPath partitionPath = toPartitionPath(row.getAs("partition"));
         List<StatisticEntry<?>> statistics = toStatistics(row);
-        partitionStatistics.put(partitionPath, statistics);
-        if (metricsUpdater != null) {
+        if (updateMode.updateStats) {
+          partitionStatistics.put(partitionPath, statistics);
+        }
+        if (updateMode.updateMetrics) {
           tableAndPartitionMetrics.addAll(
               toPartitionMetricPoints(
                   gravitinoTableIdentifier, partitionPath, statistics, metricTimestamp));
         }
       }
-      statisticsUpdater.updatePartitionStatistics(gravitinoTableIdentifier, partitionStatistics);
-      if (metricsUpdater != null && !tableAndPartitionMetrics.isEmpty()) {
+
+      if (updateMode.updateStats) {
+        statisticsUpdater.updatePartitionStatistics(gravitinoTableIdentifier, partitionStatistics);
+      }
+
+      if (updateMode.updateMetrics && !tableAndPartitionMetrics.isEmpty()) {
         metricsUpdater.updateTableAndPartitionMetrics(tableAndPartitionMetrics);
       }
+
       LOG.info(
-          "Updated partition statistics for {} partitions on {}",
-          partitionStatistics.size(),
+          "Updated partition data in mode {} for {} partitions on {}",
+          updateMode.modeName,
+          rows.length,
           gravitinoTableIdentifier);
     } else {
       String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes);
       Row[] rows = (Row[]) spark.sql(sql).collect();
       List<StatisticEntry<?>> tableStatistics =
           rows.length == 0 ? List.of() : toStatistics(rows[0]);
-      statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics);
-      if (metricsUpdater != null && !tableStatistics.isEmpty()) {
+
+      if (updateMode.updateStats) {
+        statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics);
+      }
+
+      if (updateMode.updateMetrics && !tableStatistics.isEmpty()) {
         metricsUpdater.updateTableAndPartitionMetrics(
             toTableMetricPoints(gravitinoTableIdentifier, tableStatistics, metricTimestamp));
       }
+
       LOG.info(
-          "Updated table statistics with {} metrics on {}",
+          "Updated table data in mode {} with {} metrics on {}",
+          updateMode.modeName,
           tableStatistics.size(),
           gravitinoTableIdentifier);
     }
@@ -338,13 +394,17 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
   }
 
   static Map<String, String> parseCustomSparkConfigs(String sparkConfJson) {
-    if (sparkConfJson == null || sparkConfJson.isEmpty()) {
+    return parseJsonOptions(sparkConfJson);
+  }
+
+  static Map<String, String> parseJsonOptions(String json) {
+    if (json == null || json.isEmpty()) {
       return new HashMap<>();
     }
     try {
       ObjectMapper mapper = new ObjectMapper();
       Map<String, Object> parsedMap =
-          mapper.readValue(sparkConfJson, new TypeReference<Map<String, Object>>() {});
+          mapper.readValue(json, new TypeReference<Map<String, Object>>() {});
       Map<String, String> configs = new HashMap<>();
       for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
         configs.put(entry.getKey(), entry.getValue() == null ? "" : entry.getValue().toString());
@@ -352,11 +412,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
       return configs;
     } catch (Exception e) {
       throw new IllegalArgumentException(
-          "Failed to parse Spark configurations JSON: "
-              + sparkConfJson
-              + ". Error: "
-              + e.getMessage(),
-          e);
+          "Failed to parse JSON options: " + json + ". Error: " + e.getMessage(), e);
     }
   }
 
@@ -375,37 +431,59 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
     }
   }
 
-  static boolean parseEnableMetrics(String value) {
+  static UpdateMode parseUpdateMode(String value) {
     if (value == null || value.trim().isEmpty()) {
-      return false;
+      return UpdateMode.from(DEFAULT_UPDATE_MODE);
     }
-    if ("true".equalsIgnoreCase(value.trim())) {
-      return true;
+    return UpdateMode.from(value);
+  }
+
+  static Map<String, String> buildOptimizerProperties(Map<String, String> updaterOptions) {
+    Map<String, String> optimizerProperties = new HashMap<>(updaterOptions);
+
+    Optional<String> gravitinoUri =
+        firstNonEmpty(
+            updaterOptions.get("gravitino_uri"),
+            updaterOptions.get("gravitino-uri"),
+            updaterOptions.get(OptimizerConfig.GRAVITINO_URI));
+    Optional<String> metalake =
+        firstNonEmpty(
+            updaterOptions.get("metalake"), updaterOptions.get(OptimizerConfig.GRAVITINO_METALAKE));
+
+    gravitinoUri.ifPresent(uri -> optimizerProperties.put(OptimizerConfig.GRAVITINO_URI, uri));
+    metalake.ifPresent(value -> optimizerProperties.put(OptimizerConfig.GRAVITINO_METALAKE, value));
+    return optimizerProperties;
+  }
+
+  static Map<String, String> requireGravitinoConfig(Map<String, String> optimizerProperties) {
+    String gravitinoUri = optimizerProperties.get(OptimizerConfig.GRAVITINO_URI);
+    String metalake = optimizerProperties.get(OptimizerConfig.GRAVITINO_METALAKE);
+
+    if (gravitinoUri == null || gravitinoUri.trim().isEmpty()) {
+      throw new IllegalArgumentException(
+          "updater_options must contain 'gravitino_uri' when update_mode is stats or all");
     }
-    if ("false".equalsIgnoreCase(value.trim())) {
-      return false;
+
+    if (metalake == null || metalake.trim().isEmpty()) {
+      throw new IllegalArgumentException(
+          "updater_options must contain 'metalake' when update_mode is stats or all");
     }
-    throw new IllegalArgumentException("Invalid enable-metrics value: " + value);
+
+    return optimizerProperties;
   }
 
   private static StatisticsUpdater createStatisticsUpdater(
-      String updaterName, String gravitinoUri, String metalake) {
+      String updaterName, Map<String, String> optimizerProperties) {
     StatisticsUpdater statisticsUpdater =
         ProviderUtils.createStatisticsUpdaterInstance(updaterName);
-    Map<String, String> conf = new HashMap<>();
-    conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri);
-    conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake);
-    statisticsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf)));
+    statisticsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(optimizerProperties)));
     return statisticsUpdater;
   }
 
   private static MetricsUpdater createMetricsUpdater(
-      String updaterName, String gravitinoUri, String metalake) {
+      String updaterName, Map<String, String> optimizerProperties) {
     MetricsUpdater metricsUpdater = ProviderUtils.createMetricsUpdaterInstance(updaterName);
-    Map<String, String> conf = new HashMap<>();
-    conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri);
-    conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake);
-    metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf)));
+    metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(optimizerProperties)));
     return metricsUpdater;
   }
 
@@ -478,65 +556,83 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
     return number == null ? 0D : number.doubleValue();
   }
 
+  private static Optional<String> firstNonEmpty(String... candidates) {
+    for (String candidate : candidates) {
+      if (candidate != null && !candidate.trim().isEmpty()) {
+        return Optional.of(candidate.trim());
+      }
+    }
+    return Optional.empty();
+  }
+
   private static List<String> buildArguments() {
     return Arrays.asList(
         "--catalog",
         "{{catalog_name}}",
         "--table",
         "{{table_identifier}}",
-        "--gravitino-uri",
-        "{{gravitino_uri}}",
-        "--metalake",
-        "{{metalake}}",
+        "--update-mode",
+        "{{update_mode}}",
         "--target-file-size-bytes",
         "{{target_file_size_bytes}}",
-        "--statistics-updater",
-        "{{statistics_updater}}",
-        "--enable-metrics",
-        "{{enable_metrics}}",
-        "--metrics-updater",
-        "{{metrics_updater}}",
+        "--updater-options",
+        "{{updater_options}}",
         "--spark-conf",
         "{{spark_conf}}");
   }
 
   private static Map<String, String> buildSparkConfigs() {
-    Map<String, String> configs = new HashMap<>();
-    configs.put("spark.master", "{{spark_master}}");
-    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
-    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
-    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
-    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}", "org.apache.iceberg.spark.SparkCatalog");
-    configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}.warehouse", "{{warehouse_location}}");
-    configs.put(
-        "spark.sql.extensions",
-        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
-    return Collections.unmodifiableMap(configs);
+    return Collections.emptyMap();
   }
 
   private static void printUsage() {
     System.err.println(
-        "Usage: IcebergUpdateStatsJob [OPTIONS]\n"
-            + "\n"
-            + "Required Options:\n"
-            + "  --catalog <name>                   Iceberg catalog name registered in Spark\n"
-            + "  --table <identifier>               Table name in schema.table format\n"
-            + "  --gravitino-uri <uri>              Gravitino server URI\n"
-            + "  --metalake <metalake_name>         Gravitino metalake name\n"
-            + "\n"
-            + "Optional Options:\n"
-            + "  --target-file-size-bytes <bytes>   Small-file threshold and MSE target\n"
-            + "                                     Default: 100000\n"
-            + "  --statistics-updater <name>        StatisticsUpdater provider name\n"
-            + "                                     Default: gravitino-statistics-updater\n"
-            + "  --enable-metrics <true|false>      Whether to persist metrics via MetricsUpdater\n"
-            + "                                     Default: false\n"
-            + "  --metrics-updater <name>           MetricsUpdater provider name\n"
-            + "                                     Default: gravitino-metrics-updater\n"
-            + "  --spark-conf <json>                JSON map of custom Spark configs\n"
-            + "                                     Example: '{\"spark.sql.shuffle.partitions\":\"200\"}'");
+        "Usage: IcebergUpdateStatsJob [OPTIONS]\n"
+            + "\n"
+            + "Required Options:\n"
+            + "  --catalog <name>                   Iceberg catalog name registered in Spark\n"
+            + "  --table <identifier>               Table name in schema.table format\n"
+            + "\n"
+            + "Optional Options:\n"
+            + "  --update-mode <stats|metrics|all>  Update behavior mode, default: all\n"
+            + "  --target-file-size-bytes <bytes>   Small-file threshold and MSE target\n"
+            + "                                     Default: 100000\n"
+            + "  --updater-options <json>           JSON map for updater and repository settings\n"
+            + "                                     Example: '{\"gravitino_uri\":\"http://localhost:8090\",\n"
+            + "                                     \"metalake\":\"test\",\"statistics_updater\":\"gravitino-statistics-updater\",\n"
+            + "                                     \"metrics_updater\":\"gravitino-metrics-updater\"}'\n"
+            + "  --spark-conf <json>                JSON map of custom Spark configs\n"
+            + "                                     Must include Iceberg catalog configs for --catalog\n"
+            + "                                     Example: '{\"spark.master\":\"local[2]\","
+            + "\"spark.sql.catalog.rest_catalog\":\"org.apache.iceberg.spark.SparkCatalog\","
+            + "\"spark.sql.catalog.rest_catalog.type\":\"rest\","
+            + "\"spark.sql.catalog.rest_catalog.uri\":\"http://localhost:9001/iceberg\"}'");
+  }
+
+  enum UpdateMode {
+    STATS("stats", true, false),
+    METRICS("metrics", false, true),
+    ALL("all", true, true);
+
+    private final String modeName;
+    private final boolean updateStats;
+    private final boolean updateMetrics;
+
+    UpdateMode(String modeName, boolean updateStats, boolean updateMetrics) {
+      this.modeName = modeName;
+      this.updateStats = updateStats;
+      this.updateMetrics = updateMetrics;
+    }
+
+    static UpdateMode from(String value) {
+      String normalized = value.trim().toLowerCase(Locale.ROOT);
+      for (UpdateMode mode : values()) {
+        if (mode.modeName.equals(normalized)) {
+          return mode;
+        }
+      }
+      throw new IllegalArgumentException(
+          "Invalid update_mode value: " + value + ". Supported values are: stats, metrics, all");
+    }
   }
 }
diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
index b035ed4b74..636b93143c 100644
--- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
+++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
@@ -19,7 +19,6 @@
 package org.apache.gravitino.maintenance.jobs.iceberg;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.Map;
 import org.apache.gravitino.job.JobTemplateProvider;
 import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 import org.junit.jupiter.api.Test;
 
 public class TestIcebergUpdateStatsJob {
@@ -48,23 +48,19 @@ public class TestIcebergUpdateStatsJob {
     SparkJobTemplate template = job.jobTemplate();
 
     assertNotNull(template.arguments());
-    assertEquals(18, template.arguments().size());
+    assertEquals(12, template.arguments().size());
     assertTrue(template.arguments().contains("--catalog"));
     assertTrue(template.arguments().contains("{{catalog_name}}"));
     assertTrue(template.arguments().contains("--table"));
     assertTrue(template.arguments().contains("{{table_identifier}}"));
-    assertTrue(template.arguments().contains("--gravitino-uri"));
-    assertTrue(template.arguments().contains("{{gravitino_uri}}"));
-    assertTrue(template.arguments().contains("--metalake"));
-    assertTrue(template.arguments().contains("{{metalake}}"));
+    assertTrue(template.arguments().contains("--update-mode"));
+    assertTrue(template.arguments().contains("{{update_mode}}"));
     assertTrue(template.arguments().contains("--target-file-size-bytes"));
     assertTrue(template.arguments().contains("{{target_file_size_bytes}}"));
-    assertTrue(template.arguments().contains("--statistics-updater"));
-    assertTrue(template.arguments().contains("{{statistics_updater}}"));
-    assertTrue(template.arguments().contains("--enable-metrics"));
-    assertTrue(template.arguments().contains("{{enable_metrics}}"));
-    assertTrue(template.arguments().contains("--metrics-updater"));
-    assertTrue(template.arguments().contains("{{metrics_updater}}"));
+    assertTrue(template.arguments().contains("--updater-options"));
+    assertTrue(template.arguments().contains("{{updater_options}}"));
+    assertTrue(template.arguments().contains("--spark-conf"));
+    assertTrue(template.arguments().contains("{{spark_conf}}"));
   }
 
   @Test
@@ -72,21 +68,21 @@ public class TestIcebergUpdateStatsJob {
     String[] args = {
       "--catalog", "cat",
       "--table", "db.tbl",
-      "--gravitino-uri", "http://localhost:8090",
-      "--metalake", "ml",
+      "--update-mode", "metrics",
       "--target-file-size-bytes", "2048",
-      "--enable-metrics", "true",
-      "--metrics-updater", "gravitino-metrics-updater"
+      "--updater-options", "{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}",
+      "--spark-conf", "{\"spark.master\":\"local[2]\"}"
     };
 
     Map<String, String> parsed = IcebergUpdateStatsJob.parseArguments(args);
     assertEquals("cat", parsed.get("catalog"));
     assertEquals("db.tbl", parsed.get("table"));
-    assertEquals("http://localhost:8090", parsed.get("gravitino-uri"));
-    assertEquals("ml", parsed.get("metalake"));
+    assertEquals("metrics", parsed.get("update-mode"));
     assertEquals("2048", parsed.get("target-file-size-bytes"));
-    assertEquals("true", parsed.get("enable-metrics"));
-    assertEquals("gravitino-metrics-updater", parsed.get("metrics-updater"));
+    assertEquals(
+        "{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}",
+        parsed.get("updater-options"));
+    assertEquals("{\"spark.master\":\"local[2]\"}", parsed.get("spark-conf"));
   }
 
   @Test
@@ -95,7 +91,6 @@ public class TestIcebergUpdateStatsJob {
     String partitionSql = IcebergUpdateStatsJob.buildPartitionStatsSql("cat", "db.tbl", 100000L);
 
     assertTrue(tableSql.contains("FROM cat.db.tbl.files"));
-    assertFalse(tableSql.contains("GROUP BY partition"));
     assertTrue(tableSql.contains("AS datafile_mse"));
     assertTrue(partitionSql.contains("FROM cat.db.tbl.files"));
     assertTrue(partitionSql.contains("GROUP BY partition"));
@@ -114,13 +109,59 @@ public class TestIcebergUpdateStatsJob {
   }
 
   @Test
-  public void testParseEnableMetrics() {
-    assertFalse(IcebergUpdateStatsJob.parseEnableMetrics(null));
-    assertFalse(IcebergUpdateStatsJob.parseEnableMetrics(""));
-    assertFalse(IcebergUpdateStatsJob.parseEnableMetrics("false"));
-    assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("true"));
-    assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("TRUE"));
+  public void testParseUpdateMode() {
+    assertEquals(IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode(null));
+    assertEquals(IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode(""));
+    assertEquals(
+        IcebergUpdateStatsJob.UpdateMode.STATS, IcebergUpdateStatsJob.parseUpdateMode("stats"));
+    assertEquals(
+        IcebergUpdateStatsJob.UpdateMode.METRICS, IcebergUpdateStatsJob.parseUpdateMode("metrics"));
+    assertEquals(
+        IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode("all"));
     assertThrows(
-        IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseEnableMetrics("yes"));
+        IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseUpdateMode("invalid"));
+  }
+
+  @Test
+  public void testParseJsonOptions() {
+    Map<String, String> parsed =
+        IcebergUpdateStatsJob.parseJsonOptions("{\"a\":\"b\",\"x\":1,\"flag\":true,\"nil\":null}");
+    assertEquals("b", parsed.get("a"));
+    assertEquals("1", parsed.get("x"));
+    assertEquals("true", parsed.get("flag"));
+    assertEquals("", parsed.get("nil"));
+    assertThrows(
+        IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseJsonOptions("{not_json}"));
+  }
+
+  @Test
+  public void testBuildOptimizerProperties() {
+    Map<String, String> options =
+        Map.of(
+            "gravitino_uri", "http://localhost:8090",
+            "metalake", "ml",
+            "gravitino.optimizer.jdbcMetrics.jdbcUrl", "jdbc:mysql://localhost:3306/metrics");
+    Map<String, String> optimizerProperties =
+        IcebergUpdateStatsJob.buildOptimizerProperties(options);
+
+    assertEquals("http://localhost:8090", optimizerProperties.get(OptimizerConfig.GRAVITINO_URI));
+    assertEquals("ml", optimizerProperties.get(OptimizerConfig.GRAVITINO_METALAKE));
+    assertEquals(
+        "jdbc:mysql://localhost:3306/metrics",
+        optimizerProperties.get("gravitino.optimizer.jdbcMetrics.jdbcUrl"));
+  }
+
+  @Test
+  public void testRequireGravitinoConfig() {
+    Map<String, String> optimizerProperties =
+        Map.of(
+            OptimizerConfig.GRAVITINO_URI, "http://localhost:8090",
+            OptimizerConfig.GRAVITINO_METALAKE, "ml");
+    assertEquals(
+        optimizerProperties, IcebergUpdateStatsJob.requireGravitinoConfig(optimizerProperties));
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> IcebergUpdateStatsJob.requireGravitinoConfig(Map.of()));
   }
 }
diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
index 8f51256ee0..f7e9b9e097 100644
--- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
+++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -24,11 +24,25 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
 import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
 import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
@@ -36,15 +50,31 @@ import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
 import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
 import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.stats.Statistic;
 import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 import org.junit.jupiter.api.io.TempDir;
+import org.testcontainers.containers.MySQLContainer;
 
 /** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg runtime. */
 public class TestIcebergUpdateStatsJobWithSpark {
 
+  private static final String SERVER_URI = "http://localhost:8090";
+  private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg";
+  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats";
+  private static final String SPARK_CATALOG_NAME = "rest_catalog";
+  private static final String METALAKE_NAME = "test";
+
   @TempDir static File tempDir;
 
   private static SparkSession spark;
@@ -116,7 +146,13 @@ public class TestIcebergUpdateStatsJobWithSpark {
     RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();
 
     IcebergUpdateStatsJob.updateStatistics(
-        spark, updater, catalogName, "db.non_partitioned", 100_000L);
+        spark,
+        updater,
+        null,
+        IcebergUpdateStatsJob.UpdateMode.STATS,
+        catalogName,
+        "db.non_partitioned",
+        100_000L);
 
     assertEquals(NameIdentifier.of(catalogName, "db", "non_partitioned"), updater.tableIdentifier);
     assertNotNull(updater.tableStatistics);
@@ -135,7 +171,14 @@ public class TestIcebergUpdateStatsJobWithSpark {
   public void testUpdatePartitionedTableStatistics() {
     RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();
 
-    IcebergUpdateStatsJob.updateStatistics(spark, updater, catalogName, "db.partitioned", 100_000L);
+    IcebergUpdateStatsJob.updateStatistics(
+        spark,
+        updater,
+        null,
+        IcebergUpdateStatsJob.UpdateMode.STATS,
+        catalogName,
+        "db.partitioned",
+        100_000L);
 
     assertEquals(NameIdentifier.of(catalogName, "db", "partitioned"), updater.tableIdentifier);
     assertTrue(updater.tableStatistics.isEmpty());
@@ -155,12 +198,18 @@ public class TestIcebergUpdateStatsJobWithSpark {
   }
 
   @Test
-  public void testUpdatePartitionedTableStatisticsWithMetrics() {
+  public void testUpdatePartitionedTableStatisticsAndMetrics() {
     RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater();
     RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater();
 
     IcebergUpdateStatsJob.updateStatistics(
-        spark, statisticsUpdater, metricsUpdater, catalogName, "db.partitioned", 100_000L);
+        spark,
+        statisticsUpdater,
+        metricsUpdater,
+        IcebergUpdateStatsJob.UpdateMode.ALL,
+        catalogName,
+        "db.partitioned",
+        100_000L);
 
     assertEquals(
         NameIdentifier.of(catalogName, "db", "partitioned"), statisticsUpdater.tableIdentifier);
@@ -180,12 +229,117 @@ public class TestIcebergUpdateStatsJobWithSpark {
     assertTrue(metricsUpdater.jobMetrics.isEmpty());
   }
 
+  @Test
+  public void testUpdatePartitionedTableMetricsOnly() {
+    RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater();
+
+    IcebergUpdateStatsJob.updateStatistics(
+        spark,
+        null,
+        metricsUpdater,
+        IcebergUpdateStatsJob.UpdateMode.METRICS,
+        catalogName,
+        "db.partitioned",
+        100_000L);
+
+    assertEquals(16, metricsUpdater.tableMetrics.size());
+    assertTrue(
+        metricsUpdater.tableMetrics.stream()
+            .allMatch(metric -> metric.scope() == DataScope.Type.PARTITION));
+    assertTrue(metricsUpdater.jobMetrics.isEmpty());
+  }
+
+  @Test
+  @Tag("gravitino-docker-test")
+  public void testUpdatePartitionedTableMetricsStoredInMySql() throws Exception {
+    try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33")) {
+      mysql.start();
+
+      Map<String, String> conf = new HashMap<>();
+      conf.put("gravitino.optimizer.jdbcMetrics.jdbcUrl", mysql.getJdbcUrl());
+      conf.put("gravitino.optimizer.jdbcMetrics.jdbcUser", mysql.getUsername());
+      conf.put("gravitino.optimizer.jdbcMetrics.jdbcPassword", mysql.getPassword());
+      conf.put("gravitino.optimizer.jdbcMetrics.jdbcDriver", mysql.getDriverClassName());
+      initializeMySqlMetricsSchema(mysql);
+
+      GravitinoMetricsUpdater metricsUpdater = new GravitinoMetricsUpdater();
+      metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf)));
+      try {
+        IcebergUpdateStatsJob.updateStatistics(
+            spark,
+            null,
+            metricsUpdater,
+            IcebergUpdateStatsJob.UpdateMode.METRICS,
+            catalogName,
+            "db.partitioned",
+            100_000L);
+      } finally {
+        metricsUpdater.close();
+      }
+
+      GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository();
+      repository.initialize(conf);
+      try {
+        NameIdentifier identifier = NameIdentifier.of(catalogName, "db", "partitioned");
+        long now = Instant.now().getEpochSecond();
+
+        PartitionPath partitionPath =
+            PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-01-01")));
+        List<MetricPoint> partitionMetrics =
+            repository.getMetrics(
+                DataScope.forPartition(identifier, partitionPath), now - 300, now + 300);
+        assertEquals(8, partitionMetrics.size());
+      } finally {
+        repository.close();
+      }
+    }
+  }
+
+  private static void initializeMySqlMetricsSchema(MySQLContainer<?> mysql) throws Exception {
+    String createTableMetrics =
+        "CREATE TABLE IF NOT EXISTS `table_metrics` ("
+            + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,"
+            + "`table_identifier` VARCHAR(1024) NOT NULL,"
+            + "`metric_name` VARCHAR(1024) NOT NULL,"
+            + "`table_partition` VARCHAR(1024) DEFAULT NULL,"
+            + "`metric_ts` BIGINT(20) NOT NULL,"
+            + "`metric_value` VARCHAR(1024) NOT NULL,"
+            + "PRIMARY KEY (`id`),"
+            + "KEY `idx_table_metrics_metric_ts` (`metric_ts`),"
+            + "KEY `idx_table_metrics_composite` (`table_identifier`(255), `table_partition`(255), `metric_ts`)"
+            + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin";
+    String createJobMetrics =
+        "CREATE TABLE IF NOT EXISTS `job_metrics` ("
+            + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,"
+            + "`job_identifier` VARCHAR(1024) NOT NULL,"
+            + "`metric_name` VARCHAR(1024) NOT NULL,"
+            + "`metric_ts` BIGINT(20) NOT NULL,"
+            + "`metric_value` VARCHAR(1024) NOT NULL,"
+            + "PRIMARY KEY (`id`),"
+            + "KEY `idx_job_metrics_metric_ts` (`metric_ts`),"
+            + "KEY `idx_job_metrics_identifier_metric_ts` (`job_identifier`(255), `metric_ts`)"
+            + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin";
+    try (Connection connection =
+            DriverManager.getConnection(
+                mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
+        Statement statement = connection.createStatement()) {
+      statement.execute(createTableMetrics);
+      statement.execute(createJobMetrics);
+    }
+  }
+
   @Test
   public void testUpdateMultiLevelPartitionedTableStatistics() {
     RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();
 
     IcebergUpdateStatsJob.updateStatistics(
-        spark, updater, catalogName, "db.multi_partitioned", 100_000L);
+        spark,
+        updater,
+        null,
+        IcebergUpdateStatsJob.UpdateMode.STATS,
+        catalogName,
+        "db.multi_partitioned",
+        100_000L);
 
     assertEquals(
         NameIdentifier.of(catalogName, "db", "multi_partitioned"), updater.tableIdentifier);
@@ -215,6 +369,152 @@ public class TestIcebergUpdateStatsJobWithSpark {
         parsedPartitions);
   }
 
+  // Requires a running deploy-mode Gravitino server and Spark environment.
+  @Test
+  @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
+  public void testRunBuiltInUpdateStatsJobViaServer() throws Exception {
+    String tableName = "jobs_it_update_stats_" + UUID.randomUUID().toString().replace("-", "");
+    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
+
+    try (SparkSession restSpark = createRestSparkSession();
+        GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) {
+      GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME);
+      recreateRestCatalog(metalake);
+      createTableAndInsertData(restSpark, fullTableName);
+
+      submitJob(metalake, buildUpdateStatsJobConfig(tableName));
+
+      try (GravitinoClient client =
+          GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) {
+        Table table =
+            client
+                .loadCatalog(SPARK_CATALOG_NAME)
+                .asTableCatalog()
+                .loadTable(NameIdentifier.of("db", tableName));
+        List<Statistic> statistics = table.supportsStatistics().listStatistics();
+        Map<String, Statistic> statisticMap =
+            statistics.stream().collect(Collectors.toMap(Statistic::name, s -> s));
+        assertTrue(statisticMap.containsKey("custom-file_count"));
+        assertTrue(statisticMap.containsKey("custom-datafile_mse"));
+        assertTrue(statisticMap.containsKey("custom-total_size"));
+      }
+    }
+  }
+
+  private static SparkSession createRestSparkSession() {
+    return SparkSession.builder()
+        .master("local[2]")
+        .appName("jobs-iceberg-update-stats-it")
+        .config(
+            "spark.sql.extensions",
+            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".type", "rest")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".cache-enabled", "false")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".uri", ICEBERG_REST_URI)
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".warehouse", "")
+        .getOrCreate();
+  }
+
+  private static void createTableAndInsertData(SparkSession sparkSession, String fullTableName) {
+    sparkSession.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db");
+    sparkSession.sql("DROP TABLE IF EXISTS " + fullTableName);
+    sparkSession.sql("CREATE TABLE " + fullTableName + " (id INT, data STRING) USING iceberg");
+    sparkSession.sql(
+        "ALTER TABLE "
+            + fullTableName
+            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
+    for (int i = 0; i < 10; i++) {
+      sparkSession.sql("INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + "')");
+    }
+  }
+
+  private static Map<String, String> buildUpdateStatsJobConfig(String tableName) {
+    Map<String, String> jobConf = new HashMap<>();
+    jobConf.put("table_identifier", "db." + tableName);
+    jobConf.put("update_mode", "all");
+    jobConf.put("target_file_size_bytes", "100000");
+    jobConf.put(
+        "updater_options",
+        "{\"gravitino_uri\":\""
+            + SERVER_URI
+            + "\",\"metalake\":\""
+            + METALAKE_NAME
+            + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}");
+    jobConf.put("catalog_name", SPARK_CATALOG_NAME);
+    jobConf.put(
+        "spark_conf",
+        "{\"spark.master\":\"local[2]\","
+            + "\"spark.executor.instances\":\"1\","
+            + "\"spark.executor.cores\":\"1\","
+            + "\"spark.executor.memory\":\"1g\","
+            + "\"spark.driver.memory\":\"1g\","
+            + "\"spark.hadoop.fs.defaultFS\":\"file:///\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + "\":\"org.apache.iceberg.spark.SparkCatalog\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + ".type\":\"rest\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + ".uri\":\""
+            + ICEBERG_REST_URI
+            + "\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + ".warehouse\":\"\","
+            + "\"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}");
+    return jobConf;
+  }
+
+  private static void submitJob(GravitinoMetalake metalake, Map<String, String> jobConf) {
+    JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf);
+    assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank");
+
+    Awaitility.await()
+        .atMost(Duration.ofMinutes(5))
+        .pollInterval(Duration.ofSeconds(2))
+        .until(
+            () -> {
+              JobHandle.Status status = metalake.getJob(jobHandle.jobId()).jobStatus();
+              return status == JobHandle.Status.SUCCEEDED
+                  || status == JobHandle.Status.FAILED
+                  || status == JobHandle.Status.CANCELLED;
+            });
+
+    JobHandle.Status finalStatus = metalake.getJob(jobHandle.jobId()).jobStatus();
+    assertEquals(JobHandle.Status.SUCCEEDED, finalStatus, "Job should succeed");
+  }
+
+  private static GravitinoMetalake loadOrCreateMetalake(
+      GravitinoAdminClient client, String metalakeName) {
+    try {
+      return client.loadMetalake(metalakeName);
+    } catch (NoSuchMetalakeException ignored) {
+      return client.createMetalake(metalakeName, "IT metalake", Map.of());
+    }
+  }
+
+  private static void recreateRestCatalog(GravitinoMetalake metalake) {
+    try {
+      metalake.dropCatalog(SPARK_CATALOG_NAME, true);
+    } catch (Exception ignored) {
+      // Ignore when the catalog does not exist, or when force-drop is not needed.
+    }
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("catalog-backend", "REST");
+    properties.put("uri", ICEBERG_REST_URI);
+
+    metalake.createCatalog(
+        SPARK_CATALOG_NAME,
+        Catalog.Type.RELATIONAL,
+        "lakehouse-iceberg",
+        "IT Iceberg REST catalog",
+        properties);
+  }
+
   private static final class RecordingStatisticsUpdater implements StatisticsUpdater {
     private NameIdentifier tableIdentifier;
     private List<StatisticEntry<?>> tableStatistics = List.of();
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
index bc6285c41b..6514ac755e 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
@@ -309,10 +309,15 @@ public class TestBuiltinIcebergRewriteDataFiles {
   private static Map<String, String> buildUpdateStatsJobConfig(String tableName) {
     Map<String, String> jobConf = new HashMap<>();
     jobConf.put("table_identifier", "db." + tableName);
-    jobConf.put("gravitino_uri", SERVER_URI);
-    jobConf.put("metalake", METALAKE_NAME);
+    jobConf.put("update_mode", "all");
     jobConf.put("target_file_size_bytes", "100000");
-    jobConf.put("statistics_updater", "gravitino-statistics-updater");
+    jobConf.put(
+        "updater_options",
+        "{\"gravitino_uri\":\""
+            + SERVER_URI
+            + "\",\"metalake\":\""
+            + METALAKE_NAME
+            + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}");
     jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs());
     return jobConf;
   }
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
index 003bb16a0f..b2b8fbb4e4 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
@@ -114,20 +114,41 @@ public class TestBuiltinIcebergUpdateStatsJob {
   private static Map<String, String> buildUpdateStatsJobConfig(String tableName) {
     Map<String, String> jobConf = new HashMap<>();
     jobConf.put("table_identifier", "db." + tableName);
-    jobConf.put("gravitino_uri", SERVER_URI);
-    jobConf.put("metalake", METALAKE_NAME);
+    jobConf.put("update_mode", "all");
     jobConf.put("target_file_size_bytes", "100000");
-    jobConf.put("statistics_updater", "gravitino-statistics-updater");
+    jobConf.put(
+        "updater_options",
+        "{\"gravitino_uri\":\""
+            + SERVER_URI
+            + "\",\"metalake\":\""
+            + METALAKE_NAME
+            + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}");
     jobConf.put("catalog_name", SPARK_CATALOG_NAME);
-    jobConf.put("catalog_type", "rest");
-    jobConf.put("catalog_uri", ICEBERG_REST_URI);
-    jobConf.put("warehouse_location", WAREHOUSE_LOCATION);
-    jobConf.put("spark_master", "local[2]");
-    jobConf.put("spark_executor_instances", "1");
-    jobConf.put("spark_executor_cores", "1");
-    jobConf.put("spark_executor_memory", "1g");
-    jobConf.put("spark_driver_memory", "1g");
-    jobConf.put("spark_conf", "{\"spark.hadoop.fs.defaultFS\":\"file:///\"}");
+    jobConf.put(
+        "spark_conf",
+        "{\"spark.master\":\"local[2]\","
+            + "\"spark.executor.instances\":\"1\","
+            + "\"spark.executor.cores\":\"1\","
+            + "\"spark.executor.memory\":\"1g\","
+            + "\"spark.driver.memory\":\"1g\","
+            + "\"spark.hadoop.fs.defaultFS\":\"file:///\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + "\":\"org.apache.iceberg.spark.SparkCatalog\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + ".type\":\"rest\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + ".uri\":\""
+            + ICEBERG_REST_URI
+            + "\","
+            + "\"spark.sql.catalog."
+            + SPARK_CATALOG_NAME
+            + ".warehouse\":\""
+            + WAREHOUSE_LOCATION
+            + "\","
+            + "\"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}");
     return jobConf;
   }
 
