This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 5ac6881a8c56ab2b75f2ebfc2cc354f361d1cd02
Author: fanng <[email protected]>
AuthorDate: Wed Mar 4 16:35:36 2026 +0800

    [#12688] Add optimizer cmd for builtin update stats job
---
 .../optimizer/common/OptimizerContent.java         |   0
 .../maintenance/optimizer/OptimizerCmd.java        |  80 ++++-
 .../optimizer/command/OptimizerCommandContext.java |  42 +++
 .../command/SubmitUpdateStatsJobCommand.java       | 356 +++++++++++++++++++++
 .../maintenance/optimizer/TestOptimizerCmd.java    | 151 +++++++++
 5 files changed, 625 insertions(+), 4 deletions(-)

diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java
similarity index 100%
rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java
rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
index 14fc1e2011..6dc6fa01fb 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
@@ -53,6 +53,7 @@ import org.apache.gravitino.maintenance.optimizer.command.MonitorMetricsCommand;
 import org.apache.gravitino.maintenance.optimizer.command.OptimizerCommandContext;
 import org.apache.gravitino.maintenance.optimizer.command.OptimizerCommandExecutor;
 import org.apache.gravitino.maintenance.optimizer.command.SubmitStrategyJobsCommand;
+import org.apache.gravitino.maintenance.optimizer.command.SubmitUpdateStatsJobCommand;
 import org.apache.gravitino.maintenance.optimizer.command.UpdateStatisticsCommand;
 import org.apache.gravitino.maintenance.optimizer.command.rule.CommandRules;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
@@ -114,7 +115,23 @@ public class OptimizerCmd {
               EnumSet.of(CliOption.IDENTIFIERS),
               EnumSet.noneOf(CliOption.class),
               "List stored job metrics.",
-              "./bin/gravitino-optimizer.sh --type list-job-metrics 
--identifiers c.db.job"));
+              "./bin/gravitino-optimizer.sh --type list-job-metrics 
--identifiers c.db.job"),
+          OptimizerCommandType.SUBMIT_UPDATE_STATS_JOB,
+          CommandOptionSpec.of(
+              EnumSet.of(CliOption.IDENTIFIERS),
+              EnumSet.of(
+                  CliOption.DRY_RUN,
+                  CliOption.LIMIT,
+                  CliOption.UPDATE_MODE,
+                  CliOption.TARGET_FILE_SIZE_BYTES,
+                  CliOption.UPDATER_OPTIONS,
+                  CliOption.UPDATER_OPTIONS_FILE,
+                  CliOption.SPARK_CONF,
+                  CliOption.SPARK_CONF_FILE),
+              "Submit built-in Iceberg update stats jobs for given table 
identifiers.",
+              "./bin/gravitino-optimizer.sh --type submit-update-stats-job 
--identifiers "
+                  + "rest.ab.t1,rest.ab.t2 --update-mode stats --dry-run",
+              updateStatsJobInputRules()));
   private static final Map<OptimizerCommandType, OptimizerCommandExecutor> COMMAND_HANDLERS =
       Map.of(
           OptimizerCommandType.SUBMIT_STRATEGY_JOBS, new SubmitStrategyJobsCommand(),
@@ -122,7 +139,8 @@ public class OptimizerCmd {
           OptimizerCommandType.APPEND_METRICS, new AppendMetricsCommand(),
           OptimizerCommandType.MONITOR_METRICS, new MonitorMetricsCommand(),
           OptimizerCommandType.LIST_TABLE_METRICS, new ListTableMetricsCommand(),
-          OptimizerCommandType.LIST_JOB_METRICS, new ListJobMetricsCommand());
+          OptimizerCommandType.LIST_JOB_METRICS, new ListJobMetricsCommand(),
+          OptimizerCommandType.SUBMIT_UPDATE_STATS_JOB, new SubmitUpdateStatsJobCommand());
   private static final String LOCAL_STATS_CALCULATOR_NAME = "local-stats-calculator";
 
   static {
@@ -164,6 +182,12 @@ public class OptimizerCmd {
           cmd.getOptionValue(
               CliOption.RANGE_SECONDS.longOpt(), Long.toString(DEFAULT_RANGE_SECONDS));
       String partitionPathRaw = cmd.getOptionValue(CliOption.PARTITION_PATH.longOpt());
+      String updateMode = cmd.getOptionValue(CliOption.UPDATE_MODE.longOpt());
+      String targetFileSizeBytes = cmd.getOptionValue(CliOption.TARGET_FILE_SIZE_BYTES.longOpt());
+      String updaterOptions = cmd.getOptionValue(CliOption.UPDATER_OPTIONS.longOpt());
+      String updaterOptionsFile = cmd.getOptionValue(CliOption.UPDATER_OPTIONS_FILE.longOpt());
+      String sparkConf = cmd.getOptionValue(CliOption.SPARK_CONF.longOpt());
+      String sparkConfFile = cmd.getOptionValue(CliOption.SPARK_CONF_FILE.longOpt());
       String statisticsPayload = cmd.getOptionValue(CliOption.STATISTICS_PAYLOAD.longOpt());
       String filePath = cmd.getOptionValue(CliOption.FILE_PATH.longOpt());
       Optional<StatisticsInputContent> statisticsInputContent =
@@ -179,6 +203,12 @@ public class OptimizerCmd {
               actionTime,
               rangeSeconds,
               partitionPathRaw,
+              updateMode,
+              targetFileSizeBytes,
+              updaterOptions,
+              updaterOptionsFile,
+              sparkConf,
+              sparkConfFile,
               statisticsInputContent,
               out);
       executeCommand(optimizerType, context);
@@ -309,6 +339,17 @@ public class OptimizerCmd {
         .build();
   }
 
+  private static CommandRules.ValidationPlan updateStatsJobInputRules() {
+    return CommandRules.newBuilder()
+        .addMutuallyExclusive(
+            List.of(CliOption.UPDATER_OPTIONS.longOpt(), CliOption.UPDATER_OPTIONS_FILE.longOpt()),
+            "--updater-options and --updater-options-file cannot be used together")
+        .addMutuallyExclusive(
+            List.of(CliOption.SPARK_CONF.longOpt(), CliOption.SPARK_CONF_FILE.longOpt()),
+            "--spark-conf and --spark-conf-file cannot be used together")
+        .build();
+  }
+
   private static void printGlobalHelp(Options options, PrintStream out) {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp(
@@ -461,7 +502,37 @@ public class OptimizerCmd {
         CliOptionArgType.SINGLE,
         null,
         "Partition path for monitor-metrics "
-            + "(JSON array, for example: [{\"p1\":\"v1\"},{\"p2\":\"v2\"}])");
+            + "(JSON array, for example: [{\"p1\":\"v1\"},{\"p2\":\"v2\"}])"),
+    UPDATE_MODE(
+        "update-mode",
+        CliOptionArgType.SINGLE,
+        null,
+        "Update mode for submit-update-stats-job: stats|metrics|all"),
+    TARGET_FILE_SIZE_BYTES(
+        "target-file-size-bytes",
+        CliOptionArgType.SINGLE,
+        null,
+        "Target file size bytes for submit-update-stats-job"),
+    UPDATER_OPTIONS(
+        "updater-options",
+        CliOptionArgType.SINGLE,
+        null,
+        "JSON map for updater options in submit-update-stats-job"),
+    UPDATER_OPTIONS_FILE(
+        "updater-options-file",
+        CliOptionArgType.SINGLE,
+        null,
+        "Path to JSON file for updater options in submit-update-stats-job"),
+    SPARK_CONF(
+        "spark-conf",
+        CliOptionArgType.SINGLE,
+        null,
+        "JSON map for Spark configs in submit-update-stats-job"),
+    SPARK_CONF_FILE(
+        "spark-conf-file",
+        CliOptionArgType.SINGLE,
+        null,
+        "Path to JSON file for Spark configs in submit-update-stats-job");
 
     private final String longOpt;
     private final CliOptionArgType argType;
@@ -509,7 +580,8 @@ public class OptimizerCmd {
     APPEND_METRICS,
     MONITOR_METRICS,
     LIST_TABLE_METRICS,
-    LIST_JOB_METRICS;
+    LIST_JOB_METRICS,
+    SUBMIT_UPDATE_STATS_JOB;
 
     public static String allValues() {
       return Arrays.stream(values())
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
index c3f7051ab9..b29ba0d09d 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
@@ -37,6 +37,12 @@ public final class OptimizerCommandContext {
   private final String actionTime;
   private final String rangeSeconds;
   private final String partitionPathRaw;
+  private final String updateMode;
+  private final String targetFileSizeBytes;
+  private final String updaterOptions;
+  private final String updaterOptionsFile;
+  private final String sparkConf;
+  private final String sparkConfFile;
   private final Optional<StatisticsInputContent> statisticsInputContent;
   private final PrintStream output;
 
@@ -50,6 +56,12 @@ public final class OptimizerCommandContext {
       String actionTime,
       String rangeSeconds,
       String partitionPathRaw,
+      String updateMode,
+      String targetFileSizeBytes,
+      String updaterOptions,
+      String updaterOptionsFile,
+      String sparkConf,
+      String sparkConfFile,
       Optional<StatisticsInputContent> statisticsInputContent,
       PrintStream output) {
     this.optimizerEnv = optimizerEnv;
@@ -61,6 +73,12 @@ public final class OptimizerCommandContext {
     this.actionTime = actionTime;
     this.rangeSeconds = rangeSeconds;
     this.partitionPathRaw = partitionPathRaw;
+    this.updateMode = updateMode;
+    this.targetFileSizeBytes = targetFileSizeBytes;
+    this.updaterOptions = updaterOptions;
+    this.updaterOptionsFile = updaterOptionsFile;
+    this.sparkConf = sparkConf;
+    this.sparkConfFile = sparkConfFile;
     this.statisticsInputContent = statisticsInputContent;
     this.output = output;
   }
@@ -109,6 +127,30 @@ public final class OptimizerCommandContext {
     return partitionPathRaw;
   }
 
+  public String updateMode() {
+    return updateMode;
+  }
+
+  public String targetFileSizeBytes() {
+    return targetFileSizeBytes;
+  }
+
+  public String updaterOptions() {
+    return updaterOptions;
+  }
+
+  public String updaterOptionsFile() {
+    return updaterOptionsFile;
+  }
+
+  public String sparkConf() {
+    return sparkConf;
+  }
+
+  public String sparkConfFile() {
+    return sparkConfFile;
+  }
+
   public Optional<StatisticsInputContent> statisticsInputContent() {
     return statisticsInputContent;
   }
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
new file mode 100644
index 0000000000..c06b024b47
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.command;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+
+/**
+ * Handles CLI command {@code submit-update-stats-job} for submitting built-in Iceberg update stats
+ * jobs directly from the optimizer CLI.
+ */
+public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
+
+  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats";
+  private static final String DEFAULT_UPDATE_MODE = "stats";
+  private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L;
+  private static final String OPTION_UPDATER_OPTIONS = "updater-options";
+  private static final String OPTION_UPDATER_OPTIONS_FILE = "updater-options-file";
+  private static final String OPTION_SPARK_CONF = "spark-conf";
+  private static final String OPTION_SPARK_CONF_FILE = "spark-conf-file";
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Override
+  public void execute(OptimizerCommandContext context) throws Exception {
+    Map<String, String> submitterConfigs = context.optimizerEnv().config().jobSubmitterConfigs();
+    int limit = parseLimit(context.limit());
+
+    List<TableTarget> tableTargets =
+        parseTableTargets(
+            context.identifiers(),
+            context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG),
+            limit);
+
+    String updateMode =
+        parseUpdateMode(
+            resolveScalarOption(context.updateMode(), submitterConfigs.get("update_mode")));
+    long targetFileSizeBytes =
+        parseTargetFileSize(
+            resolveScalarOption(
+                context.targetFileSizeBytes(), submitterConfigs.get("target_file_size_bytes")));
+
+    String updaterOptionsJson =
+        resolveJsonOption(
+            OPTION_UPDATER_OPTIONS,
+            context.updaterOptions(),
+            OPTION_UPDATER_OPTIONS_FILE,
+            context.updaterOptionsFile(),
+            submitterConfigs.get("updater_options"));
+    String sparkConfJson =
+        resolveJsonOption(
+            OPTION_SPARK_CONF,
+            context.sparkConf(),
+            OPTION_SPARK_CONF_FILE,
+            context.sparkConfFile(),
+            submitterConfigs.get("spark_conf"));
+
+    Map<String, String> updaterOptions =
+        parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS);
+    Map<String, String> sparkConfigs = parseFlatJsonMap(sparkConfJson, OPTION_SPARK_CONF);
+
+    validateUpdaterOptions(updateMode, updaterOptions);
+    validateSparkConfigs(tableTargets, sparkConfigs);
+
+    if (context.dryRun()) {
+      for (TableTarget tableTarget : tableTargets) {
+        Map<String, String> jobConfig =
+            buildJobConfig(
+                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+        context
+            .output()
+            .printf(
+                "DRY-RUN: identifier=%s jobTemplate=%s jobConfig=%s%n",
+                tableTarget.fullIdentifier, JOB_TEMPLATE_NAME, jobConfig);
+      }
+      context
+          .output()
+          .printf("SUMMARY: submit-update-stats-job total=%d dryRun=true%n", 
tableTargets.size());
+      return;
+    }
+
+    try (GravitinoClient client = GravitinoClientUtils.createClient(context.optimizerEnv())) {
+      int submitted = 0;
+      for (TableTarget tableTarget : tableTargets) {
+        Map<String, String> jobConfig =
+            buildJobConfig(
+                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+        JobHandle jobHandle = client.runJob(JOB_TEMPLATE_NAME, jobConfig);
+        submitted++;
+        context
+            .output()
+            .printf(
+                "SUBMIT: identifier=%s jobTemplate=%s jobId=%s jobConfig=%s%n",
+                tableTarget.fullIdentifier, JOB_TEMPLATE_NAME, jobHandle.jobId(), jobConfig);
+      }
+      context
+          .output()
+          .printf(
+              "SUMMARY: submit-update-stats-job total=%d submitted=%d 
dryRun=false%n",
+              tableTargets.size(), submitted);
+    }
+  }
+
+  private static Map<String, String> buildJobConfig(
+      TableTarget tableTarget,
+      String updateMode,
+      long targetFileSizeBytes,
+      Map<String, String> updaterOptions,
+      Map<String, String> sparkConfigs) {
+    Map<String, String> jobConfig = new LinkedHashMap<>();
+    jobConfig.put("catalog_name", tableTarget.catalogName);
+    jobConfig.put("table_identifier", tableTarget.schemaAndTable);
+    jobConfig.put("update_mode", updateMode);
+    jobConfig.put("target_file_size_bytes", 
Long.toString(targetFileSizeBytes));
+    jobConfig.put("updater_options", toCanonicalJson(updaterOptions));
+    jobConfig.put("spark_conf", toCanonicalJson(sparkConfigs));
+    return jobConfig;
+  }
+
+  private static String resolveScalarOption(String cliValue, String confValue) {
+    if (StringUtils.isNotBlank(cliValue)) {
+      return cliValue.trim();
+    }
+    return StringUtils.isBlank(confValue) ? null : confValue.trim();
+  }
+
+  private static String resolveJsonOption(
+      String cliOptionName,
+      String cliValue,
+      String fileOptionName,
+      String filePath,
+      String confValue) {
+    if (StringUtils.isNotBlank(cliValue)) {
+      return cliValue.trim();
+    }
+    if (StringUtils.isNotBlank(filePath)) {
+      try {
+        Path path = Path.of(filePath.trim());
+        Preconditions.checkArgument(
+            Files.exists(path), "Option %s file does not exist: %s", 
fileOptionName, filePath);
+        return Files.readString(path, StandardCharsets.UTF_8).trim();
+      } catch (Exception e) {
+        throw new IllegalArgumentException(
+            String.format(
+                Locale.ROOT,
+                "Failed to read option %s from %s: %s",
+                cliOptionName,
+                filePath,
+                e.getMessage()),
+            e);
+      }
+    }
+    return StringUtils.isBlank(confValue) ? null : confValue.trim();
+  }
+
+  private static String parseUpdateMode(String value) {
+    String normalized =
+        StringUtils.isBlank(value) ? DEFAULT_UPDATE_MODE : value.trim().toLowerCase(Locale.ROOT);
+    Preconditions.checkArgument(
+        "stats".equals(normalized) || "metrics".equals(normalized) || 
"all".equals(normalized),
+        "Invalid --update-mode: %s. Supported values are: stats, metrics, all",
+        value);
+    return normalized;
+  }
+
+  private static long parseTargetFileSize(String value) {
+    if (StringUtils.isBlank(value)) {
+      return DEFAULT_TARGET_FILE_SIZE_BYTES;
+    }
+    return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", 
value.trim(), false);
+  }
+
+  private static int parseLimit(String limit) {
+    if (StringUtils.isBlank(limit)) {
+      return Integer.MAX_VALUE;
+    }
+    long parsed = OptimizerCommandUtils.parseLongOption("limit", limit.trim(), 
false);
+    Preconditions.checkArgument(
+        parsed <= Integer.MAX_VALUE, "Option limit must be <= %s", 
Integer.MAX_VALUE);
+    return (int) parsed;
+  }
+
+  private static List<TableTarget> parseTableTargets(
+      String[] identifiers, String defaultCatalog, int limit) {
+    Preconditions.checkArgument(
+        identifiers != null && identifiers.length > 0,
+        "Missing required option --identifiers for command 
'submit-update-stats-job'");
+
+    List<TableTarget> tableTargets = new ArrayList<>();
+    for (String rawIdentifier : identifiers) {
+      if (tableTargets.size() >= limit) {
+        break;
+      }
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rawIdentifier), "--identifiers contains blank 
identifier");
+      String[] levels = rawIdentifier.trim().split("\\.");
+      if (levels.length == 3) {
+        tableTargets.add(
+            new TableTarget(
+                rawIdentifier.trim(),
+                requireNonBlank(levels[0], "catalog"),
+                requireNonBlank(levels[1], "schema") + "." + 
requireNonBlank(levels[2], "table")));
+      } else if (levels.length == 2) {
+        Preconditions.checkArgument(
+            StringUtils.isNotBlank(defaultCatalog),
+            "Identifier '%s' uses schema.table format, but %s is not 
configured",
+            rawIdentifier,
+            OptimizerConfig.GRAVITINO_DEFAULT_CATALOG);
+        tableTargets.add(
+            new TableTarget(
+                defaultCatalog + "." + rawIdentifier.trim(),
+                defaultCatalog.trim(),
+                requireNonBlank(levels[0], "schema") + "." + 
requireNonBlank(levels[1], "table")));
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                Locale.ROOT,
+                "Identifier '%s' is invalid. Use catalog.schema.table or 
schema.table",
+                rawIdentifier));
+      }
+    }
+    return tableTargets;
+  }
+
+  private static String requireNonBlank(String value, String levelName) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(value), "%s in identifier cannot be blank", 
levelName);
+    return value.trim();
+  }
+
+  private static void validateUpdaterOptions(
+      String updateMode, Map<String, String> updaterOptions) {
+    if (!"stats".equals(updateMode) && !"all".equals(updateMode)) {
+      return;
+    }
+    String gravitinoUri =
+        firstNonBlank(
+            updaterOptions.get("gravitino_uri"),
+            updaterOptions.get("gravitino-uri"),
+            updaterOptions.get(OptimizerConfig.GRAVITINO_URI));
+    String metalake =
+        firstNonBlank(
+            updaterOptions.get("metalake"), 
updaterOptions.get(OptimizerConfig.GRAVITINO_METALAKE));
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(gravitinoUri),
+        "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "
+            + "must contain 'gravitino_uri' when update_mode is stats or all");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metalake),
+        "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "
+            + "must contain 'metalake' when update_mode is stats or all");
+  }
+
+  private static void validateSparkConfigs(
+      List<TableTarget> tableTargets, Map<String, String> sparkConfigs) {
+    Preconditions.checkArgument(
+        !sparkConfigs.isEmpty(),
+        "Missing spark config. Set --spark-conf/--spark-conf-file or "
+            + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config 
file");
+    for (TableTarget tableTarget : tableTargets) {
+      String requiredKey = "spark.sql.catalog." + tableTarget.catalogName;
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(sparkConfigs.get(requiredKey)),
+          "Spark config must contain key '%s' for identifier '%s'",
+          requiredKey,
+          tableTarget.fullIdentifier);
+    }
+  }
+
+  private static Map<String, String> parseFlatJsonMap(String json, String optionName) {
+    if (StringUtils.isBlank(json)) {
+      return Map.of();
+    }
+    try {
+      Map<String, Object> parsedMap =
+          MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {});
+      Map<String, String> result = new LinkedHashMap<>();
+      for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
+        Object value = entry.getValue();
+        Preconditions.checkArgument(
+            !(value instanceof Map || value instanceof List),
+            "Option --%s must be a flat key-value JSON map, but key '%s' has 
non-scalar value",
+            optionName,
+            entry.getKey());
+        result.put(entry.getKey(), value == null ? "" : value.toString());
+      }
+      return result;
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          String.format(
+              Locale.ROOT, "Option --%s is not valid JSON: %s", optionName, 
e.getMessage()),
+          e);
+    }
+  }
+
+  private static String toCanonicalJson(Map<String, String> options) {
+    try {
+      return MAPPER.writeValueAsString(new TreeMap<>(options == null ? Map.of() : options));
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to serialize options as JSON", 
e);
+    }
+  }
+
+  private static String firstNonBlank(String... values) {
+    for (String value : values) {
+      if (StringUtils.isNotBlank(value)) {
+        return value.trim();
+      }
+    }
+    return null;
+  }
+
+  private static final class TableTarget {
+    private final String fullIdentifier;
+    private final String catalogName;
+    private final String schemaAndTable;
+
+    private TableTarget(String fullIdentifier, String catalogName, String schemaAndTable) {
+      this.fullIdentifier = fullIdentifier;
+      this.catalogName = catalogName;
+      this.schemaAndTable = schemaAndTable;
+    }
+  }
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
index 133567f11c..2be5785a87 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
@@ -369,6 +369,118 @@ class TestOptimizerCmd {
         "Expected job metric updates from updateAll, but got " + 
totalJobUpdates);
   }
 
+  @Test
+  void testSubmitUpdateStatsJobDryRunUsesConfigDefaults() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "ab.t1",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].isEmpty(), "stderr=" + output[1] + ", 
stdout=" + output[0]);
+    Assertions.assertTrue(output[0].contains("DRY-RUN: 
identifier=rest.ab.t1"));
+    
Assertions.assertTrue(output[0].contains("jobTemplate=builtin-iceberg-update-stats"));
+    Assertions.assertTrue(output[0].contains("update_mode=stats"));
+    Assertions.assertTrue(output[0].contains("table_identifier=ab.t1"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRejectsInvalidUpdateMode() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--update-mode",
+            "bad_mode",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].contains("Invalid --update-mode: 
bad_mode"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRequiresSparkConf() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsWithoutSparkConf();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].contains("Missing spark config."));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobSupportsJsonFileOverrides() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    Path updaterOptionsFile = Files.createTempFile("optimizer-updater-options-", ".json");
+    Path sparkConfFile = Files.createTempFile("optimizer-spark-conf-", ".json");
+    Files.writeString(
+        updaterOptionsFile,
+        "{\"metrics_updater\":\"custom-metrics-updater\"}",
+        StandardCharsets.UTF_8);
+    Files.writeString(
+        sparkConfFile,
+        "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+            + "\"spark.sql.catalog.rest.type\":\"rest\","
+            + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}",
+        StandardCharsets.UTF_8);
+    updaterOptionsFile.toFile().deleteOnExit();
+    sparkConfFile.toFile().deleteOnExit();
+
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--update-mode",
+            "metrics",
+            "--updater-options-file",
+            updaterOptionsFile.toString(),
+            "--spark-conf-file",
+            sparkConfFile.toString(),
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].isEmpty(), "stderr=" + output[1] + ", 
stdout=" + output[0]);
+    Assertions.assertTrue(output[0].contains("custom-metrics-updater"));
+    Assertions.assertTrue(output[0].contains("spark.sql.catalog.rest"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRejectsMutuallyExclusiveUpdaterInputs() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    Path updaterOptionsFile = Files.createTempFile("optimizer-updater-options-", ".json");
+    Files.writeString(updaterOptionsFile, "{\"metrics_updater\":\"m\"}", StandardCharsets.UTF_8);
+    updaterOptionsFile.toFile().deleteOnExit();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--updater-options",
+            "{\"metrics_updater\":\"m1\"}",
+            "--updater-options-file",
+            updaterOptionsFile.toString(),
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(
+        output[1].contains("--updater-options and --updater-options-file 
cannot be used together"));
+  }
+
   private Path createOptimizerConfForMetricsProvider() throws Exception {
     Path confPath = Files.createTempFile("optimizer-test-", ".conf");
     // Route command reads to deterministic in-memory fixtures from MetricsProviderForTest.
@@ -440,6 +552,45 @@ class TestOptimizerCmd {
     return confPath;
   }
 
+  private Path createOptimizerConfForSubmitUpdateStatsJob() throws Exception {
+    Path confPath = Files.createTempFile("optimizer-submit-update-stats-", 
".conf");
+    String content =
+        String.join(
+                System.lineSeparator(),
+                "gravitino.optimizer.gravitinoUri = http://localhost:8090";,
+                "gravitino.optimizer.gravitinoMetalake = test",
+                "gravitino.optimizer.gravitinoDefaultCatalog = rest",
+                "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
+                "gravitino.optimizer.jobSubmitterConfig.target_file_size_bytes 
= 100000",
+                "gravitino.optimizer.jobSubmitterConfig.updater_options = "
+                    + 
"{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}";,
+                "gravitino.optimizer.jobSubmitterConfig.spark_conf = "
+                    + 
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+                    + "\"spark.sql.catalog.rest.type\":\"rest\","
+                    + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}";)
+            + System.lineSeparator();
+    Files.writeString(confPath, content, StandardCharsets.UTF_8);
+    confPath.toFile().deleteOnExit();
+    return confPath;
+  }
+
+  private Path createOptimizerConfForSubmitUpdateStatsWithoutSparkConf() throws Exception {
+    Path confPath = Files.createTempFile("optimizer-submit-update-stats-without-spark-", ".conf");
+    String content =
+        String.join(
+                System.lineSeparator(),
+                "gravitino.optimizer.gravitinoUri = http://localhost:8090";,
+                "gravitino.optimizer.gravitinoMetalake = test",
+                "gravitino.optimizer.gravitinoDefaultCatalog = rest",
+                "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
+                "gravitino.optimizer.jobSubmitterConfig.updater_options = "
+                    + "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}")
+            + System.lineSeparator();
+    Files.writeString(confPath, content, StandardCharsets.UTF_8);
+    confPath.toFile().deleteOnExit();
+    return confPath;
+  }
+
   private String[] runCommand(String... args) {
     ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
     ByteArrayOutputStream errBuffer = new ByteArrayOutputStream();
