This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 3191bc438a46b8d08c9fbefc1960b96bd5ccd900
Author: fanng <[email protected]>
AuthorDate: Sat Feb 28 20:28:55 2026 +0800

    support stats_job
---
 .../build.gradle.kts}                              |  48 +-
 .../updater/metrics/GravitinoMetricsUpdater.java   |   0
 .../updater/metrics/storage/MetricsRepository.java |   0
 .../metrics/storage/MetricsStorageException.java   |   0
 .../jdbc/DataSourceJdbcConnectionProvider.java     |   0
 .../storage/jdbc/GenericJdbcMetricsRepository.java |   0
 .../metrics/storage/jdbc/H2MetricsDialect.java     |   0
 .../metrics/storage/jdbc/JdbcConnectionConfig.java |   0
 .../metrics/storage/jdbc/JdbcMetricsDialect.java   |   0
 .../storage/jdbc/JdbcMetricsRepository.java        |  21 +-
 .../metrics/storage/jdbc/MySQLMetricsDialect.java  |   0
 .../storage/jdbc/PostgreSQLMetricsDialect.java     |   0
 .../statistics/GravitinoStatisticsUpdater.java     |   4 +-
 ...itino.maintenance.optimizer.api.common.Provider |   9 -
 maintenance/jobs/build.gradle.kts                  |   9 +
 .../jobs/BuiltInJobTemplateProvider.java           |   4 +-
 .../jobs/iceberg/IcebergUpdateStatsJob.java        | 542 +++++++++++++++++++++
 .../jobs/iceberg/TestIcebergUpdateStatsJob.java    | 126 +++++
 .../TestIcebergUpdateStatsJobWithSpark.java        | 275 +++++++++++
 .../build.gradle.kts}                              |  31 +-
 .../optimizer/api/common/DataScope.java            |   0
 .../optimizer/api/common/MetricPoint.java          |   0
 .../optimizer/api/common/MetricSample.java         |   0
 .../optimizer/api/common/PartitionEntry.java       |   0
 .../optimizer/api/common/PartitionPath.java        |   0
 .../optimizer/api/common/PartitionStrategy.java    |   0
 .../maintenance/optimizer/api/common/Provider.java |   0
 .../optimizer/api/common/StatisticEntry.java       |   0
 .../maintenance/optimizer/api/common/Strategy.java |   0
 .../api/common/TableAndPartitionStatistics.java    |   0
 .../optimizer/api/monitor/EvaluationResult.java    |   0
 .../optimizer/api/monitor/MetricsEvaluator.java    |   0
 .../optimizer/api/monitor/MetricsProvider.java     |   0
 .../optimizer/api/monitor/MonitorCallback.java     |   0
 .../api/monitor/TableJobRelationProvider.java      |   0
 .../api/recommender/JobExecutionContext.java       |   0
 .../optimizer/api/recommender/JobSubmitter.java    |   0
 .../api/recommender/StatisticsProvider.java        |   0
 .../api/recommender/StrategyEvaluation.java        |   0
 .../optimizer/api/recommender/StrategyHandler.java |   0
 .../api/recommender/StrategyHandlerContext.java    |   0
 .../api/recommender/StrategyProvider.java          |   0
 .../api/recommender/SupportTableStatistics.java    |   0
 .../api/recommender/TableMetadataProvider.java     |   0
 .../optimizer/api/updater/MetricsUpdater.java      |   0
 .../api/updater/StatisticsCalculator.java          |   0
 .../optimizer/api/updater/StatisticsUpdater.java   |   0
 .../updater/SupportsCalculateBulkJobMetrics.java   |   0
 .../SupportsCalculateBulkJobStatistics.java        |   0
 .../updater/SupportsCalculateBulkTableMetrics.java |   0
 .../SupportsCalculateBulkTableStatistics.java      |   0
 .../api/updater/SupportsCalculateJobMetrics.java   |   0
 .../updater/SupportsCalculateJobStatistics.java    |   0
 .../api/updater/SupportsCalculateTableMetrics.java |   0
 .../updater/SupportsCalculateTableStatistics.java  |   0
 .../maintenance/optimizer/common/OptimizerEnv.java |   0
 .../optimizer/common/PartitionEntryImpl.java       |   0
 .../optimizer/common/StatisticEntryImpl.java       |   0
 .../optimizer/common/conf/OptimizerConfig.java     |  45 +-
 .../common/util/GravitinoClientUtils.java          |   0
 .../optimizer/common/util/IdentifierUtils.java     |   0
 .../common/util/PartitionPathSerdeUtils.java}      |  53 +-
 .../optimizer/common/util/ProviderUtils.java       |   0
 .../common/util/StatisticValueSerdeUtils.java      |  50 ++
 .../common/util/TestPartitionPathSerdeUtils.java   |  51 ++
 maintenance/optimizer/build.gradle.kts             |   2 +
 .../optimizer/recommender/util/PartitionUtils.java |  59 +--
 ...itino.maintenance.optimizer.api.common.Provider |   2 -
 .../job/TestBuiltinIcebergRewriteDataFiles.java    |  54 +-
 .../job/TestBuiltinIcebergUpdateStatsJob.java      | 180 +++++++
 settings.gradle.kts                                |   7 +-
 71 files changed, 1404 insertions(+), 168 deletions(-)

diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
 b/maintenance/gravitino-updaters/build.gradle.kts
similarity index 51%
copy from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
copy to maintenance/gravitino-updaters/build.gradle.kts
index 9c2443db50..9da6abb540 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
+++ b/maintenance/gravitino-updaters/build.gradle.kts
@@ -16,31 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+description = "Gravitino Built-in Updaters"
 
-package org.apache.gravitino.maintenance.optimizer.api.common;
+plugins {
+  `maven-publish`
+  id("java")
+  id("idea")
+}
 
-import org.apache.gravitino.annotation.DeveloperApi;
-import org.apache.gravitino.stats.StatisticValue;
+dependencies {
+  implementation(project(":api"))
+  implementation(project(":common"))
+  implementation(project(":clients:client-java"))
+  implementation(project(":maintenance:optimizer-api"))
+  implementation(libs.guava)
+  implementation(libs.commons.lang3)
+  implementation(libs.commons.dbcp2)
+  implementation(libs.h2db)
+  implementation(libs.slf4j.api)
+  implementation(libs.jackson.databind)
 
-/**
- * Named statistic value produced or consumed by optimizer components.
- *
- * @param <T> underlying Java type wrapped by the {@link StatisticValue}
- */
-@DeveloperApi
-public interface StatisticEntry<T> {
+  annotationProcessor(libs.lombok)
+  compileOnly(libs.lombok)
+  testAnnotationProcessor(libs.lombok)
+  testCompileOnly(libs.lombok)
 
-  /**
-   * Stable metric key used for lookup and reporting.
-   *
-   * @return non-null metric name
-   */
-  String name();
+  testImplementation(libs.junit.jupiter.api)
+  testRuntimeOnly(libs.junit.jupiter.engine)
+}
 
-  /**
-   * Typed statistic value wrapper.
-   *
-   * @return statistic value container
-   */
-  StatisticValue<T> value();
+tasks.test {
+  useJUnitPlatform()
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
similarity index 96%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
index 406d9b4372..e138e18fd7 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
+++ 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java
@@ -41,8 +41,8 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
 import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
-import 
org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueUtils;
-import 
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.PartitionPathSerdeUtils;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueSerdeUtils;
 import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsRepository;
 import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsStorageException;
 import org.apache.gravitino.utils.jdbc.JdbcSqlScriptUtils;
@@ -247,13 +247,13 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
                 || metricPoint.scope() == DataScope.Type.PARTITION,
             "Unsupported scope %s for table/partition metrics",
             metricPoint.scope());
-        String serializedMetricValue = 
StatisticValueUtils.toString(metricPoint.value());
+        String serializedMetricValue = 
StatisticValueSerdeUtils.toString(metricPoint.value());
         validateWriteArguments(metricPoint, serializedMetricValue);
 
         String normalizedIdentifier = 
normalizeIdentifier(metricPoint.identifier());
         String normalizedMetricName = 
normalizeMetricName(metricPoint.metricName());
         String normalizedPartition =
-            
metricPoint.partitionPath().map(PartitionUtils::encodePartitionPath).orElse(null);
+            
metricPoint.partitionPath().map(PartitionPathSerdeUtils::encode).orElse(null);
         tableInsertStmt.setString(1, normalizedIdentifier);
         tableInsertStmt.setString(2, normalizedMetricName);
         tableInsertStmt.setString(3, 
normalizePartition(normalizedPartition).orElse(null));
@@ -295,7 +295,7 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
             metricPoint.scope() == DataScope.Type.JOB,
             "Unsupported scope %s for job metrics",
             metricPoint.scope());
-        String serializedMetricValue = 
StatisticValueUtils.toString(metricPoint.value());
+        String serializedMetricValue = 
StatisticValueSerdeUtils.toString(metricPoint.value());
         validateWriteArguments(metricPoint, serializedMetricValue);
 
         String normalizedIdentifier = 
normalizeIdentifier(metricPoint.identifier());
@@ -439,8 +439,7 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
         MAX_METRIC_VALUE_LENGTH,
         serializedMetricValue.length());
     if (metricPoint.partitionPath().isPresent()) {
-      String encodedPartition =
-          
PartitionUtils.encodePartitionPath(metricPoint.partitionPath().get());
+      String encodedPartition = 
PartitionPathSerdeUtils.encode(metricPoint.partitionPath().get());
       Preconditions.checkArgument(
           StringUtils.isNotBlank(encodedPartition), "partition must not be 
blank");
     }
@@ -466,7 +465,7 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
               MetricPoint.forTable(
                   nameIdentifier,
                   rs.getString("metric_name"),
-                  StatisticValueUtils.fromString(rs.getString("metric_value")),
+                  
StatisticValueSerdeUtils.fromString(rs.getString("metric_value")),
                   rs.getLong("metric_ts")));
         }
       }
@@ -487,7 +486,7 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
     Preconditions.checkArgument(
         scope.partition().isPresent(), "partition scope must contain partition 
path");
     NameIdentifier nameIdentifier = scope.identifier();
-    String partition = 
PartitionUtils.encodePartitionPath(scope.partition().get());
+    String partition = PartitionPathSerdeUtils.encode(scope.partition().get());
 
     List<MetricPoint> result = new ArrayList<>();
     String sql =
@@ -509,7 +508,7 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
                   nameIdentifier,
                   scope.partition().get(),
                   rs.getString("metric_name"),
-                  StatisticValueUtils.fromString(rs.getString("metric_value")),
+                  
StatisticValueSerdeUtils.fromString(rs.getString("metric_value")),
                   rs.getLong("metric_ts")));
         }
       }
@@ -547,7 +546,7 @@ public abstract class JdbcMetricsRepository implements 
MetricsRepository {
               MetricPoint.forJob(
                   nameIdentifier,
                   rs.getString("metric_name"),
-                  StatisticValueUtils.fromString(rs.getString("metric_value")),
+                  
StatisticValueSerdeUtils.fromString(rs.getString("metric_value")),
                   rs.getLong("metric_ts")));
         }
       }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
similarity index 100%
copy from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
copy to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
similarity index 97%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
rename to 
maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
index 3b6f4e45c0..a3635cf2c7 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
+++ 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
@@ -33,7 +33,7 @@ import 
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
 import 
org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
 import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
-import 
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.PartitionPathSerdeUtils;
 import org.apache.gravitino.stats.PartitionStatisticsUpdate;
 import org.apache.gravitino.stats.StatisticValue;
 
@@ -104,7 +104,7 @@ public class GravitinoStatisticsUpdater implements 
StatisticsUpdater {
               return new PartitionStatisticsUpdate() {
                 @Override
                 public String partitionName() {
-                  return PartitionUtils.encodePartitionPath(entry.getKey());
+                  return PartitionPathSerdeUtils.encode(entry.getKey());
                 }
 
                 @Override
diff --git 
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
 
b/maintenance/gravitino-updaters/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
similarity index 55%
copy from 
maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
copy to 
maintenance/gravitino-updaters/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index c059ab3483..ca56d73c1e 100644
--- 
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ 
b/maintenance/gravitino-updaters/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -16,14 +16,5 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider
-org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider
-org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider
-org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter
-org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter
 
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater
 
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater
-org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider
-org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider
-org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider
-org.apache.gravitino.maintenance.optimizer.monitor.callback.ConsoleMonitorCallback
diff --git a/maintenance/jobs/build.gradle.kts 
b/maintenance/jobs/build.gradle.kts
index 1cec173984..e88f3ebbef 100644
--- a/maintenance/jobs/build.gradle.kts
+++ b/maintenance/jobs/build.gradle.kts
@@ -25,6 +25,11 @@ plugins {
   alias(libs.plugins.shadow)
 }
 
+java {
+  sourceCompatibility = JavaVersion.VERSION_17
+  targetCompatibility = JavaVersion.VERSION_17
+}
+
 repositories {
   mavenCentral()
 }
@@ -36,6 +41,8 @@ val sparkMajorVersion = "3.5"
 
 dependencies {
   implementation(project(":api"))
+  implementation(project(":maintenance:optimizer-api"))
+  implementation(project(":maintenance:gravitino-updaters"))
 
   compileOnly(libs.slf4j.api)
   compileOnly(libs.jackson.databind)
@@ -51,6 +58,8 @@ dependencies {
   }
 
   testImplementation(project(":api"))
+  testImplementation(project(":common"))
+  testImplementation(project(":clients:client-java"))
   testImplementation(libs.bundles.log4j)
   testImplementation(libs.hadoop3.common) {
     exclude("org.slf4j")
diff --git 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
index 8620c2330c..1e93145446 100644
--- 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
+++ 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 import org.apache.gravitino.job.JobTemplate;
 import org.apache.gravitino.job.JobTemplateProvider;
 import 
org.apache.gravitino.maintenance.jobs.iceberg.IcebergRewriteDataFilesJob;
+import org.apache.gravitino.maintenance.jobs.iceberg.IcebergUpdateStatsJob;
 import org.apache.gravitino.maintenance.jobs.spark.SparkPiJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +42,8 @@ public class BuiltInJobTemplateProvider implements 
JobTemplateProvider {
       Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN);
 
   private static final List<BuiltInJob> BUILT_IN_JOBS =
-      ImmutableList.of(new SparkPiJob(), new IcebergRewriteDataFilesJob());
+      ImmutableList.of(
+          new SparkPiJob(), new IcebergRewriteDataFilesJob(), new 
IcebergUpdateStatsJob());
 
   @Override
   public List<? extends JobTemplate> jobTemplates() {
diff --git 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
new file mode 100644
index 0000000000..e2689825c3
--- /dev/null
+++ 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils;
+import org.apache.gravitino.stats.StatisticValues;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Built-in job for computing Iceberg table file statistics and persisting 
them to Gravitino. */
+public class IcebergUpdateStatsJob implements BuiltInJob {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(IcebergUpdateStatsJob.class);
+
+  private static final String NAME =
+      JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-update-stats";
+  private static final String VERSION = "v1";
+  private static final String DEFAULT_STATISTICS_UPDATER = 
"gravitino-statistics-updater";
+  private static final String DEFAULT_METRICS_UPDATER = 
"gravitino-metrics-updater";
+  private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L;
+  private static final String CUSTOM_STAT_PREFIX = "custom-";
+
+  @Override
+  public SparkJobTemplate jobTemplate() {
+    return SparkJobTemplate.builder()
+        .withName(NAME)
+        .withComment(
+            "Built-in Iceberg update stats job template for computing datafile 
MSE and file metrics")
+        .withExecutable(resolveExecutable(IcebergUpdateStatsJob.class))
+        .withClassName(IcebergUpdateStatsJob.class.getName())
+        .withArguments(buildArguments())
+        .withConfigs(buildSparkConfigs())
+        .withCustomFields(
+            Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, 
VERSION))
+        .build();
+  }
+
+  /** Main entry point. */
+  public static void main(String[] args) {
+    Map<String, String> argMap = parseArguments(args);
+    String catalogName = argMap.get("catalog");
+    String tableIdentifier = argMap.get("table");
+    String gravitinoUri = argMap.get("gravitino-uri");
+    String metalake = argMap.get("metalake");
+
+    if (catalogName == null
+        || tableIdentifier == null
+        || gravitinoUri == null
+        || metalake == null) {
+      System.err.println(
+          "Error: --catalog, --table, --gravitino-uri and --metalake are 
required arguments");
+      printUsage();
+      System.exit(1);
+    }
+
+    String updaterName =
+        argMap.getOrDefault("statistics-updater", 
DEFAULT_STATISTICS_UPDATER).trim();
+    boolean enableMetrics = parseEnableMetrics(argMap.get("enable-metrics"));
+    String metricsUpdaterName =
+        argMap.getOrDefault("metrics-updater", DEFAULT_METRICS_UPDATER).trim();
+    long targetFileSizeBytes = 
parseTargetFileSize(argMap.get("target-file-size-bytes"));
+    String sparkConfJson = argMap.get("spark-conf");
+
+    SparkSession.Builder sparkBuilder =
+        SparkSession.builder().appName("Gravitino Built-in Iceberg Update 
Stats");
+
+    if (sparkConfJson != null && !sparkConfJson.isEmpty()) {
+      Map<String, String> customConfigs = 
parseCustomSparkConfigs(sparkConfJson);
+      for (Map.Entry<String, String> entry : customConfigs.entrySet()) {
+        sparkBuilder.config(entry.getKey(), entry.getValue());
+      }
+    }
+
+    SparkSession spark = sparkBuilder.getOrCreate();
+    StatisticsUpdater statisticsUpdater = null;
+    MetricsUpdater metricsUpdater = null;
+    try {
+      statisticsUpdater = createStatisticsUpdater(updaterName, gravitinoUri, 
metalake);
+      if (enableMetrics) {
+        metricsUpdater = createMetricsUpdater(metricsUpdaterName, 
gravitinoUri, metalake);
+      }
+      updateStatistics(
+          spark,
+          statisticsUpdater,
+          metricsUpdater,
+          catalogName,
+          tableIdentifier,
+          targetFileSizeBytes);
+    } catch (Exception e) {
+      LOG.error("Failed to update Iceberg statistics", e);
+      System.exit(1);
+    } finally {
+      if (statisticsUpdater != null) {
+        try {
+          statisticsUpdater.close();
+        } catch (Exception e) {
+          LOG.warn("Failed to close statistics updater", e);
+        }
+      }
+      if (metricsUpdater != null) {
+        try {
+          metricsUpdater.close();
+        } catch (Exception e) {
+          LOG.warn("Failed to close metrics updater", e);
+        }
+      }
+      spark.stop();
+    }
+  }
+
+  static void updateStatistics(
+      SparkSession spark,
+      StatisticsUpdater statisticsUpdater,
+      String catalogName,
+      String tableIdentifier,
+      long targetFileSizeBytes) {
+    updateStatistics(
+        spark, statisticsUpdater, null, catalogName, tableIdentifier, 
targetFileSizeBytes);
+  }
+
  /**
   * Computes file-level statistics from the Iceberg "files" metadata table and pushes them
   * through the supplied updaters.
   *
   * <p>Partitioned tables (as detected by {@code isPartitionedTable}) produce one statistics
   * entry per partition; non-partitioned tables produce a single table-level entry. When
   * {@code metricsUpdater} is non-null, the same statistic values are additionally published
   * as metric points stamped with a shared epoch-seconds timestamp.
   *
   * @param spark active Spark session used to run the stats SQL
   * @param statisticsUpdater required sink for table/partition statistics
   * @param metricsUpdater optional sink for metric points; may be null to skip metrics
   * @param catalogName Spark catalog name holding the table
   * @param tableIdentifier table in schema.table form
   * @param targetFileSizeBytes small-file threshold embedded in the generated SQL
   */
  static void updateStatistics(
      SparkSession spark,
      StatisticsUpdater statisticsUpdater,
      MetricsUpdater metricsUpdater,
      String catalogName,
      String tableIdentifier,
      long targetFileSizeBytes) {
    NameIdentifier gravitinoTableIdentifier =
        toGravitinoTableIdentifier(catalogName, tableIdentifier);
    // One timestamp (epoch seconds) shared by every metric point emitted in this run.
    long metricTimestamp = System.currentTimeMillis() / 1000L;
    boolean partitioned = isPartitionedTable(spark, catalogName, tableIdentifier);
    if (partitioned) {
      // Aggregate per partition; one result row per partition value.
      String sql = buildPartitionStatsSql(catalogName, tableIdentifier, targetFileSizeBytes);
      // Dataset.collect() is declared as Object in the Java API; the rows are a Row[].
      Row[] rows = (Row[]) spark.sql(sql).collect();
      // LinkedHashMap keeps partition order stable (matches SQL result order).
      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = new LinkedHashMap<>();
      List<MetricPoint> tableAndPartitionMetrics = new ArrayList<>();
      for (Row row : rows) {
        // The "partition" column is itself a struct Row of partition field values.
        PartitionPath partitionPath = toPartitionPath(row.getAs("partition"));
        List<StatisticEntry<?>> statistics = toStatistics(row);
        partitionStatistics.put(partitionPath, statistics);
        if (metricsUpdater != null) {
          // Mirror each statistic as a partition-scoped metric point.
          tableAndPartitionMetrics.addAll(
              toPartitionMetricPoints(
                  gravitinoTableIdentifier, partitionPath, statistics, metricTimestamp));
        }
      }
      statisticsUpdater.updatePartitionStatistics(gravitinoTableIdentifier, partitionStatistics);
      if (metricsUpdater != null && !tableAndPartitionMetrics.isEmpty()) {
        metricsUpdater.updateTableAndPartitionMetrics(tableAndPartitionMetrics);
      }
      LOG.info(
          "Updated partition statistics for {} partitions on {}",
          partitionStatistics.size(),
          gravitinoTableIdentifier);
    } else {
      // Single aggregate row over the whole table; empty result yields no statistics.
      String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes);
      Row[] rows = (Row[]) spark.sql(sql).collect();
      List<StatisticEntry<?>> tableStatistics =
          rows.length == 0 ? List.of() : toStatistics(rows[0]);
      statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics);
      if (metricsUpdater != null && !tableStatistics.isEmpty()) {
        metricsUpdater.updateTableAndPartitionMetrics(
            toTableMetricPoints(gravitinoTableIdentifier, tableStatistics, metricTimestamp));
      }
      LOG.info(
          "Updated table statistics with {} metrics on {}",
          tableStatistics.size(),
          gravitinoTableIdentifier);
    }
  }
+
+  static String buildTableStatsSql(
+      String catalogName, String tableIdentifier, long targetFileSizeBytes) {
+    String filesTable = buildFilesTableIdentifier(catalogName, 
tableIdentifier);
+    return "SELECT "
+        + "COUNT(*) AS file_count, "
+        + "SUM(CASE WHEN content = 0 THEN 1 ELSE 0 END) AS data_files, "
+        + "SUM(CASE WHEN content = 1 THEN 1 ELSE 0 END) AS 
position_delete_files, "
+        + "SUM(CASE WHEN content = 2 THEN 1 ELSE 0 END) AS 
equality_delete_files, "
+        + "SUM(CASE WHEN file_size_in_bytes < "
+        + targetFileSizeBytes
+        + " THEN 1 ELSE 0 END) AS small_files, "
+        + "AVG(POWER("
+        + targetFileSizeBytes
+        + " - LEAST("
+        + targetFileSizeBytes
+        + ", file_size_in_bytes), 2)) AS datafile_mse, "
+        + "AVG(file_size_in_bytes) AS avg_size, "
+        + "SUM(file_size_in_bytes) AS total_size "
+        + "FROM "
+        + filesTable;
+  }
+
+  static String buildPartitionStatsSql(
+      String catalogName, String tableIdentifier, long targetFileSizeBytes) {
+    String filesTable = buildFilesTableIdentifier(catalogName, 
tableIdentifier);
+    return "SELECT "
+        + "partition, "
+        + "COUNT(*) AS file_count, "
+        + "SUM(CASE WHEN content = 0 THEN 1 ELSE 0 END) AS data_files, "
+        + "SUM(CASE WHEN content = 1 THEN 1 ELSE 0 END) AS 
position_delete_files, "
+        + "SUM(CASE WHEN content = 2 THEN 1 ELSE 0 END) AS 
equality_delete_files, "
+        + "SUM(CASE WHEN file_size_in_bytes < "
+        + targetFileSizeBytes
+        + " THEN 1 ELSE 0 END) AS small_files, "
+        + "AVG(POWER("
+        + targetFileSizeBytes
+        + " - LEAST("
+        + targetFileSizeBytes
+        + ", file_size_in_bytes), 2)) AS datafile_mse, "
+        + "AVG(file_size_in_bytes) AS avg_size, "
+        + "SUM(file_size_in_bytes) AS total_size "
+        + "FROM "
+        + filesTable
+        + " GROUP BY partition";
+  }
+
+  static boolean isPartitionedTable(
+      SparkSession spark, String catalogName, String tableIdentifier) {
+    StructType filesSchema =
+        spark.table(buildFilesTableIdentifier(catalogName, 
tableIdentifier)).schema();
+    if (!Arrays.asList(filesSchema.fieldNames()).contains("partition")) {
+      return false;
+    }
+    StructField partitionField = filesSchema.apply("partition");
+    if (!(partitionField.dataType() instanceof StructType)) {
+      return false;
+    }
+    return ((StructType) partitionField.dataType()).fields().length > 0;
+  }
+
+  static List<StatisticEntry<?>> toStatistics(Row row) {
+    List<StatisticEntry<?>> statistics = new ArrayList<>();
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "file_count",
+            StatisticValues.longValue(toLongValue(row, "file_count"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "data_files",
+            StatisticValues.longValue(toLongValue(row, "data_files"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "position_delete_files",
+            StatisticValues.longValue(toLongValue(row, 
"position_delete_files"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "equality_delete_files",
+            StatisticValues.longValue(toLongValue(row, 
"equality_delete_files"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "small_files",
+            StatisticValues.longValue(toLongValue(row, "small_files"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "datafile_mse",
+            StatisticValues.doubleValue(toDoubleValue(row, "datafile_mse"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "avg_size",
+            StatisticValues.doubleValue(toDoubleValue(row, "avg_size"))));
+    statistics.add(
+        new StatisticEntryImpl<>(
+            CUSTOM_STAT_PREFIX + "total_size",
+            StatisticValues.longValue(toLongValue(row, "total_size"))));
+    return statistics;
+  }
+
+  static PartitionPath toPartitionPath(Row partitionRow) {
+    StructType partitionSchema = partitionRow.schema();
+    List<PartitionEntry> entries = new 
ArrayList<>(partitionSchema.fields().length);
+    for (int i = 0; i < partitionSchema.fields().length; i++) {
+      String name = partitionSchema.fields()[i].name();
+      Object value = partitionRow.get(i);
+      entries.add(new PartitionEntryImpl(name, String.valueOf(value)));
+    }
+    return PartitionPath.of(entries);
+  }
+
+  static Map<String, String> parseArguments(String[] args) {
+    Map<String, String> argMap = new HashMap<>();
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].startsWith("--")) {
+        String key = args[i].substring(2);
+        if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
+          String value = args[i + 1];
+          if (value != null && !value.trim().isEmpty()) {
+            argMap.put(key, value);
+          }
+          i++;
+        }
+      }
+    }
+    return argMap;
+  }
+
+  static Map<String, String> parseCustomSparkConfigs(String sparkConfJson) {
+    if (sparkConfJson == null || sparkConfJson.isEmpty()) {
+      return new HashMap<>();
+    }
+    try {
+      ObjectMapper mapper = new ObjectMapper();
+      Map<String, Object> parsedMap =
+          mapper.readValue(sparkConfJson, new TypeReference<Map<String, 
Object>>() {});
+      Map<String, String> configs = new HashMap<>();
+      for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
+        configs.put(entry.getKey(), entry.getValue() == null ? "" : 
entry.getValue().toString());
+      }
+      return configs;
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          "Failed to parse Spark configurations JSON: "
+              + sparkConfJson
+              + ". Error: "
+              + e.getMessage(),
+          e);
+    }
+  }
+
+  static long parseTargetFileSize(String value) {
+    if (value == null || value.trim().isEmpty()) {
+      return DEFAULT_TARGET_FILE_SIZE_BYTES;
+    }
+    try {
+      long parsed = Long.parseLong(value.trim());
+      if (parsed <= 0) {
+        throw new IllegalArgumentException("target-file-size-bytes must be > 
0");
+      }
+      return parsed;
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid target-file-size-bytes: " + 
value, e);
+    }
+  }
+
+  static boolean parseEnableMetrics(String value) {
+    if (value == null || value.trim().isEmpty()) {
+      return false;
+    }
+    if ("true".equalsIgnoreCase(value.trim())) {
+      return true;
+    }
+    if ("false".equalsIgnoreCase(value.trim())) {
+      return false;
+    }
+    throw new IllegalArgumentException("Invalid enable-metrics value: " + 
value);
+  }
+
+  private static StatisticsUpdater createStatisticsUpdater(
+      String updaterName, String gravitinoUri, String metalake) {
+    StatisticsUpdater statisticsUpdater =
+        ProviderUtils.createStatisticsUpdaterInstance(updaterName);
+    Map<String, String> conf = new HashMap<>();
+    conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri);
+    conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake);
+    statisticsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf)));
+    return statisticsUpdater;
+  }
+
+  private static MetricsUpdater createMetricsUpdater(
+      String updaterName, String gravitinoUri, String metalake) {
+    MetricsUpdater metricsUpdater = 
ProviderUtils.createMetricsUpdaterInstance(updaterName);
+    Map<String, String> conf = new HashMap<>();
+    conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri);
+    conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake);
+    metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf)));
+    return metricsUpdater;
+  }
+
+  private static List<MetricPoint> toTableMetricPoints(
+      NameIdentifier tableIdentifier,
+      List<StatisticEntry<?>> tableStatistics,
+      long metricTimestamp) {
+    List<MetricPoint> requests = new ArrayList<>(tableStatistics.size());
+    for (StatisticEntry<?> statistic : tableStatistics) {
+      requests.add(
+          MetricPoint.forTable(
+              tableIdentifier, statistic.name(), statistic.value(), 
metricTimestamp));
+    }
+    return requests;
+  }
+
+  private static List<MetricPoint> toPartitionMetricPoints(
+      NameIdentifier tableIdentifier,
+      PartitionPath partitionPath,
+      List<StatisticEntry<?>> statistics,
+      long metricTimestamp) {
+    List<MetricPoint> requests = new ArrayList<>(statistics.size());
+    for (StatisticEntry<?> statistic : statistics) {
+      requests.add(
+          MetricPoint.forPartition(
+              tableIdentifier,
+              partitionPath,
+              statistic.name(),
+              statistic.value(),
+              metricTimestamp));
+    }
+    return requests;
+  }
+
+  private static NameIdentifier toGravitinoTableIdentifier(
+      String catalogName, String tableIdentifier) {
+    String[] levels = tableIdentifier.split("\\.");
+    if (levels.length != 2) {
+      throw new IllegalArgumentException(
+          "--table must use schema.table format, but got: " + tableIdentifier);
+    }
+    return NameIdentifier.of(catalogName, levels[0], levels[1]);
+  }
+
  /**
   * Builds the fully qualified identifier of the Iceberg "files" metadata table,
   * i.e. {@code catalog.schema.table.files}, for use in generated SQL.
   *
   * <p>NOTE(review): escapeSqlIdentifier only doubles backticks but the parts are not
   * wrapped in backticks here, so the escaping has no effect for identifiers containing
   * special characters — confirm whether backtick-quoting each level is intended
   * (the existing unit test expects unquoted identifiers).
   *
   * @param catalogName Spark catalog name
   * @param tableIdentifier table in schema.table form
   * @throws IllegalArgumentException if the identifier is not exactly two dot-separated levels
   */
  private static String buildFilesTableIdentifier(String catalogName, String tableIdentifier) {
    String[] levels = tableIdentifier.split("\\.");
    if (levels.length != 2) {
      throw new IllegalArgumentException(
          "--table must use schema.table format, but got: " + tableIdentifier);
    }
    return escapeSqlIdentifier(catalogName)
        + "."
        + escapeSqlIdentifier(levels[0])
        + "."
        + escapeSqlIdentifier(levels[1])
        + ".files";
  }
+
  // Doubles backtick characters inside an identifier. NOTE(review): callers do not wrap
  // the result in backticks, so this only matters once quoting is added — verify intent.
  private static String escapeSqlIdentifier(String identifier) {
    return identifier.replace("`", "``");
  }
+
+  private static long toLongValue(Row row, String fieldName) {
+    Number number = row.getAs(fieldName);
+    return number == null ? 0L : number.longValue();
+  }
+
+  private static double toDoubleValue(Row row, String fieldName) {
+    Number number = row.getAs(fieldName);
+    return number == null ? 0D : number.doubleValue();
+  }
+
+  private static List<String> buildArguments() {
+    return Arrays.asList(
+        "--catalog",
+        "{{catalog_name}}",
+        "--table",
+        "{{table_identifier}}",
+        "--gravitino-uri",
+        "{{gravitino_uri}}",
+        "--metalake",
+        "{{metalake}}",
+        "--target-file-size-bytes",
+        "{{target_file_size_bytes}}",
+        "--statistics-updater",
+        "{{statistics_updater}}",
+        "--enable-metrics",
+        "{{enable_metrics}}",
+        "--metrics-updater",
+        "{{metrics_updater}}",
+        "--spark-conf",
+        "{{spark_conf}}");
+  }
+
+  private static Map<String, String> buildSparkConfigs() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("spark.master", "{{spark_master}}");
+    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
+    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
+    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
+    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
+    configs.put("spark.sql.catalog.{{catalog_name}}", 
"org.apache.iceberg.spark.SparkCatalog");
+    configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}");
+    configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}");
+    configs.put("spark.sql.catalog.{{catalog_name}}.warehouse", 
"{{warehouse_location}}");
+    configs.put(
+        "spark.sql.extensions",
+        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
+    return Collections.unmodifiableMap(configs);
+  }
+
+  private static void printUsage() {
+    System.err.println(
+        "Usage: IcebergUpdateStatsJob [OPTIONS]\n"
+            + "\n"
+            + "Required Options:\n"
+            + "  --catalog <name>                   Iceberg catalog name 
registered in Spark\n"
+            + "  --table <identifier>               Table name in schema.table 
format\n"
+            + "  --gravitino-uri <uri>              Gravitino server URI\n"
+            + "  --metalake <metalake_name>         Gravitino metalake name\n"
+            + "\n"
+            + "Optional Options:\n"
+            + "  --target-file-size-bytes <bytes>   Small-file threshold and 
MSE target\n"
+            + "                                     Default: 100000\n"
+            + "  --statistics-updater <name>        StatisticsUpdater provider 
name\n"
+            + "                                     Default: 
gravitino-statistics-updater\n"
+            + "  --enable-metrics <true|false>     Whether to persist metrics 
via MetricsUpdater\n"
+            + "                                     Default: false\n"
+            + "  --metrics-updater <name>           MetricsUpdater provider 
name\n"
+            + "                                     Default: 
gravitino-metrics-updater\n"
+            + "  --spark-conf <json>                JSON map of custom Spark 
configs\n"
+            + "                                     Example: 
'{\"spark.sql.shuffle.partitions\":\"200\"}'");
+  }
+}
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
new file mode 100644
index 0000000000..b035ed4b74
--- /dev/null
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergUpdateStatsJob {
+
+  @Test
+  public void testJobTemplateHasCorrectNameAndVersion() {
+    IcebergUpdateStatsJob job = new IcebergUpdateStatsJob();
+    SparkJobTemplate template = job.jobTemplate();
+
+    assertNotNull(template);
+    assertEquals("builtin-iceberg-update-stats", template.name());
+    
assertTrue(template.name().matches(JobTemplateProvider.BUILTIN_NAME_PATTERN));
+    assertEquals("v1", 
template.customFields().get(JobTemplateProvider.PROPERTY_VERSION_KEY));
+  }
+
+  @Test
+  public void testJobTemplateArguments() {
+    IcebergUpdateStatsJob job = new IcebergUpdateStatsJob();
+    SparkJobTemplate template = job.jobTemplate();
+
+    assertNotNull(template.arguments());
+    assertEquals(18, template.arguments().size());
+    assertTrue(template.arguments().contains("--catalog"));
+    assertTrue(template.arguments().contains("{{catalog_name}}"));
+    assertTrue(template.arguments().contains("--table"));
+    assertTrue(template.arguments().contains("{{table_identifier}}"));
+    assertTrue(template.arguments().contains("--gravitino-uri"));
+    assertTrue(template.arguments().contains("{{gravitino_uri}}"));
+    assertTrue(template.arguments().contains("--metalake"));
+    assertTrue(template.arguments().contains("{{metalake}}"));
+    assertTrue(template.arguments().contains("--target-file-size-bytes"));
+    assertTrue(template.arguments().contains("{{target_file_size_bytes}}"));
+    assertTrue(template.arguments().contains("--statistics-updater"));
+    assertTrue(template.arguments().contains("{{statistics_updater}}"));
+    assertTrue(template.arguments().contains("--enable-metrics"));
+    assertTrue(template.arguments().contains("{{enable_metrics}}"));
+    assertTrue(template.arguments().contains("--metrics-updater"));
+    assertTrue(template.arguments().contains("{{metrics_updater}}"));
+  }
+
+  @Test
+  public void testParseArguments() {
+    String[] args = {
+      "--catalog", "cat",
+      "--table", "db.tbl",
+      "--gravitino-uri", "http://localhost:8090";,
+      "--metalake", "ml",
+      "--target-file-size-bytes", "2048",
+      "--enable-metrics", "true",
+      "--metrics-updater", "gravitino-metrics-updater"
+    };
+
+    Map<String, String> parsed = IcebergUpdateStatsJob.parseArguments(args);
+    assertEquals("cat", parsed.get("catalog"));
+    assertEquals("db.tbl", parsed.get("table"));
+    assertEquals("http://localhost:8090";, parsed.get("gravitino-uri"));
+    assertEquals("ml", parsed.get("metalake"));
+    assertEquals("2048", parsed.get("target-file-size-bytes"));
+    assertEquals("true", parsed.get("enable-metrics"));
+    assertEquals("gravitino-metrics-updater", parsed.get("metrics-updater"));
+  }
+
+  @Test
+  public void testBuildStatsSql() {
+    String tableSql = IcebergUpdateStatsJob.buildTableStatsSql("cat", 
"db.tbl", 100000L);
+    String partitionSql = IcebergUpdateStatsJob.buildPartitionStatsSql("cat", 
"db.tbl", 100000L);
+
+    assertTrue(tableSql.contains("FROM cat.db.tbl.files"));
+    assertFalse(tableSql.contains("GROUP BY partition"));
+    assertTrue(tableSql.contains("AS datafile_mse"));
+    assertTrue(partitionSql.contains("FROM cat.db.tbl.files"));
+    assertTrue(partitionSql.contains("GROUP BY partition"));
+    assertTrue(partitionSql.startsWith("SELECT partition"));
+  }
+
+  @Test
+  public void testParseTargetFileSize() {
+    assertEquals(100000L, IcebergUpdateStatsJob.parseTargetFileSize(null));
+    assertEquals(100000L, IcebergUpdateStatsJob.parseTargetFileSize(""));
+    assertEquals(2048L, IcebergUpdateStatsJob.parseTargetFileSize("2048"));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
IcebergUpdateStatsJob.parseTargetFileSize("-1"));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
IcebergUpdateStatsJob.parseTargetFileSize("abc"));
+  }
+
+  @Test
+  public void testParseEnableMetrics() {
+    assertFalse(IcebergUpdateStatsJob.parseEnableMetrics(null));
+    assertFalse(IcebergUpdateStatsJob.parseEnableMetrics(""));
+    assertFalse(IcebergUpdateStatsJob.parseEnableMetrics("false"));
+    assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("true"));
+    assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("TRUE"));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
IcebergUpdateStatsJob.parseEnableMetrics("yes"));
+  }
+}
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
new file mode 100644
index 0000000000..8f51256ee0
--- /dev/null
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg 
runtime. */
/** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg runtime. */
public class TestIcebergUpdateStatsJobWithSpark {

  // Hadoop-catalog warehouse location for the test's Iceberg tables.
  @TempDir static File tempDir;

  private static SparkSession spark;
  private static String catalogName;

  /**
   * Starts a local Spark session with an Iceberg Hadoop catalog and creates three fixture
   * tables: non-partitioned, single-level partitioned (2 partitions), and two-level
   * partitioned on days(event_ts) + region (4 partitions).
   */
  @BeforeAll
  public static void setUp() {
    String warehousePath = new File(tempDir, "warehouse").getAbsolutePath();
    catalogName = "test_catalog";

    spark =
        SparkSession.builder()
            .appName("TestIcebergUpdateStatsJob")
            .master("local[2]")
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog." + catalogName, "org.apache.iceberg.spark.SparkCatalog")
            .config("spark.sql.catalog." + catalogName + ".type", "hadoop")
            .config("spark.sql.catalog." + catalogName + ".warehouse", warehousePath)
            .getOrCreate();

    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".db");
    spark.sql(
        "CREATE TABLE IF NOT EXISTS "
            + catalogName
            + ".db.non_partitioned (id INT, name STRING) USING iceberg");
    spark.sql(
        "INSERT INTO " + catalogName + ".db.non_partitioned VALUES (1, 'A'), (2, 'B'), (3, 'C')");

    // Two distinct ds values -> two partitions.
    spark.sql(
        "CREATE TABLE IF NOT EXISTS "
            + catalogName
            + ".db.partitioned (id INT, ds STRING) USING iceberg PARTITIONED BY (ds)");
    spark.sql(
        "INSERT INTO "
            + catalogName
            + ".db.partitioned VALUES "
            + "(1, '2026-01-01'), (2, '2026-01-01'), (3, '2026-01-02')");

    // 2 days x 2 regions -> four partitions.
    spark.sql(
        "CREATE TABLE IF NOT EXISTS "
            + catalogName
            + ".db.multi_partitioned (id INT, event_ts TIMESTAMP, region STRING) "
            + "USING iceberg PARTITIONED BY (days(event_ts), region)");
    spark.sql(
        "INSERT INTO "
            + catalogName
            + ".db.multi_partitioned VALUES "
            + "(1, TIMESTAMP '2026-01-01 09:00:00', 'ap-south'), "
            + "(2, TIMESTAMP '2026-01-01 11:00:00', 'us-east'), "
            + "(3, TIMESTAMP '2026-01-02 08:30:00', 'ap-south'), "
            + "(4, TIMESTAMP '2026-01-02 12:45:00', 'us-east')");
  }

  /** Drops the fixture tables and namespace, then stops the Spark session. */
  @AfterAll
  public static void tearDown() {
    if (spark != null) {
      spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.non_partitioned");
      spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.partitioned");
      spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.multi_partitioned");
      spark.sql("DROP NAMESPACE IF EXISTS " + catalogName + ".db");
      spark.stop();
    }
  }

  @Test
  public void testUpdateNonPartitionedTableStatistics() {
    RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();

    IcebergUpdateStatsJob.updateStatistics(
        spark, updater, catalogName, "db.non_partitioned", 100_000L);

    assertEquals(NameIdentifier.of(catalogName, "db", "non_partitioned"), updater.tableIdentifier);
    assertNotNull(updater.tableStatistics);
    // One entry per aggregate column in the stats SQL.
    assertEquals(8, updater.tableStatistics.size());
    assertTrue(updater.partitionStatistics.isEmpty());

    Map<String, Object> stats =
        updater.tableStatistics.stream()
            .collect(Collectors.toMap(StatisticEntry::name, stat -> stat.value().value()));
    assertTrue((Long) stats.get("custom-file_count") > 0L);
    assertTrue((Double) stats.get("custom-datafile_mse") >= 0D);
    assertTrue((Long) stats.get("custom-total_size") > 0L);
  }

  @Test
  public void testUpdatePartitionedTableStatistics() {
    RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();

    IcebergUpdateStatsJob.updateStatistics(spark, updater, catalogName, "db.partitioned", 100_000L);

    assertEquals(NameIdentifier.of(catalogName, "db", "partitioned"), updater.tableIdentifier);
    // Partitioned path must populate partition stats only, not table stats.
    assertTrue(updater.tableStatistics.isEmpty());
    assertFalse(updater.partitionStatistics.isEmpty());
    assertEquals(2, updater.partitionStatistics.size());

    updater.partitionStatistics.forEach(
        (partitionPath, statistics) -> {
          assertEquals(1, partitionPath.entries().size());
          assertEquals("ds", partitionPath.entries().get(0).partitionName());
          Map<String, Object> statMap =
              statistics.stream()
                  .collect(Collectors.toMap(StatisticEntry::name, stat -> stat.value().value()));
          assertTrue(statMap.containsKey("custom-datafile_mse"));
          assertTrue((Long) statMap.get("custom-file_count") > 0L);
        });
  }

  @Test
  public void testUpdatePartitionedTableStatisticsWithMetrics() {
    RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater();
    RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater();

    IcebergUpdateStatsJob.updateStatistics(
        spark, statisticsUpdater, metricsUpdater, catalogName, "db.partitioned", 100_000L);

    assertEquals(
        NameIdentifier.of(catalogName, "db", "partitioned"), statisticsUpdater.tableIdentifier);
    assertFalse(statisticsUpdater.partitionStatistics.isEmpty());
    assertEquals(2, statisticsUpdater.partitionStatistics.size());

    // 2 partitions x 8 statistics = 16 partition-scoped metric points.
    assertEquals(16, metricsUpdater.tableMetrics.size());
    assertTrue(
        metricsUpdater.tableMetrics.stream()
            .allMatch(metric -> metric.scope() == DataScope.Type.PARTITION));
    assertTrue(
        metricsUpdater.tableMetrics.stream()
            .allMatch(metric -> metric.partitionPath().isPresent()));
    assertTrue(
        metricsUpdater.tableMetrics.stream()
            .allMatch(metric -> metric.value() != null && metric.value().value() != null));
    // Statistics updates must not emit job-level metrics.
    assertTrue(metricsUpdater.jobMetrics.isEmpty());
  }

  @Test
  public void testUpdateMultiLevelPartitionedTableStatistics() {
    RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater();

    IcebergUpdateStatsJob.updateStatistics(
        spark, updater, catalogName, "db.multi_partitioned", 100_000L);

    assertEquals(
        NameIdentifier.of(catalogName, "db", "multi_partitioned"), updater.tableIdentifier);
    assertTrue(updater.tableStatistics.isEmpty());
    assertFalse(updater.partitionStatistics.isEmpty());
    assertEquals(4, updater.partitionStatistics.size());

    // Render each two-level partition path as "name=value,name=value" for comparison.
    Set<String> parsedPartitions =
        updater.partitionStatistics.keySet().stream()
            .map(
                partitionPath ->
                    partitionPath.entries().get(0).partitionName()
                        + "="
                        + partitionPath.entries().get(0).partitionValue()
                        + ","
                        + partitionPath.entries().get(1).partitionName()
                        + "="
                        + partitionPath.entries().get(1).partitionValue())
            .collect(Collectors.toSet());

    assertEquals(
        Set.of(
            "event_ts_day=2026-01-01,region=ap-south",
            "event_ts_day=2026-01-01,region=us-east",
            "event_ts_day=2026-01-02,region=ap-south",
            "event_ts_day=2026-01-02,region=us-east"),
        parsedPartitions);
  }

  /** Test double that records the last statistics update it receives. */
  private static final class RecordingStatisticsUpdater implements StatisticsUpdater {
    private NameIdentifier tableIdentifier;
    private List<StatisticEntry<?>> tableStatistics = List.of();
    private Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = Map.of();

    @Override
    public String name() {
      return "recording-updater";
    }

    @Override
    public void initialize(OptimizerEnv optimizerEnv) {}

    @Override
    public void updateTableStatistics(
        NameIdentifier tableIdentifier, List<StatisticEntry<?>> tableStatistics) {
      this.tableIdentifier = tableIdentifier;
      this.tableStatistics = tableStatistics;
    }

    @Override
    public void updatePartitionStatistics(
        NameIdentifier tableIdentifier,
        Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
      this.tableIdentifier = tableIdentifier;
      this.partitionStatistics = partitionStatistics;
    }

    @Override
    public void close() {}
  }

  /** Test double that records the last metric batches it receives. */
  private static final class RecordingMetricsUpdater implements MetricsUpdater {
    private List<MetricPoint> tableMetrics = List.of();
    private List<MetricPoint> jobMetrics = List.of();

    @Override
    public String name() {
      return "recording-metrics-updater";
    }

    @Override
    public void initialize(OptimizerEnv optimizerEnv) {}

    @Override
    public void updateTableAndPartitionMetrics(List<MetricPoint> metrics) {
      this.tableMetrics = metrics;
    }

    @Override
    public void updateJobMetrics(List<MetricPoint> metrics) {
      this.jobMetrics = metrics;
    }

    @Override
    public void close() {}
  }
}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
 b/maintenance/optimizer-api/build.gradle.kts
similarity index 57%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
rename to maintenance/optimizer-api/build.gradle.kts
index 7bdb502bc3..436fa18ab3 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java
+++ b/maintenance/optimizer-api/build.gradle.kts
@@ -16,14 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+description = "Gravitino Optimizer API"
 
-package 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc;
+plugins {
+  `maven-publish`
+  id("java")
+  id("idea")
+}
+
+dependencies {
+  implementation(project(":api"))
+  implementation(project(":common"))
+  implementation(project(":clients:client-java"))
+  implementation(libs.guava)
+  implementation(libs.commons.lang3)
+  implementation(libs.jackson.databind)
 
-/** MySQL dialect marker for JDBC metrics storage. */
-public class MySQLMetricsDialect implements JdbcMetricsDialect {
+  annotationProcessor(libs.lombok)
+  compileOnly(libs.lombok)
+  testAnnotationProcessor(libs.lombok)
+  testCompileOnly(libs.lombok)
+
+  testImplementation(libs.junit.jupiter.api)
+  testRuntimeOnly(libs.junit.jupiter.engine)
+}
 
-  @Override
-  public String name() {
-    return "mysql";
-  }
+tasks.test {
+  useJUnitPlatform()
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
similarity index 84%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index 5d64378ff2..f915f2cb07 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -26,15 +26,6 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
-import 
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
-import 
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
-import 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
-import 
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
-import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
-import 
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
 
 /**
  * Central configuration holder for the optimizer runtime. Keys are grouped 
under the {@code
@@ -67,64 +58,74 @@ public class OptimizerConfig extends Config {
       MONITOR_PREFIX + "tableJobRelationProvider";
   private static final String METRICS_EVALUATOR = MONITOR_PREFIX + 
"metricsEvaluator";
   private static final String MONITOR_CALLBACKS = MONITOR_PREFIX + "callbacks";
+  private static final String DEFAULT_STATISTICS_PROVIDER = 
"gravitino-statistics-provider";
+  private static final String DEFAULT_STRATEGY_PROVIDER = 
"gravitino-strategy-provider";
+  private static final String DEFAULT_TABLE_META_PROVIDER = 
"gravitino-table-metadata-provider";
+  private static final String DEFAULT_JOB_SUBMITTER = "noop-job-submitter";
+  private static final String DEFAULT_STATISTICS_UPDATER = 
"gravitino-statistics-updater";
+  private static final String DEFAULT_METRICS_UPDATER = 
"gravitino-metrics-updater";
+  private static final String DEFAULT_METRICS_PROVIDER = 
"gravitino-metrics-provider";
+  private static final String DEFAULT_TABLE_JOB_RELATION_PROVIDER =
+      "dummy-table-job-relation-provider";
+  private static final String DEFAULT_METRICS_EVALUATOR = 
"gravitino-metrics-evaluator";
 
   public static final ConfigEntry<String> STATISTICS_PROVIDER_CONFIG =
       new ConfigBuilder(STATISTICS_PROVIDER)
           .doc(
               "Statistics provider implementation name (matches 
Provider.name()) discoverable via "
                   + "ServiceLoader. Example: '"
-                  + GravitinoStatisticsProvider.NAME
+                  + DEFAULT_STATISTICS_PROVIDER
                   + "'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoStatisticsProvider.NAME);
+          .createWithDefault(DEFAULT_STATISTICS_PROVIDER);
 
   public static final ConfigEntry<String> STRATEGY_PROVIDER_CONFIG =
       new ConfigBuilder(STRATEGY_PROVIDER)
           .doc(
               "Strategy provider implementation name (matches Provider.name()) 
discoverable via "
                   + "ServiceLoader. Example: '"
-                  + GravitinoStrategyProvider.NAME
+                  + DEFAULT_STRATEGY_PROVIDER
                   + "'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoStrategyProvider.NAME);
+          .createWithDefault(DEFAULT_STRATEGY_PROVIDER);
 
   public static final ConfigEntry<String> TABLE_META_PROVIDER_CONFIG =
       new ConfigBuilder(TABLE_META_PROVIDER)
           .doc(
               "Table metadata provider implementation name (matches 
Provider.name()) discoverable "
                   + "via ServiceLoader. Example: '"
-                  + GravitinoTableMetadataProvider.NAME
+                  + DEFAULT_TABLE_META_PROVIDER
                   + "'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoTableMetadataProvider.NAME);
+          .createWithDefault(DEFAULT_TABLE_META_PROVIDER);
 
   public static final ConfigEntry<String> JOB_SUBMITTER_CONFIG =
       new ConfigBuilder(JOB_SUBMITTER)
           .doc(
               "Job submitter implementation name (matches Provider.name()) 
discoverable via "
                   + "ServiceLoader. Example: '"
-                  + NoopJobSubmitter.NAME
+                  + DEFAULT_JOB_SUBMITTER
                   + "'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(NoopJobSubmitter.NAME);
+          .createWithDefault(DEFAULT_JOB_SUBMITTER);
 
   public static final ConfigEntry<String> STATISTICS_UPDATER_CONFIG =
       new ConfigBuilder(STATISTICS_UPDATER)
           .doc("The statistics updater implementation name (matches 
Provider.name()).")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoStatisticsUpdater.NAME);
+          .createWithDefault(DEFAULT_STATISTICS_UPDATER);
 
   public static final ConfigEntry<String> METRICS_UPDATER_CONFIG =
       new ConfigBuilder(METRICS_UPDATER)
           .doc("The metrics updater implementation name (matches 
Provider.name()).")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoMetricsUpdater.NAME);
+          .createWithDefault(DEFAULT_METRICS_UPDATER);
 
   public static final ConfigEntry<String> METRICS_PROVIDER_CONFIG =
       new ConfigBuilder(METRICS_PROVIDER)
@@ -133,7 +134,7 @@ public class OptimizerConfig extends Config {
                   + "discoverable via ServiceLoader. Example: 
'metrics-provider'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoMetricsProvider.NAME);
+          .createWithDefault(DEFAULT_METRICS_PROVIDER);
 
   public static final ConfigEntry<String> TABLE_JOB_RELATION_PROVIDER_CONFIG =
       new ConfigBuilder(TABLE_JOB_RELATION_PROVIDER)
@@ -142,7 +143,7 @@ public class OptimizerConfig extends Config {
                   + "discoverable via ServiceLoader. Example: 
'table-job-relation-provider'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(DummyTableJobRelationProvider.NAME);
+          .createWithDefault(DEFAULT_TABLE_JOB_RELATION_PROVIDER);
 
   public static final ConfigEntry<String> METRICS_EVALUATOR_CONFIG =
       new ConfigBuilder(METRICS_EVALUATOR)
@@ -152,7 +153,7 @@ public class OptimizerConfig extends Config {
                   + "'metrics-evaluator'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .createWithDefault(GravitinoMetricsEvaluator.NAME);
+          .createWithDefault(DEFAULT_METRICS_EVALUATOR);
 
   public static final ConfigEntry<List<String>> MONITOR_CALLBACKS_CONFIG =
       new ConfigBuilder(MONITOR_CALLBACKS)
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/PartitionPathSerdeUtils.java
similarity index 64%
copy from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
copy to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/PartitionPathSerdeUtils.java
index 392e7b1f5a..31d9f1c1ca 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/PartitionPathSerdeUtils.java
@@ -17,9 +17,9 @@
  * under the License.
  */
 
-package org.apache.gravitino.maintenance.optimizer.recommender.util;
+package org.apache.gravitino.maintenance.optimizer.common.util;
 
-import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -31,23 +31,12 @@ import 
org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
 import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
 import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
 
-/** Helpers for converting between Gravitino partition names and {@link 
PartitionPath}. */
-public class PartitionUtils {
-  private static final TypeReference<List<Map<String, String>>> 
PARTITION_PATH_TYPE =
-      new TypeReference<List<Map<String, String>>>() {};
+/** Shared serde helpers for converting between partition path and JSON 
string. */
+public final class PartitionPathSerdeUtils {
+  private PartitionPathSerdeUtils() {}
 
-  private PartitionUtils() {}
-
-  /**
-   * Encodes a {@link PartitionPath} into a JSON string.
-   *
-   * <p>For example, a path with entries {@code p1=v1, p2=v2} is encoded as 
{@code [{"p1":"v1"},
-   * {"p2":"v2"}]}.
-   *
-   * @param partitionPath partition path
-   * @return encoded JSON string
-   */
-  public static String encodePartitionPath(PartitionPath partitionPath) {
+  /** Encodes partition path into JSON string, such as 
[{"p1":"v1"},{"p2":"v2"}]. */
+  public static String encode(PartitionPath partitionPath) {
     Preconditions.checkArgument(partitionPath != null, "partitionPath must not 
be null");
     List<PartitionEntry> entries = partitionPath.entries();
     Preconditions.checkArgument(entries != null && !entries.isEmpty(), 
"partitionPath is empty");
@@ -70,32 +59,28 @@ public class PartitionUtils {
     }
   }
 
-  /**
-   * Decodes a JSON-encoded partition path into a {@link PartitionPath}.
-   *
-   * <p>Example format: {@code [{"p1":"v1"},{"p2":"v2"}]}.
-   *
-   * @param encodedPartitionPath JSON string representing the partition path
-   * @return parsed partition path
-   */
-  public static PartitionPath decodePartitionPath(String encodedPartitionPath) 
{
+  /** Decodes partition path from JSON string, such as 
[{"p1":"v1"},{"p2":"v2"}]. */
+  public static PartitionPath decode(String encodedPartitionPath) {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(encodedPartitionPath), "encodedPartitionPath 
must not be blank");
-    List<Map<String, String>> decoded;
+    JsonNode decoded;
     try {
-      decoded = JsonUtils.objectMapper().readValue(encodedPartitionPath, 
PARTITION_PATH_TYPE);
+      decoded = JsonUtils.objectMapper().readTree(encodedPartitionPath);
     } catch (Exception e) {
       throw new IllegalArgumentException("Failed to decode partition path", e);
     }
-    Preconditions.checkArgument(decoded != null && !decoded.isEmpty(), 
"partitionPath is empty");
+    Preconditions.checkArgument(
+        decoded != null && decoded.isArray() && !decoded.isEmpty(), 
"partitionPath is empty");
 
     List<PartitionEntry> entries = new ArrayList<>(decoded.size());
-    for (Map<String, String> item : decoded) {
+    for (JsonNode item : decoded) {
       Preconditions.checkArgument(
-          item != null && item.size() == 1, "partition entry must contain one 
key/value pair");
-      Map.Entry<String, String> kv = item.entrySet().iterator().next();
+          item != null && item.isObject() && item.size() == 1,
+          "partition entry must contain one key/value pair");
+      Map.Entry<String, JsonNode> kv = item.fields().next();
       String name = kv.getKey();
-      String value = kv.getValue();
+      JsonNode valueNode = kv.getValue();
+      String value = valueNode == null || valueNode.isNull() ? null : 
valueNode.asText();
       Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName 
cannot be blank");
       Preconditions.checkArgument(StringUtils.isNotBlank(value), 
"partitionValue cannot be blank");
       entries.add(new PartitionEntryImpl(name, value));
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
similarity index 100%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
diff --git 
a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueSerdeUtils.java
 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueSerdeUtils.java
new file mode 100644
index 0000000000..3ce0cdda7c
--- /dev/null
+++ 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueSerdeUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.stats.StatisticValue;
+
+/** Shared serde helpers for {@link StatisticValue}. */
+public final class StatisticValueSerdeUtils {
+
+  private StatisticValueSerdeUtils() {}
+
+  public static String toString(StatisticValue<?> value) {
+    Preconditions.checkArgument(value != null, "StatisticValue cannot be 
null");
+    try {
+      return JsonUtils.anyFieldMapper().writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException("Failed to serialize StatisticValue: 
" + value, e);
+    }
+  }
+
+  public static StatisticValue<?> fromString(String valueStr) {
+    Preconditions.checkArgument(valueStr != null, "StatisticValue string 
cannot be null");
+    try {
+      return JsonUtils.anyFieldMapper().readValue(valueStr, 
StatisticValue.class);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException(
+          "Failed to deserialize StatisticValue from: " + valueStr, e);
+    }
+  }
+}
diff --git 
a/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestPartitionPathSerdeUtils.java
 
b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestPartitionPathSerdeUtils.java
new file mode 100644
index 0000000000..da7749b8c8
--- /dev/null
+++ 
b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestPartitionPathSerdeUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.junit.jupiter.api.Test;
+
+public class TestPartitionPathSerdeUtils {
+
+  @Test
+  public void testEncodeAndDecodePartitionPath() {
+    PartitionPath path =
+        PartitionPath.of(
+            List.of(
+                new PartitionEntryImpl("dt", "2026-01-01"), new 
PartitionEntryImpl("hh", "08")));
+
+    String encoded = PartitionPathSerdeUtils.encode(path);
+    PartitionPath decoded = PartitionPathSerdeUtils.decode(encoded);
+
+    assertEquals(path, decoded);
+  }
+
+  @Test
+  public void testDecodeInvalidPath() {
+    assertThrows(IllegalArgumentException.class, () -> 
PartitionPathSerdeUtils.decode(""));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
PartitionPathSerdeUtils.decode("{\"dt\":\"2026\"}"));
+  }
+}
diff --git a/maintenance/optimizer/build.gradle.kts 
b/maintenance/optimizer/build.gradle.kts
index df1c4473cd..87478a2a71 100644
--- a/maintenance/optimizer/build.gradle.kts
+++ b/maintenance/optimizer/build.gradle.kts
@@ -32,6 +32,8 @@ val icebergVersion: String = 
libs.versions.iceberg4connector.get()
 
 dependencies {
   implementation(project(":api"))
+  implementation(project(":maintenance:optimizer-api"))
+  implementation(project(":maintenance:gravitino-updaters"))
   implementation(project(":catalogs:catalog-common"))
   implementation(project(":clients:client-java"))
   implementation(project(":core")) {
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
index 392e7b1f5a..39d5cce1d8 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java
@@ -19,23 +19,11 @@
 
 package org.apache.gravitino.maintenance.optimizer.recommender.util;
 
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.gravitino.json.JsonUtils;
-import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
 import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
-import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.PartitionPathSerdeUtils;
 
 /** Helpers for converting between Gravitino partition names and {@link 
PartitionPath}. */
 public class PartitionUtils {
-  private static final TypeReference<List<Map<String, String>>> 
PARTITION_PATH_TYPE =
-      new TypeReference<List<Map<String, String>>>() {};
-
   private PartitionUtils() {}
 
   /**
@@ -48,26 +36,7 @@ public class PartitionUtils {
    * @return encoded JSON string
    */
   public static String encodePartitionPath(PartitionPath partitionPath) {
-    Preconditions.checkArgument(partitionPath != null, "partitionPath must not 
be null");
-    List<PartitionEntry> entries = partitionPath.entries();
-    Preconditions.checkArgument(entries != null && !entries.isEmpty(), 
"partitionPath is empty");
-
-    List<Map<String, String>> encoded = new ArrayList<>(entries.size());
-    for (PartitionEntry entry : entries) {
-      String name = entry.partitionName();
-      String value = entry.partitionValue();
-      Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName 
cannot be blank");
-      Preconditions.checkArgument(StringUtils.isNotBlank(value), 
"partitionValue cannot be blank");
-      Map<String, String> item = new LinkedHashMap<>(1);
-      item.put(name, value);
-      encoded.add(item);
-    }
-
-    try {
-      return JsonUtils.objectMapper().writeValueAsString(encoded);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Failed to encode partition path", e);
-    }
+    return PartitionPathSerdeUtils.encode(partitionPath);
   }
 
   /**
@@ -79,28 +48,6 @@ public class PartitionUtils {
    * @return parsed partition path
    */
   public static PartitionPath decodePartitionPath(String encodedPartitionPath) 
{
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(encodedPartitionPath), "encodedPartitionPath 
must not be blank");
-    List<Map<String, String>> decoded;
-    try {
-      decoded = JsonUtils.objectMapper().readValue(encodedPartitionPath, 
PARTITION_PATH_TYPE);
-    } catch (Exception e) {
-      throw new IllegalArgumentException("Failed to decode partition path", e);
-    }
-    Preconditions.checkArgument(decoded != null && !decoded.isEmpty(), 
"partitionPath is empty");
-
-    List<PartitionEntry> entries = new ArrayList<>(decoded.size());
-    for (Map<String, String> item : decoded) {
-      Preconditions.checkArgument(
-          item != null && item.size() == 1, "partition entry must contain one 
key/value pair");
-      Map.Entry<String, String> kv = item.entrySet().iterator().next();
-      String name = kv.getKey();
-      String value = kv.getValue();
-      Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName 
cannot be blank");
-      Preconditions.checkArgument(StringUtils.isNotBlank(value), 
"partitionValue cannot be blank");
-      entries.add(new PartitionEntryImpl(name, value));
-    }
-
-    return PartitionPath.of(entries);
+    return PartitionPathSerdeUtils.decode(encodedPartitionPath);
   }
 }
diff --git 
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
 
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index c059ab3483..bd0237a754 100644
--- 
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ 
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -21,8 +21,6 @@ 
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStati
 
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider
 
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter
 org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter
-org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater
-org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater
 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider
 
org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider
 
org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
index 83062a773b..bc6285c41b 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.dto.rel.ColumnDTO;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -37,9 +38,11 @@ import 
org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
 import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.Statistic;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.awaitility.Awaitility;
@@ -55,6 +58,7 @@ public class TestBuiltinIcebergRewriteDataFiles {
   private static final String METALAKE_NAME = "test";
   private static final String ICEBERG_REST_URI = 
"http://localhost:9001/iceberg";
   private static final String JOB_TEMPLATE_NAME = 
"builtin-iceberg-rewrite-data-files";
+  private static final String UPDATE_STATS_JOB_TEMPLATE_NAME = 
"builtin-iceberg-update-stats";
   private static final String SPARK_CATALOG_NAME = "rest_catalog";
   private static final String WAREHOUSE_LOCATION = "";
 
@@ -210,6 +214,38 @@ public class TestBuiltinIcebergRewriteDataFiles {
         });
   }
 
+  @Test
+  void testSubmitBuiltinIcebergUpdateStatsJobAndPersistStatistics() throws 
Exception {
+    String tableName = "update_stats_table";
+    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
+    runWithSparkAndMetalake(
+        (spark, metalake) -> {
+          createTableAndInsertData(spark, fullTableName);
+
+          Map<String, String> jobConf = buildUpdateStatsJobConfig(tableName);
+          submitJob(metalake, UPDATE_STATS_JOB_TEMPLATE_NAME, jobConf);
+
+          try (GravitinoClient client =
+              
GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) {
+            Table table =
+                client
+                    .loadCatalog(SPARK_CATALOG_NAME)
+                    .asTableCatalog()
+                    .loadTable(NameIdentifier.of("db", tableName));
+            List<Statistic> stats = 
table.supportsStatistics().listStatistics();
+            Assertions.assertFalse(stats.isEmpty(), "Expected table statistics 
to be updated");
+
+            Map<String, Statistic> statsMap = new HashMap<>();
+            for (Statistic stat : stats) {
+              statsMap.put(stat.name(), stat);
+            }
+            Assertions.assertTrue(statsMap.containsKey("custom-file_count"));
+            Assertions.assertTrue(statsMap.containsKey("custom-datafile_mse"));
+            Assertions.assertTrue(statsMap.containsKey("custom-total_size"));
+          }
+        });
+  }
+
   private static GravitinoMetalake loadOrCreateMetalake(
       GravitinoAdminClient client, String metalakeName) {
     try {
@@ -235,7 +271,12 @@ public class TestBuiltinIcebergRewriteDataFiles {
   }
 
   private static void submitCompactionJob(GravitinoMetalake metalake, 
Map<String, String> jobConf) {
-    JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf);
+    submitJob(metalake, JOB_TEMPLATE_NAME, jobConf);
+  }
+
+  private static void submitJob(
+      GravitinoMetalake metalake, String jobTemplateName, Map<String, String> 
jobConf) {
+    JobHandle jobHandle = metalake.runJob(jobTemplateName, jobConf);
     System.out.println("Submitted job id: " + jobHandle.jobId());
     Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id 
should not be blank");
 
@@ -265,6 +306,17 @@ public class TestBuiltinIcebergRewriteDataFiles {
     return jobConf;
   }
 
+  private static Map<String, String> buildUpdateStatsJobConfig(String 
tableName) {
+    Map<String, String> jobConf = new HashMap<>();
+    jobConf.put("table_identifier", "db." + tableName);
+    jobConf.put("gravitino_uri", SERVER_URI);
+    jobConf.put("metalake", METALAKE_NAME);
+    jobConf.put("target_file_size_bytes", "100000");
+    jobConf.put("statistics_updater", "gravitino-statistics-updater");
+    jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs());
+    return jobConf;
+  }
+
   private static Map<String, String> buildCompactionJobConfig(
       OptimizerConfig optimizerConfig,
       String tableName,
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
new file mode 100644
index 0000000000..003bb16a0f
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.recommender.job;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.stats.Statistic;
+import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+
+/** Environment IT for triggering built-in Iceberg update stats job through 
the client API. */
+@EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
+public class TestBuiltinIcebergUpdateStatsJob {
+
+  private static final String SERVER_URI = "http://localhost:8090";
+  private static final String ICEBERG_REST_URI = 
"http://localhost:9001/iceberg";
+  private static final String JOB_TEMPLATE_NAME = 
"builtin-iceberg-update-stats";
+  private static final String SPARK_CATALOG_NAME = "rest_catalog";
+  private static final String WAREHOUSE_LOCATION = "";
+  private static final String METALAKE_NAME = "test";
+
+  @Test
+  void testRunBuiltinUpdateStatsJobAndPersistStatistics() throws Exception {
+    String tableName = "update_stats_table_" + 
UUID.randomUUID().toString().replace("-", "");
+    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
+
+    SparkSession spark = createSparkSession();
+    try (GravitinoAdminClient adminClient = 
GravitinoAdminClient.builder(SERVER_URI).build()) {
+      GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, 
METALAKE_NAME);
+      recreateRestCatalog(metalake);
+      createTableAndInsertData(spark, fullTableName);
+      submitJob(metalake, buildUpdateStatsJobConfig(tableName));
+
+      try (GravitinoClient client =
+          
GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) {
+        Table table =
+            client
+                .loadCatalog(SPARK_CATALOG_NAME)
+                .asTableCatalog()
+                .loadTable(NameIdentifier.of("db", tableName));
+        List<Statistic> statistics = 
table.supportsStatistics().listStatistics();
+        Map<String, Statistic> statisticMap =
+            statistics.stream().collect(Collectors.toMap(Statistic::name, s -> 
s));
+        Assertions.assertTrue(statisticMap.containsKey("custom-file_count"));
+        Assertions.assertTrue(statisticMap.containsKey("custom-datafile_mse"));
+        Assertions.assertTrue(statisticMap.containsKey("custom-total_size"));
+      }
+    } finally {
+      spark.stop();
+    }
+  }
+
+  private static SparkSession createSparkSession() {
+    return SparkSession.builder()
+        .master("local[2]")
+        .appName("builtin-iceberg-update-stats-it")
+        .config(
+            "spark.sql.extensions",
+            
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME, 
"org.apache.iceberg.spark.SparkCatalog")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".type", "rest")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".cache-enabled", 
"false")
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".uri", 
ICEBERG_REST_URI)
+        .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".warehouse", 
WAREHOUSE_LOCATION)
+        .getOrCreate();
+  }
+
+  private static void createTableAndInsertData(SparkSession spark, String 
fullTableName) {
+    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db");
+    spark.sql("DROP TABLE IF EXISTS " + fullTableName);
+    spark.sql("CREATE TABLE " + fullTableName + " (id INT, data STRING) USING 
iceberg");
+    spark.sql(
+        "ALTER TABLE "
+            + fullTableName
+            + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')");
+    for (int i = 0; i < 10; i++) {
+      spark.sql("INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" 
+ i + "')");
+    }
+  }
+
+  private static Map<String, String> buildUpdateStatsJobConfig(String 
tableName) {
+    Map<String, String> jobConf = new HashMap<>();
+    jobConf.put("table_identifier", "db." + tableName);
+    jobConf.put("gravitino_uri", SERVER_URI);
+    jobConf.put("metalake", METALAKE_NAME);
+    jobConf.put("target_file_size_bytes", "100000");
+    jobConf.put("statistics_updater", "gravitino-statistics-updater");
+    jobConf.put("catalog_name", SPARK_CATALOG_NAME);
+    jobConf.put("catalog_type", "rest");
+    jobConf.put("catalog_uri", ICEBERG_REST_URI);
+    jobConf.put("warehouse_location", WAREHOUSE_LOCATION);
+    jobConf.put("spark_master", "local[2]");
+    jobConf.put("spark_executor_instances", "1");
+    jobConf.put("spark_executor_cores", "1");
+    jobConf.put("spark_executor_memory", "1g");
+    jobConf.put("spark_driver_memory", "1g");
+    jobConf.put("spark_conf", "{\"spark.hadoop.fs.defaultFS\":\"file:///\"}");
+    return jobConf;
+  }
+
+  private static void submitJob(GravitinoMetalake metalake, Map<String, 
String> jobConf) {
+    JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf);
+    Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id 
should not be blank");
+
+    Awaitility.await()
+        .atMost(Duration.ofMinutes(5))
+        .pollInterval(Duration.ofSeconds(2))
+        .until(
+            () -> {
+              JobHandle.Status status = 
metalake.getJob(jobHandle.jobId()).jobStatus();
+              return status == JobHandle.Status.SUCCEEDED
+                  || status == JobHandle.Status.FAILED
+                  || status == JobHandle.Status.CANCELLED;
+            });
+
+    JobHandle.Status finalStatus = 
metalake.getJob(jobHandle.jobId()).jobStatus();
+    Assertions.assertEquals(JobHandle.Status.SUCCEEDED, finalStatus, "Job 
should succeed");
+  }
+
+  private static GravitinoMetalake loadOrCreateMetalake(
+      GravitinoAdminClient client, String metalakeName) {
+    try {
+      return client.loadMetalake(metalakeName);
+    } catch (NoSuchMetalakeException ignored) {
+      return client.createMetalake(metalakeName, "IT metalake", Map.of());
+    }
+  }
+
+  private static void recreateRestCatalog(GravitinoMetalake metalake) {
+    try {
+      metalake.dropCatalog(SPARK_CATALOG_NAME, true);
+    } catch (Exception ignored) {
+      // Ignore when the catalog does not exist, or when force-drop is not 
needed.
+    }
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put("catalog-backend", "REST");
+    properties.put("uri", ICEBERG_REST_URI);
+
+    metalake.createCatalog(
+        SPARK_CATALOG_NAME,
+        Catalog.Type.RELATIONAL,
+        "lakehouse-iceberg",
+        "IT Iceberg REST catalog",
+        properties);
+  }
+}
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1529343c61..35f24ed2a7 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -103,4 +103,9 @@ include(":bundles:azure", ":bundles:azure-bundle", 
":bundles:iceberg-azure-bundl
 include(":catalogs:hadoop-common")
 include(":lineage")
 include(":mcp-server")
-include(":maintenance:optimizer", ":maintenance:jobs")
+include(
+  ":maintenance:optimizer-api",
+  ":maintenance:gravitino-updaters",
+  ":maintenance:optimizer",
+  ":maintenance:jobs"
+)

Reply via email to