This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit f2775b4305fd8738706fa8666d843ff3d125eee9
Author: fanng <[email protected]>
AuthorDate: Thu Mar 5 17:12:16 2026 +0800

    Refactor update-stats Spark config handling and simplify job options
    
    - Introduce IcebergSparkConfigUtils in optimizer-api to share Spark template building, flat JSON parsing, and catalog config validation
    
    - Reuse the shared Spark template utility in both rewrite-data-files and update-stats built-in jobs
    
    - Simplify submit-update-stats-job CLI/context by removing target-file-size option and relying on job-side fixed target size
    
    - Keep datafile_mse target fixed at 128MiB (small-file threshold 32MiB) and remove target_file_size_bytes from job template arguments/config
    
    - Update optimizer/jobs tests to match new CLI/job signatures and add unit tests for the shared Spark config utility
---
 .../jobs/iceberg/IcebergRewriteDataFilesJob.java   |  23 +--
 .../iceberg/IcebergUpdateStatsAndMetricsJob.java   |  75 +++-------
 .../jobs/iceberg/TestIcebergUpdateStatsJob.java    |  46 +++---
 .../TestIcebergUpdateStatsJobWithSpark.java        |  25 ++--
 .../common/util/IcebergSparkConfigUtils.java       | 156 +++++++++++++++++++++
 .../common/util/TestIcebergSparkConfigUtils.java   | 125 +++++++++++++++++
 .../maintenance/optimizer/OptimizerCmd.java        |  10 +-
 .../optimizer/command/OptimizerCommandContext.java |  13 +-
 .../command/SubmitUpdateStatsJobCommand.java       |  62 ++------
 .../maintenance/optimizer/TestOptimizerCmd.java    |  87 +++++++++++-
 .../job/TestBuiltinIcebergUpdateStatsJob.java      |   1 -
 11 files changed, 427 insertions(+), 196 deletions(-)
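
[Editor's aid] Both built-in jobs in this diff now source their Spark template configs from one shared utility. A minimal sketch of the shared call (the {{...}} placeholders stay verbatim in the returned map and are rendered later by the job system; the demo class name is invented for illustration):

    import java.util.Map;
    import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;

    public class TemplateConfigDemo {
      public static void main(String[] args) {
        // Unmodifiable map of Spark keys to {{placeholder}} template values.
        Map<String, String> configs = IcebergSparkConfigUtils.buildTemplateSparkConfigs();
        // Prints entries such as:
        //   spark.sql.catalog.{{catalog_name}}.type = {{catalog_type}}
        configs.forEach((k, v) -> System.out.println(k + " = " + v));
      }
    }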

diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
index 5324204434..c2788281b0 100644
--- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
+++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.gravitino.job.JobTemplateProvider;
 import org.apache.gravitino.job.SparkJobTemplate;
 import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
@@ -497,26 +498,6 @@ public class IcebergRewriteDataFilesJob implements BuiltInJob {
    * @return map of Spark configuration keys to template values
    */
   private static Map<String, String> buildSparkConfigs() {
-    Map<String, String> configs = new HashMap<>();
-
-    // Spark runtime configs
-    configs.put("spark.master", "{{spark_master}}");
-    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
-    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
-    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
-    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
-
-    // Iceberg catalog configuration
-    configs.put("spark.sql.catalog.{{catalog_name}}", 
"org.apache.iceberg.spark.SparkCatalog");
-    configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}.warehouse", 
"{{warehouse_location}}");
-
-    // Iceberg extensions
-    configs.put(
-        "spark.sql.extensions",
-        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
-
-    return Collections.unmodifiableMap(configs);
+    return IcebergSparkConfigUtils.buildTemplateSparkConfigs();
   }
 }
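
[Editor's aid] The job above only builds the template; how the {{...}} placeholders become concrete values is handled outside this diff by Gravitino's job template machinery. A purely illustrative rendering step, assuming simple string substitution (renderTemplate and the sample values are invented here, not part of the commit):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;

    public class TemplateRenderDemo {
      // Hypothetical: substitute {{key}} markers in both config keys and values.
      static Map<String, String> renderTemplate(
          Map<String, String> template, Map<String, String> values) {
        Map<String, String> rendered = new LinkedHashMap<>();
        template.forEach(
            (k, v) -> {
              String key = k;
              String val = v;
              for (Map.Entry<String, String> e : values.entrySet()) {
                key = key.replace("{{" + e.getKey() + "}}", e.getValue());
                val = val.replace("{{" + e.getKey() + "}}", e.getValue());
              }
              rendered.put(key, val);
            });
        return rendered;
      }

      public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("catalog_name", "rest");
        values.put("catalog_type", "rest");
        // "spark.sql.catalog.{{catalog_name}}.type" becomes "spark.sql.catalog.rest.type";
        // placeholders without a supplied value are left as-is in this sketch.
        System.out.println(
            renderTemplate(IcebergSparkConfigUtils.buildTemplateSparkConfigs(), values));
      }
    }
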
diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java
index c6172acce5..b2826c1d55 100644
--- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java
+++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java
@@ -44,6 +44,7 @@ import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
 import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
 import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
 import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;
 import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils;
 import org.apache.gravitino.stats.StatisticValues;
 import org.apache.spark.sql.Row;
@@ -96,7 +97,6 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
       System.exit(1);
     }
 
-    long targetFileSizeBytes = parseTargetFileSize(argMap.get("target-file-size-bytes"));
     Map<String, String> updaterOptions = parseJsonOptions(argMap.get("updater-options"));
     String sparkConfJson = argMap.get("spark-conf");
 
@@ -129,13 +129,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
       }
 
       updateStatistics(
-          spark,
-          statisticsUpdater,
-          metricsUpdater,
-          updateMode,
-          catalogName,
-          tableIdentifier,
-          targetFileSizeBytes);
+          spark, statisticsUpdater, metricsUpdater, updateMode, catalogName, tableIdentifier);
     } catch (Exception e) {
       LOG.error("Failed to update Iceberg statistics/metrics", e);
       System.exit(1);
@@ -162,16 +156,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
       SparkSession spark,
       StatisticsUpdater statisticsUpdater,
       String catalogName,
-      String tableIdentifier,
-      long targetFileSizeBytes) {
+      String tableIdentifier) {
     updateStatistics(
-        spark,
-        statisticsUpdater,
-        null,
-        UpdateMode.STATS,
-        catalogName,
-        tableIdentifier,
-        targetFileSizeBytes);
+        spark, statisticsUpdater, null, UpdateMode.STATS, catalogName, tableIdentifier);
   }
 
   static void updateStatistics(
@@ -179,16 +166,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
       StatisticsUpdater statisticsUpdater,
       MetricsUpdater metricsUpdater,
       String catalogName,
-      String tableIdentifier,
-      long targetFileSizeBytes) {
+      String tableIdentifier) {
     updateStatistics(
-        spark,
-        statisticsUpdater,
-        metricsUpdater,
-        UpdateMode.ALL,
-        catalogName,
-        tableIdentifier,
-        targetFileSizeBytes);
+        spark, statisticsUpdater, metricsUpdater, UpdateMode.ALL, catalogName, tableIdentifier);
   }
 
   static void updateStatistics(
@@ -197,8 +177,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
       MetricsUpdater metricsUpdater,
       UpdateMode updateMode,
       String catalogName,
-      String tableIdentifier,
-      long targetFileSizeBytes) {
+      String tableIdentifier) {
     Objects.requireNonNull(updateMode, "updateMode must not be null");
 
     if (updateMode.updateStats && statisticsUpdater == null) {
@@ -216,7 +195,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
     long metricTimestamp = System.currentTimeMillis() / 1000L;
     boolean partitioned = isPartitionedTable(spark, catalogName, tableIdentifier);
     if (partitioned) {
-      String sql = buildPartitionStatsSql(catalogName, tableIdentifier, targetFileSizeBytes);
+      String sql = buildPartitionStatsSql(catalogName, tableIdentifier);
       Row[] rows = (Row[]) spark.sql(sql).collect();
       Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = new LinkedHashMap<>();
       List<MetricPoint> tableAndPartitionMetrics = new ArrayList<>();
@@ -247,7 +226,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
           rows.length,
           gravitinoTableIdentifier);
     } else {
-      String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes);
+      String sql = buildTableStatsSql(catalogName, tableIdentifier);
       Row[] rows = (Row[]) spark.sql(sql).collect();
       List<StatisticEntry<?>> tableStatistics =
           rows.length == 0 ? Collections.emptyList() : toStatistics(rows[0]);
@@ -269,8 +248,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
     }
   }
 
-  static String buildTableStatsSql(
-      String catalogName, String tableIdentifier, long targetFileSizeBytes) {
+  static String buildTableStatsSql(String catalogName, String tableIdentifier) {
     String filesTable = buildFilesTableIdentifier(catalogName, tableIdentifier);
     return "SELECT "
         + "COUNT(*) AS file_count, "
@@ -281,9 +259,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
         + SMALL_FILE_THRESHOLD_BYTES
         + " THEN 1 ELSE 0 END) AS small_files, "
         + "AVG(POWER("
-        + targetFileSizeBytes
+        + DEFAULT_TARGET_FILE_SIZE_BYTES
         + " - LEAST("
-        + targetFileSizeBytes
+        + DEFAULT_TARGET_FILE_SIZE_BYTES
         + ", file_size_in_bytes), 2)) AS datafile_mse, "
         + "AVG(file_size_in_bytes) AS avg_size, "
         + "SUM(file_size_in_bytes) AS total_size "
@@ -291,8 +269,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
         + filesTable;
   }
 
-  static String buildPartitionStatsSql(
-      String catalogName, String tableIdentifier, long targetFileSizeBytes) {
+  static String buildPartitionStatsSql(String catalogName, String tableIdentifier) {
     String filesTable = buildFilesTableIdentifier(catalogName, tableIdentifier);
     return "SELECT "
         + "partition, "
@@ -304,9 +281,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
         + SMALL_FILE_THRESHOLD_BYTES
         + " THEN 1 ELSE 0 END) AS small_files, "
         + "AVG(POWER("
-        + targetFileSizeBytes
+        + DEFAULT_TARGET_FILE_SIZE_BYTES
         + " - LEAST("
-        + targetFileSizeBytes
+        + DEFAULT_TARGET_FILE_SIZE_BYTES
         + ", file_size_in_bytes), 2)) AS datafile_mse, "
         + "AVG(file_size_in_bytes) AS avg_size, "
         + "SUM(file_size_in_bytes) AS total_size "
@@ -425,21 +402,6 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
     }
   }
 
-  static long parseTargetFileSize(String value) {
-    if (value == null || value.trim().isEmpty()) {
-      return DEFAULT_TARGET_FILE_SIZE_BYTES;
-    }
-    try {
-      long parsed = Long.parseLong(value.trim());
-      if (parsed <= 0) {
-        throw new IllegalArgumentException("target-file-size-bytes must be > 
0");
-      }
-      return parsed;
-    } catch (NumberFormatException e) {
-      throw new IllegalArgumentException("Invalid target-file-size-bytes: " + 
value, e);
-    }
-  }
-
   static UpdateMode parseUpdateMode(String value) {
     if (value == null || value.trim().isEmpty()) {
       return UpdateMode.from(DEFAULT_UPDATE_MODE);
@@ -582,8 +544,6 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
         "{{table_identifier}}",
         "--update-mode",
         "{{update_mode}}",
-        "--target-file-size-bytes",
-        "{{target_file_size_bytes}}",
         "--updater-options",
         "{{updater_options}}",
         "--spark-conf",
@@ -591,7 +551,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
   }
 
   private static Map<String, String> buildSparkConfigs() {
-    return Collections.emptyMap();
+    return IcebergSparkConfigUtils.buildTemplateSparkConfigs();
   }
 
   private static void printUsage() {
@@ -604,8 +564,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob {
             + "\\n"
             + "Optional Options:\\n"
             + "  --update-mode <stats|metrics|all> Update behavior mode, 
default: all\\n"
-            + "  --target-file-size-bytes <bytes>   MSE target file size in 
bytes\\n"
-            + "                                     Default: 134217728 
(128MB)\\n"
+            + "  datafile_mse target file size is fixed at 134217728 
(128MB)\\n"
             + "                                     small_files threshold is 
fixed at 33554432 (32MB)\\n"
             + "  --updater-options <json>           JSON map for updater and 
repository settings\\n"
             + "                                     Example: 
'{\"gravitino_uri\":\"http://localhost:8090\",\\n";
diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
index cbbc27fce5..937f62cefc 100644
--- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
+++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
@@ -48,28 +48,46 @@ public class TestIcebergUpdateStatsJob {
     SparkJobTemplate template = job.jobTemplate();
 
     assertNotNull(template.arguments());
-    assertEquals(12, template.arguments().size());
+    assertEquals(10, template.arguments().size());
     assertTrue(template.arguments().contains("--catalog"));
     assertTrue(template.arguments().contains("{{catalog_name}}"));
     assertTrue(template.arguments().contains("--table"));
     assertTrue(template.arguments().contains("{{table_identifier}}"));
     assertTrue(template.arguments().contains("--update-mode"));
     assertTrue(template.arguments().contains("{{update_mode}}"));
-    assertTrue(template.arguments().contains("--target-file-size-bytes"));
-    assertTrue(template.arguments().contains("{{target_file_size_bytes}}"));
     assertTrue(template.arguments().contains("--updater-options"));
     assertTrue(template.arguments().contains("{{updater_options}}"));
     assertTrue(template.arguments().contains("--spark-conf"));
     assertTrue(template.arguments().contains("{{spark_conf}}"));
   }
 
+  @Test
+  public void testJobTemplateHasSparkConfigs() {
+    IcebergUpdateStatsAndMetricsJob job = new IcebergUpdateStatsAndMetricsJob();
+    SparkJobTemplate template = job.jobTemplate();
+
+    Map<String, String> configs = template.configs();
+    assertNotNull(configs);
+    assertTrue(configs.containsKey("spark.master"));
+    assertTrue(configs.containsKey("spark.executor.instances"));
+    assertTrue(configs.containsKey("spark.executor.cores"));
+    assertTrue(configs.containsKey("spark.executor.memory"));
+    assertTrue(configs.containsKey("spark.driver.memory"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.type"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.uri"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.warehouse"));
+    assertEquals(
+        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+        configs.get("spark.sql.extensions"));
+  }
+
   @Test
   public void testParseArguments() {
     String[] args = {
       "--catalog", "cat",
       "--table", "db.tbl",
       "--update-mode", "metrics",
-      "--target-file-size-bytes", "2048",
       "--updater-options", 
"{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}";,
       "--spark-conf", "{\"spark.master\":\"local[2]\"}"
     };
@@ -78,7 +96,6 @@ public class TestIcebergUpdateStatsJob {
     assertEquals("cat", parsed.get("catalog"));
     assertEquals("db.tbl", parsed.get("table"));
     assertEquals("metrics", parsed.get("update-mode"));
-    assertEquals("2048", parsed.get("target-file-size-bytes"));
     assertEquals(
         "{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}";,
         parsed.get("updater-options"));
@@ -87,10 +104,8 @@ public class TestIcebergUpdateStatsJob {
 
   @Test
   public void testBuildStatsSql() {
-    String tableSql =
-        IcebergUpdateStatsAndMetricsJob.buildTableStatsSql("cat", "db.tbl", 134_217_728L);
-    String partitionSql =
-        IcebergUpdateStatsAndMetricsJob.buildPartitionStatsSql("cat", "db.tbl", 134_217_728L);
+    String tableSql = IcebergUpdateStatsAndMetricsJob.buildTableStatsSql("cat", "db.tbl");
+    String partitionSql = IcebergUpdateStatsAndMetricsJob.buildPartitionStatsSql("cat", "db.tbl");
 
     assertTrue(tableSql.contains("FROM cat.db.tbl.files"));
     assertTrue(tableSql.contains("AS datafile_mse"));
@@ -103,19 +118,6 @@ public class TestIcebergUpdateStatsJob {
     assertTrue(partitionSql.contains("134217728 - LEAST(134217728, 
file_size_in_bytes)"));
   }
 
-  @Test
-  public void testParseTargetFileSize() {
-    assertEquals(134_217_728L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize(null));
-    assertEquals(134_217_728L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize(""));
-    assertEquals(2048L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("2048"));
-    assertThrows(
-        IllegalArgumentException.class,
-        () -> IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("-1"));
-    assertThrows(
-        IllegalArgumentException.class,
-        () -> IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("abc"));
-  }
-
   @Test
   public void testParseUpdateMode() {
     assertEquals(
diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
index 187c4c1b37..ca6f8171d3 100644
--- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
+++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -270,8 +270,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         null,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS,
         catalogName,
-        "db.non_partitioned",
-        100_000L);
+        "db.non_partitioned");
 
     assertEquals(NameIdentifier.of(catalogName, "db", "non_partitioned"), 
updater.tableIdentifier);
     assertNotNull(updater.tableStatistics);
@@ -296,8 +295,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         null,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS,
         catalogName,
-        "db.partitioned",
-        100_000L);
+        "db.partitioned");
 
     assertEquals(NameIdentifier.of(catalogName, "db", "partitioned"), 
updater.tableIdentifier);
     assertTrue(updater.tableStatistics.isEmpty());
@@ -327,8 +325,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         metricsUpdater,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL,
         catalogName,
-        "db.partitioned",
-        100_000L);
+        "db.partitioned");
 
     assertEquals(
         NameIdentifier.of(catalogName, "db", "partitioned"), 
statisticsUpdater.tableIdentifier);
@@ -358,8 +355,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         metricsUpdater,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS,
         catalogName,
-        "db.partitioned",
-        100_000L);
+        "db.partitioned");
 
     assertEquals(16, metricsUpdater.tableMetrics.size());
     assertTrue(
@@ -378,8 +374,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         metricsUpdater,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS,
         catalogName,
-        "db.non_partitioned",
-        100_000L);
+        "db.non_partitioned");
 
     assertEquals(8, metricsUpdater.tableMetrics.size());
     assertTrue(
@@ -401,8 +396,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         metricsUpdater,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL,
         catalogName,
-        "db.non_partitioned",
-        100_000L);
+        "db.non_partitioned");
 
     assertEquals(
         NameIdentifier.of(catalogName, "db", "non_partitioned"), 
statisticsUpdater.tableIdentifier);
@@ -440,8 +434,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
             metricsUpdater,
             IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS,
             catalogName,
-            "db.partitioned",
-            100_000L);
+            "db.partitioned");
       } finally {
         metricsUpdater.close();
       }
@@ -498,8 +491,7 @@ public class TestIcebergUpdateStatsJobWithSpark {
         null,
         IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS,
         catalogName,
-        "db.multi_partitioned",
-        100_000L);
+        "db.multi_partitioned");
 
     assertEquals(
         NameIdentifier.of(catalogName, "db", "multi_partitioned"), 
updater.tableIdentifier);
@@ -733,7 +725,6 @@ public class TestIcebergUpdateStatsJobWithSpark {
     Map<String, String> jobConf = new HashMap<>();
     jobConf.put("table_identifier", "db." + tableName);
     jobConf.put("update_mode", updateMode);
-    jobConf.put("target_file_size_bytes", "100000");
     jobConf.put("updater_options", updaterOptions);
     jobConf.put("catalog_name", SPARK_CATALOG_NAME);
     // Keep compatibility with old built-in template placeholders in deploy package.
diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IcebergSparkConfigUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IcebergSparkConfigUtils.java
new file mode 100644
index 0000000000..cf78fabcf5
--- /dev/null
+++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IcebergSparkConfigUtils.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
+/** Shared utilities for Iceberg Spark template configs and config validation. */
+public final class IcebergSparkConfigUtils {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  private static final String SPARK_SQL_EXTENSIONS_KEY = "spark.sql.extensions";
+  private static final String SPARK_SQL_CATALOG_PREFIX = "spark.sql.catalog.";
+  private static final String CATALOG_TYPE_REST = "rest";
+  private static final String CATALOG_TYPE_HIVE = "hive";
+  private static final String CATALOG_TYPE_HADOOP = "hadoop";
+
+  public static final String ICEBERG_SPARK_EXTENSIONS =
+      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions";
+
+  private IcebergSparkConfigUtils() {}
+
+  /** Build default Spark template configs for Iceberg jobs. */
+  public static Map<String, String> buildTemplateSparkConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("spark.master", "{{spark_master}}");
+    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
+    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
+    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
+    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
+    configs.put(
+        SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}", 
"org.apache.iceberg.spark.SparkCatalog");
+    configs.put(SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}.type", 
"{{catalog_type}}");
+    configs.put(SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}.uri", 
"{{catalog_uri}}");
+    configs.put(SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}.warehouse", 
"{{warehouse_location}}");
+    configs.put(SPARK_SQL_EXTENSIONS_KEY, ICEBERG_SPARK_EXTENSIONS);
+    return Collections.unmodifiableMap(configs);
+  }
+
+  /**
+   * Parse a flat JSON map from CLI/config option.
+   *
+   * <p>Nested objects/arrays are rejected.
+   */
+  public static Map<String, String> parseFlatJsonMap(String json, String optionName) {
+    if (StringUtils.isBlank(json)) {
+      return ImmutableMap.of();
+    }
+    try {
+      Map<String, Object> parsedMap =
+          MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {});
+      Map<String, String> result = new LinkedHashMap<>();
+      for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
+        Object value = entry.getValue();
+        Preconditions.checkArgument(
+            !(value instanceof Map || value instanceof List),
+            "Option --%s must be a flat key-value JSON map, but key '%s' has 
non-scalar value",
+            optionName,
+            entry.getKey());
+        result.put(entry.getKey(), value == null ? "" : value.toString());
+      }
+      return ImmutableMap.copyOf(result);
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          String.format(
+              Locale.ROOT, "Option --%s is not valid JSON: %s", optionName, 
e.getMessage()),
+          e);
+    }
+  }
+
+  /** Validate required Spark/Iceberg catalog configs for one table identifier. */
+  public static void validateSparkConfigsForCatalog(
+      Map<String, String> sparkConfigs, String catalogName, String tableIdentifier) {
+    Preconditions.checkArgument(
+        sparkConfigs != null && !sparkConfigs.isEmpty(),
+        "Missing spark config. Set --spark-conf or "
+            + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config 
file");
+    Preconditions.checkArgument(StringUtils.isNotBlank(catalogName), 
"catalogName is blank");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(tableIdentifier), "tableIdentifier is blank");
+
+    String extensions = StringUtils.trimToNull(sparkConfigs.get(SPARK_SQL_EXTENSIONS_KEY));
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(extensions),
+        "Spark config must contain key '%s' and include '%s'",
+        SPARK_SQL_EXTENSIONS_KEY,
+        ICEBERG_SPARK_EXTENSIONS);
+    Preconditions.checkArgument(
+        extensions.contains(ICEBERG_SPARK_EXTENSIONS),
+        "Spark config key '%s' must include '%s'",
+        SPARK_SQL_EXTENSIONS_KEY,
+        ICEBERG_SPARK_EXTENSIONS);
+
+    String catalogPrefix = SPARK_SQL_CATALOG_PREFIX + catalogName;
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(sparkConfigs.get(catalogPrefix)),
+        "Spark config must contain key '%s' for identifier '%s'",
+        catalogPrefix,
+        tableIdentifier);
+
+    String typeKey = catalogPrefix + ".type";
+    String catalogType = StringUtils.trimToNull(sparkConfigs.get(typeKey));
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(catalogType),
+        "Spark config must contain key '%s' for identifier '%s'",
+        typeKey,
+        tableIdentifier);
+
+    String normalizedCatalogType = catalogType.toLowerCase(Locale.ROOT);
+    if (CATALOG_TYPE_REST.equals(normalizedCatalogType)
+        || CATALOG_TYPE_HIVE.equals(normalizedCatalogType)) {
+      String uriKey = catalogPrefix + ".uri";
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(sparkConfigs.get(uriKey)),
+          "Spark config must contain key '%s' when catalog type is '%s' for 
identifier '%s'",
+          uriKey,
+          normalizedCatalogType,
+          tableIdentifier);
+    } else if (CATALOG_TYPE_HADOOP.equals(normalizedCatalogType)) {
+      String warehouseKey = catalogPrefix + ".warehouse";
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(sparkConfigs.get(warehouseKey)),
+          "Spark config must contain key '%s' when catalog type is '%s' for 
identifier '%s'",
+          warehouseKey,
+          normalizedCatalogType,
+          tableIdentifier);
+    }
+  }
+}
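
[Editor's aid] A minimal usage sketch of the two entry points above (the catalog name "rest", the table identifier, and the URI are hypothetical placeholders):

    import java.util.Map;
    import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;

    public class SparkConfUtilDemo {
      public static void main(String[] args) {
        // Flat JSON only; a nested object here would raise IllegalArgumentException.
        Map<String, String> sparkConf =
            IcebergSparkConfigUtils.parseFlatJsonMap(
                "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
                    + "\"spark.sql.catalog.rest.type\":\"rest\","
                    + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\","
                    + "\"spark.sql.extensions\":"
                    + "\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}",
                "spark-conf");
        // Throws if the Iceberg extensions, the catalog key, its type, or the
        // type-specific uri/warehouse key is missing; passes silently otherwise.
        IcebergSparkConfigUtils.validateSparkConfigsForCatalog(sparkConf, "rest", "rest.db.t1");
      }
    }
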
diff --git a/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIcebergSparkConfigUtils.java b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIcebergSparkConfigUtils.java
new file mode 100644
index 0000000000..7ac05be062
--- /dev/null
+++ b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIcebergSparkConfigUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergSparkConfigUtils {
+
+  @Test
+  public void testBuildTemplateSparkConfigsContainsRequiredKeys() {
+    Map<String, String> configs = IcebergSparkConfigUtils.buildTemplateSparkConfigs();
+
+    assertTrue(configs.containsKey("spark.master"));
+    assertTrue(configs.containsKey("spark.executor.instances"));
+    assertTrue(configs.containsKey("spark.executor.cores"));
+    assertTrue(configs.containsKey("spark.executor.memory"));
+    assertTrue(configs.containsKey("spark.driver.memory"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.type"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.uri"));
+    assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.warehouse"));
+    assertEquals(
+        IcebergSparkConfigUtils.ICEBERG_SPARK_EXTENSIONS, configs.get("spark.sql.extensions"));
+  }
+
+  @Test
+  public void testParseFlatJsonMap() {
+    Map<String, String> parsed =
+        IcebergSparkConfigUtils.parseFlatJsonMap(
+            "{\"k1\":\"v1\",\"k2\":1,\"k3\":null}", "spark-conf");
+
+    assertEquals("v1", parsed.get("k1"));
+    assertEquals("1", parsed.get("k2"));
+    assertEquals("", parsed.get("k3"));
+  }
+
+  @Test
+  public void testParseFlatJsonMapRejectsNestedValue() {
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                IcebergSparkConfigUtils.parseFlatJsonMap(
+                    "{\"spark\":{\"master\":\"local[2]\"}}", "spark-conf"));
+    assertTrue(e.getMessage().contains("flat key-value JSON map"));
+  }
+
+  @Test
+  public void testValidateSparkConfigsForRestCatalog() {
+    Map<String, String> sparkConfigs = baseSparkConfigs("rest");
+    sparkConfigs.put("spark.sql.catalog.rest.uri", 
"http://localhost:9001/iceberg";);
+    sparkConfigs.put("spark.sql.catalog.rest.warehouse", "/tmp/warehouse");
+
+    IcebergSparkConfigUtils.validateSparkConfigsForCatalog(sparkConfigs, "rest", "rest.ab.t1");
+  }
+
+  @Test
+  public void testValidateSparkConfigsForHadoopCatalog() {
+    Map<String, String> sparkConfigs = baseSparkConfigs("hadoop");
+    sparkConfigs.put("spark.sql.catalog.rest.warehouse", "/tmp/warehouse");
+
+    IcebergSparkConfigUtils.validateSparkConfigsForCatalog(sparkConfigs, "rest", "rest.ab.t1");
+  }
+
+  @Test
+  public void testValidateSparkConfigsMissingExtensions() {
+    Map<String, String> sparkConfigs = baseSparkConfigs("rest");
+    sparkConfigs.remove("spark.sql.extensions");
+    sparkConfigs.put("spark.sql.catalog.rest.uri", 
"http://localhost:9001/iceberg";);
+
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                IcebergSparkConfigUtils.validateSparkConfigsForCatalog(
+                    sparkConfigs, "rest", "rest.ab.t1"));
+    assertTrue(e.getMessage().contains("spark.sql.extensions"));
+  }
+
+  @Test
+  public void testValidateSparkConfigsMissingUriForRestCatalog() {
+    Map<String, String> sparkConfigs = baseSparkConfigs("rest");
+
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                IcebergSparkConfigUtils.validateSparkConfigsForCatalog(
+                    sparkConfigs, "rest", "rest.ab.t1"));
+    assertTrue(e.getMessage().contains("spark.sql.catalog.rest.uri"));
+  }
+
+  private Map<String, String> baseSparkConfigs(String catalogType) {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(
+        "spark.sql.extensions",
+        "com.example.Ext," + IcebergSparkConfigUtils.ICEBERG_SPARK_EXTENSIONS);
+    configs.put("spark.sql.catalog.rest", 
"org.apache.iceberg.spark.SparkCatalog");
+    configs.put("spark.sql.catalog.rest.type", catalogType);
+    return configs;
+  }
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
index 6800e5203e..19bd5818fd 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
@@ -122,7 +122,6 @@ public class OptimizerCmd {
               EnumSet.of(
                   CliOption.DRY_RUN,
                   CliOption.UPDATE_MODE,
-                  CliOption.TARGET_FILE_SIZE_BYTES,
                   CliOption.UPDATER_OPTIONS,
                   CliOption.SPARK_CONF),
               "Submit built-in Iceberg update stats jobs for given table 
identifiers.",
@@ -180,12 +179,10 @@ public class OptimizerCmd {
               CliOption.RANGE_SECONDS.longOpt(), Long.toString(DEFAULT_RANGE_SECONDS));
       String partitionPathRaw = cmd.getOptionValue(CliOption.PARTITION_PATH.longOpt());
       String updateMode = cmd.getOptionValue(CliOption.UPDATE_MODE.longOpt());
-      String targetFileSizeBytes = cmd.getOptionValue(CliOption.TARGET_FILE_SIZE_BYTES.longOpt());
       String updaterOptions = cmd.getOptionValue(CliOption.UPDATER_OPTIONS.longOpt());
       String sparkConf = cmd.getOptionValue(CliOption.SPARK_CONF.longOpt());
       OptimizerCommandContext.UpdateStatsJobOptions updateStatsJobOptions =
-          new OptimizerCommandContext.UpdateStatsJobOptions(
-              updateMode, targetFileSizeBytes, updaterOptions, sparkConf);
+          new OptimizerCommandContext.UpdateStatsJobOptions(updateMode, updaterOptions, sparkConf);
       String statisticsPayload = cmd.getOptionValue(CliOption.STATISTICS_PAYLOAD.longOpt());
       String filePath = cmd.getOptionValue(CliOption.FILE_PATH.longOpt());
       Optional<StatisticsInputContent> statisticsInputContent =
@@ -494,11 +491,6 @@ public class OptimizerCmd {
         CliOptionArgType.SINGLE,
         null,
         "Update mode for submit-update-stats-job: stats|metrics|all"),
-    TARGET_FILE_SIZE_BYTES(
-        "target-file-size-bytes",
-        CliOptionArgType.SINGLE,
-        null,
-        "Target file size bytes for submit-update-stats-job"),
     UPDATER_OPTIONS(
         "updater-options",
         CliOptionArgType.SINGLE,
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
index 88d11046fa..e1e9d62e03 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
@@ -122,10 +122,6 @@ public final class OptimizerCommandContext {
     return updateStatsJobOptions.updateMode();
   }
 
-  public String targetFileSizeBytes() {
-    return updateStatsJobOptions.targetFileSizeBytes();
-  }
-
   public String updaterOptions() {
     return updateStatsJobOptions.updaterOptions();
   }
@@ -145,14 +141,11 @@ public final class OptimizerCommandContext {
   /** Submit-update-stats-job specific command options. */
   public static final class UpdateStatsJobOptions {
     private final String updateMode;
-    private final String targetFileSizeBytes;
     private final String updaterOptions;
     private final String sparkConf;
 
-    public UpdateStatsJobOptions(
-        String updateMode, String targetFileSizeBytes, String updaterOptions, String sparkConf) {
+    public UpdateStatsJobOptions(String updateMode, String updaterOptions, String sparkConf) {
       this.updateMode = updateMode;
-      this.targetFileSizeBytes = targetFileSizeBytes;
       this.updaterOptions = updaterOptions;
       this.sparkConf = sparkConf;
     }
@@ -161,10 +154,6 @@ public final class OptimizerCommandContext {
       return updateMode;
     }
 
-    public String targetFileSizeBytes() {
-      return targetFileSizeBytes;
-    }
-
     public String updaterOptions() {
       return updaterOptions;
     }
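
[Editor's aid] After this change the CLI context carries only three update-stats options; a minimal construction sketch (the argument values are hypothetical):

    import org.apache.gravitino.maintenance.optimizer.command.OptimizerCommandContext;

    public class UpdateStatsOptionsDemo {
      public static void main(String[] args) {
        // New 3-arg constructor: the target-file-size field is gone.
        OptimizerCommandContext.UpdateStatsJobOptions options =
            new OptimizerCommandContext.UpdateStatsJobOptions(
                "stats",
                "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}",
                "{\"spark.master\":\"local[2]\"}");
        System.out.println(options.updateMode()); // stats
      }
    }
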
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
index dd0bc3cc8f..7d3f0df312 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
@@ -19,7 +19,6 @@
 
 package org.apache.gravitino.maintenance.optimizer.command;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
@@ -33,6 +32,7 @@ import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 import org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;
 
 /**
  * Handles CLI command {@code submit-update-stats-job} for submitting built-in Iceberg update stats
@@ -42,7 +42,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
 
   private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats";
   private static final String DEFAULT_UPDATE_MODE = "stats";
-  private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 128L * 1024 * 1024;
   private static final String OPTION_UPDATER_OPTIONS = "updater-options";
   private static final String OPTION_SPARK_CONF = "spark-conf";
 
@@ -60,10 +59,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
     String updateMode =
         parseUpdateMode(
             resolveScalarOption(context.updateMode(), submitterConfigs.get("update_mode")));
-    long targetFileSizeBytes =
-        parseTargetFileSize(
-            resolveScalarOption(
-                context.targetFileSizeBytes(), submitterConfigs.get("target_file_size_bytes")));
 
     String updaterOptionsJson =
         resolveJsonOption(context.updaterOptions(), submitterConfigs.get("updater_options"));
@@ -71,8 +66,9 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
         resolveJsonOption(context.sparkConf(), submitterConfigs.get("spark_conf"));
 
     Map<String, String> updaterOptions =
-        parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS);
-    Map<String, String> sparkConfigs = parseFlatJsonMap(sparkConfJson, OPTION_SPARK_CONF);
+        IcebergSparkConfigUtils.parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS);
+    Map<String, String> sparkConfigs =
+        IcebergSparkConfigUtils.parseFlatJsonMap(sparkConfJson, OPTION_SPARK_CONF);
 
     validateUpdaterOptions(updateMode, updaterOptions);
     validateSparkConfigs(tableTargets, sparkConfigs);
@@ -80,8 +76,7 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
     if (context.dryRun()) {
       for (TableTarget tableTarget : tableTargets) {
         Map<String, String> jobConfig =
-            buildJobConfig(
-                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+            buildJobConfig(tableTarget, updateMode, updaterOptions, sparkConfigs);
         context
             .output()
             .printf(
@@ -98,8 +93,7 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
       int submitted = 0;
       for (TableTarget tableTarget : tableTargets) {
         Map<String, String> jobConfig =
-            buildJobConfig(
-                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+            buildJobConfig(tableTarget, updateMode, updaterOptions, sparkConfigs);
         JobHandle jobHandle = client.runJob(JOB_TEMPLATE_NAME, jobConfig);
         submitted++;
         context
@@ -119,14 +113,12 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
   private static Map<String, String> buildJobConfig(
       TableTarget tableTarget,
       String updateMode,
-      long targetFileSizeBytes,
       Map<String, String> updaterOptions,
       Map<String, String> sparkConfigs) {
     Map<String, String> jobConfig = new LinkedHashMap<>();
     jobConfig.put("catalog_name", tableTarget.catalogName);
     jobConfig.put("table_identifier", tableTarget.schemaAndTable);
     jobConfig.put("update_mode", updateMode);
-    jobConfig.put("target_file_size_bytes", 
Long.toString(targetFileSizeBytes));
     jobConfig.put("updater_options", toCanonicalJson(updaterOptions));
     jobConfig.put("spark_conf", toCanonicalJson(sparkConfigs));
     return jobConfig;
@@ -156,13 +148,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
     return normalized;
   }
 
-  private static long parseTargetFileSize(String value) {
-    if (StringUtils.isBlank(value)) {
-      return DEFAULT_TARGET_FILE_SIZE_BYTES;
-    }
-    return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", 
value.trim(), false);
-  }
-
   private static List<TableTarget> parseTableTargets(String[] identifiers, String defaultCatalog) {
     Preconditions.checkArgument(
         identifiers != null && identifiers.length > 0,
@@ -234,39 +219,10 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
         !sparkConfigs.isEmpty(),
         "Missing spark config. Set --spark-conf or "
             + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config 
file");
-    for (TableTarget tableTarget : tableTargets) {
-      String requiredKey = "spark.sql.catalog." + tableTarget.catalogName;
-      Preconditions.checkArgument(
-          StringUtils.isNotBlank(sparkConfigs.get(requiredKey)),
-          "Spark config must contain key '%s' for identifier '%s'",
-          requiredKey,
-          tableTarget.fullIdentifier);
-    }
-  }
 
-  private static Map<String, String> parseFlatJsonMap(String json, String optionName) {
-    if (StringUtils.isBlank(json)) {
-      return Map.of();
-    }
-    try {
-      Map<String, Object> parsedMap =
-          MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {});
-      Map<String, String> result = new LinkedHashMap<>();
-      for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
-        Object value = entry.getValue();
-        Preconditions.checkArgument(
-            !(value instanceof Map || value instanceof List),
-            "Option --%s must be a flat key-value JSON map, but key '%s' has 
non-scalar value",
-            optionName,
-            entry.getKey());
-        result.put(entry.getKey(), value == null ? "" : value.toString());
-      }
-      return result;
-    } catch (Exception e) {
-      throw new IllegalArgumentException(
-          String.format(
-              Locale.ROOT, "Option --%s is not valid JSON: %s", optionName, 
e.getMessage()),
-          e);
+    for (TableTarget tableTarget : tableTargets) {
+      IcebergSparkConfigUtils.validateSparkConfigsForCatalog(
+          sparkConfigs, tableTarget.catalogName, tableTarget.fullIdentifier);
     }
   }
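
[Editor's aid] For reference, the per-table job config assembled by buildJobConfig after this change carries five keys and no target_file_size_bytes; the values below are made-up examples mirroring the keys set in the method above:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class JobConfigShapeDemo {
      public static void main(String[] args) {
        Map<String, String> jobConfig = new LinkedHashMap<>();
        jobConfig.put("catalog_name", "rest");
        jobConfig.put("table_identifier", "db.t1");
        jobConfig.put("update_mode", "stats");
        jobConfig.put("updater_options",
            "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}");
        jobConfig.put("spark_conf", "{\"spark.master\":\"local[2]\"}");
        System.out.println(jobConfig);
      }
    }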
 
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
index b450ad9916..3c89722216 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
@@ -437,7 +437,9 @@ class TestOptimizerCmd {
             "--spark-conf",
             
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
                 + "\"spark.sql.catalog.rest.type\":\"rest\","
-                + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}";,
+                + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\",";
+                + "\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\","
+                + 
"\"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}",
             "--dry-run",
             "--conf-path",
             confPath.toString());
@@ -446,6 +448,37 @@ class TestOptimizerCmd {
     Assertions.assertTrue(output[0].contains("spark.sql.catalog.rest"));
   }
 
+  @Test
+  void testSubmitUpdateStatsJobRequiresSparkSqlExtensions() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsWithoutExtensions();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].contains("spark.sql.extensions"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRequiresCatalogUriForRestCatalog() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsWithoutCatalogUri();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].contains("spark.sql.catalog.rest.uri"));
+    Assertions.assertTrue(output[1].contains("catalog type is 'rest'"));
+  }
+
   private Path createOptimizerConfForMetricsProvider() throws Exception {
     Path confPath = Files.createTempFile("optimizer-test-", ".conf");
     // Route command reads to deterministic in-memory fixtures from MetricsProviderForTest.
@@ -526,13 +559,15 @@ class TestOptimizerCmd {
                 "gravitino.optimizer.gravitinoMetalake = test",
                 "gravitino.optimizer.gravitinoDefaultCatalog = rest",
                 "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
-                "gravitino.optimizer.jobSubmitterConfig.target_file_size_bytes 
= 134217728",
                 "gravitino.optimizer.jobSubmitterConfig.updater_options = "
                     + 
"{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}";,
                 "gravitino.optimizer.jobSubmitterConfig.spark_conf = "
                     + 
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
                     + "\"spark.sql.catalog.rest.type\":\"rest\","
-                    + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}";)
+                    + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\",";
+                    + 
"\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\","
+                    + "\"spark.sql.extensions\":"
+                    + 
"\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}")
             + System.lineSeparator();
     Files.writeString(confPath, content, StandardCharsets.UTF_8);
     confPath.toFile().deleteOnExit();
@@ -556,6 +591,52 @@ class TestOptimizerCmd {
     return confPath;
   }
 
+  private Path createOptimizerConfForSubmitUpdateStatsWithoutExtensions() throws Exception {
+    Path confPath =
+        Files.createTempFile("optimizer-submit-update-stats-without-extensions-", ".conf");
+    String content =
+        String.join(
+                System.lineSeparator(),
+                "gravitino.optimizer.gravitinoUri = http://localhost:8090";,
+                "gravitino.optimizer.gravitinoMetalake = test",
+                "gravitino.optimizer.gravitinoDefaultCatalog = rest",
+                "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
+                "gravitino.optimizer.jobSubmitterConfig.updater_options = "
+                    + 
"{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}";,
+                "gravitino.optimizer.jobSubmitterConfig.spark_conf = "
+                    + 
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+                    + "\"spark.sql.catalog.rest.type\":\"rest\","
+                    + 
"\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\",";
+                    + 
"\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\"}")
+            + System.lineSeparator();
+    Files.writeString(confPath, content, StandardCharsets.UTF_8);
+    confPath.toFile().deleteOnExit();
+    return confPath;
+  }
+
+  private Path createOptimizerConfForSubmitUpdateStatsWithoutCatalogUri() throws Exception {
+    Path confPath = Files.createTempFile("optimizer-submit-update-stats-without-uri-", ".conf");
+    String content =
+        String.join(
+                System.lineSeparator(),
+                "gravitino.optimizer.gravitinoUri = http://localhost:8090";,
+                "gravitino.optimizer.gravitinoMetalake = test",
+                "gravitino.optimizer.gravitinoDefaultCatalog = rest",
+                "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
+                "gravitino.optimizer.jobSubmitterConfig.updater_options = "
+                    + 
"{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}";,
+                "gravitino.optimizer.jobSubmitterConfig.spark_conf = "
+                    + 
"{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+                    + "\"spark.sql.catalog.rest.type\":\"rest\","
+                    + 
"\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\","
+                    + "\"spark.sql.extensions\":"
+                    + 
"\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}")
+            + System.lineSeparator();
+    Files.writeString(confPath, content, StandardCharsets.UTF_8);
+    confPath.toFile().deleteOnExit();
+    return confPath;
+  }
+
   private String[] runCommand(String... args) {
     ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
     ByteArrayOutputStream errBuffer = new ByteArrayOutputStream();
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
index b2b8fbb4e4..8f9a6b2f38 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
@@ -115,7 +115,6 @@ public class TestBuiltinIcebergUpdateStatsJob {
     Map<String, String> jobConf = new HashMap<>();
     jobConf.put("table_identifier", "db." + tableName);
     jobConf.put("update_mode", "all");
-    jobConf.put("target_file_size_bytes", "100000");
     jobConf.put(
         "updater_options",
         "{\"gravitino_uri\":\""
