This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 26c4271449 [#9654] feat(optimizer): add updater code skeleton (#9974)
26c4271449 is described below

commit 26c42714494f6ab8db16171e908ddadfb5fe51a8
Author: FANNG <[email protected]>
AuthorDate: Thu Feb 12 15:13:46 2026 +0900

    [#9654] feat(optimizer): add updater code skeleton (#9974)
    
    ## What changes were proposed in this pull request?
    
    ### User perspective

    - Configure the optimizer to point at your updater providers:
      - Set `OptimizerConfig.STATISTICS_UPDATER_CONFIG` to your
        `StatisticsUpdater` provider name.
      - Set `OptimizerConfig.METRICS_UPDATER_CONFIG` to your
        `MetricsUpdater` provider name.
    - Implement and register a `StatisticsCalculator` via
      `META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator`.
    - Construct an `Updater` with an `OptimizerEnv`.
    - Run one of the entry points:
      - `update(calculatorName, identifiers, UpdateType.STATISTICS)` to
        persist raw statistics for specific tables/jobs.
      - `update(calculatorName, identifiers, UpdateType.METRICS)` to
        persist derived metrics (and job metrics if the calculator
        supports them).
      - `updateAll(calculatorName, UpdateType.METRICS|STATISTICS)` for
        bulk refresh, using the calculator’s bulk methods.
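
The steps above can be pictured with a minimal, self-contained sketch. It is not the `Updater` added by this commit: every nested type here (`Calculator`, the in-memory sinks, the string identifiers) is a hypothetical stand-in, and it only assumes that a calculator is looked up by name and that `UpdateType` selects where the results are persisted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in only: these nested types are hypothetical, not the Gravitino API. */
public class UpdaterFlowSketch {

  /** Mirrors the two modes named in the commit message. */
  public enum UpdateType { STATISTICS, METRICS }

  /** Hypothetical calculator: looked up by name, returns statistic name -> value. */
  public interface Calculator {
    String name();

    Map<String, Long> calculate(String identifier);
  }

  private final Map<String, Calculator> calculatorsByName = new HashMap<>();
  // In-memory sinks standing in for the configured statistics and metrics updaters.
  public final List<String> persistedStatistics = new ArrayList<>();
  public final List<String> persistedMetrics = new ArrayList<>();

  public UpdaterFlowSketch(List<Calculator> calculators) {
    for (Calculator calculator : calculators) {
      calculatorsByName.put(calculator.name(), calculator);
    }
  }

  /** Calculates statistics per identifier and routes them to the sink chosen by type. */
  public void update(String calculatorName, List<String> identifiers, UpdateType type) {
    Calculator calculator = calculatorsByName.get(calculatorName);
    if (calculator == null) {
      throw new IllegalArgumentException("No calculator registered as: " + calculatorName);
    }
    List<String> sink = type == UpdateType.STATISTICS ? persistedStatistics : persistedMetrics;
    for (String identifier : identifiers) {
      calculator
          .calculate(identifier)
          .forEach((name, value) -> sink.add(identifier + ":" + name + "=" + value));
    }
  }

  /** Runs one STATISTICS update against a fixed-value calculator and returns what was stored. */
  public static List<String> demo() {
    Calculator fixed =
        new Calculator() {
          @Override
          public String name() {
            return "fixed";
          }

          @Override
          public Map<String, Long> calculate(String identifier) {
            return Map.of("row_count", 42L);
          }
        };
    UpdaterFlowSketch updater = new UpdaterFlowSketch(List.of(fixed));
    updater.update("fixed", List.of("catalog.db.t1"), UpdateType.STATISTICS);
    return updater.persistedStatistics;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

In this sketch an unknown calculator name fails fast with an `IllegalArgumentException`; whether the real `Updater` behaves the same way is not stated in the commit message.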
    
    ### System perspective

    - Adds new updater-focused optimizer APIs and implementations for
      the statistics/metrics flow:
      - New API types: `MetricSample`, `PartitionMetricSample`,
        `MetricsUpdater`, `StatisticsUpdater`, `StatisticsCalculator`,
        `SupportsCalculateTableStatistics`,
        `SupportsCalculateBulkTableStatistics`,
        `SupportsCalculateJobStatistics`,
        `SupportsCalculateBulkJobStatistics`, `UpdateType`,
        `ToStatistic`, and `TableStatisticsBundle`.
      - New impls: `MetricSampleImpl`, `PartitionMetricSampleImpl`,
        `StatisticEntryImpl`, `MetricRecordImpl`, `MetricsRepository`,
        plus `Updater` as the entrypoint to calculate and persist
        statistics/metrics.
    - Adds provider wiring:
      - `OptimizerConfig` now includes `STATISTICS_UPDATER_CONFIG` and
        `METRICS_UPDATER_CONFIG`.
      - `ProviderUtils` creates `StatisticsUpdater` and `MetricsUpdater`
        instances.
      - `InstanceLoaderUtils` loads `StatisticsCalculator` by provider
        name.
    - Removes unused/legacy updater/metrics components and SPI entries
      that are out of scope for the new updater flow.
    - Adds and updates unit tests for the updater/statistics logic and
      keeps existing recommender code unchanged.
    
    ## Why are the changes needed?

    Fixes: #9654

    - This change introduces a clear, provider-based updater API to
      calculate statistics and persist either raw statistics or derived
      metrics, with explicit entrypoints (`update`, `updateAll`) and
      consistent data models (`MetricSample`, `PartitionMetricSample`,
      `TableStatisticsBundle`).
    - It enables pluggable implementations for both statistics
      calculation and storage, which is required for supporting multiple
      sources/backends and batch refresh workflows.
    - It removes outdated or incomplete updater/metrics components so
      the new API surface is clean and consistent.
    
    ## Does this PR introduce any user-facing change?
    - No.

    ## How was this patch tested?
    - `./gradlew :maintenance:optimizer:test -PskipITs`
---
 .../optimizer/api/common/MetricSample.java         |  41 ++
 .../api/common/PartitionMetricSample.java          |  33 ++
 .../api/common/TableStatisticsBundle.java          |  47 +++
 .../optimizer/api/updater/MetricsUpdater.java      |  46 +++
 .../api/updater/StatisticsCalculator.java          |  33 ++
 .../optimizer/api/updater/StatisticsUpdater.java   |  51 +++
 .../SupportsCalculateBulkJobStatistics.java        |  37 ++
 .../SupportsCalculateBulkTableStatistics.java      |  37 ++
 .../updater/SupportsCalculateJobStatistics.java    |  37 ++
 .../updater/SupportsCalculateTableStatistics.java  |  36 ++
 .../optimizer/common/MetricSampleImpl.java         |  43 +++
 .../common/PartitionMetricSampleImpl.java          |  45 +++
 .../optimizer/common/conf/OptimizerConfig.java     |  18 +
 .../optimizer/common/util/InstanceLoaderUtils.java |  66 ++++
 .../optimizer/common/util/ProviderUtils.java       |  10 +
 .../optimizer/common/util/StatisticValueUtils.java | 119 ++++++
 .../optimizer/recommender/Recommender.java         |  23 +-
 .../optimizer/updater/StatisticEntryImpl.java      |  46 +++
 .../maintenance/optimizer/updater/UpdateType.java  |  44 +++
 .../maintenance/optimizer/updater/Updater.java     | 424 +++++++++++++++++++++
 .../updater/metrics/storage/MetricRecord.java      |  29 ++
 .../updater/metrics/storage/MetricRecordImpl.java  |  33 ++
 .../updater/metrics/storage/MetricsRepository.java |  60 +++
 .../optimizer/updater/util/ToStatistic.java        |  38 ++
 .../optimizer/common/util/TestProviderUtils.java   |   2 -
 .../common/util/TestStatisticValueUtils.java       | 172 +++++++++
 .../optimizer/updater/MetricsUpdaterForTest.java   |  86 +++++
 .../updater/StatisticsCalculatorForTest.java       | 101 +++++
 .../updater/StatisticsUpdaterForTest.java          |  91 +++++
 .../maintenance/optimizer/updater/TestUpdater.java | 160 ++++++++
 ...itino.maintenance.optimizer.api.common.Provider |  21 +
 ...ance.optimizer.api.updater.StatisticsCalculator |  18 +
 ...tenance.optimizer.api.updater.StatisticsUpdater |  18 +
 33 files changed, 2051 insertions(+), 14 deletions(-)

diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
new file mode 100644
index 0000000000..6370eef912
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Represents a single metric sample with a timestamp and associated statistic. */
+@DeveloperApi
+public interface MetricSample {
+
+  /**
+   * Metric event time in epoch seconds.
+   *
+   * @return the metric timestamp in epoch seconds
+   */
+  long timestamp();
+
+  /**
+   * The statistic value sampled at this timestamp.
+   *
+   * @return the statistic entry for this sample
+   */
+  StatisticEntry<?> statistic();
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionMetricSample.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionMetricSample.java
new file mode 100644
index 0000000000..7731211c63
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionMetricSample.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Metric sample associated with a specific partition. */
+@DeveloperApi
+public interface PartitionMetricSample extends MetricSample {
+  /**
+   * Partition identifiers associated with this metric, ordered from outer to inner level.
+   *
+   * @return the partition path for this metric sample
+   */
+  PartitionPath partition();
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableStatisticsBundle.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableStatisticsBundle.java
new file mode 100644
index 0000000000..1c718dcf48
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableStatisticsBundle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.common;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.annotation.DeveloperApi;
+
+/** Container for table-level statistics and partition-level statistics. */
+@DeveloperApi
+public class TableStatisticsBundle {
+  private final List<StatisticEntry<?>> tableStatistics;
+  private final Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics;
+
+  public TableStatisticsBundle(
+      List<StatisticEntry<?>> tableStatistics,
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+    this.tableStatistics = tableStatistics != null ? List.copyOf(tableStatistics) : List.of();
+    this.partitionStatistics =
+        partitionStatistics != null ? Map.copyOf(partitionStatistics) : Map.of();
+  }
+
+  public List<StatisticEntry<?>> tableStatistics() {
+    return tableStatistics;
+  }
+
+  public Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics() {
+    return partitionStatistics;
+  }
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
new file mode 100644
index 0000000000..9cd0707e9f
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+
+/** Represents an updater that can update metrics for a table or job. */
+@DeveloperApi
+public interface MetricsUpdater extends Provider {
+  /**
+   * Persist table metrics.
+   *
+   * @param nameIdentifier catalog/schema/table identifier
+   * @param metrics time-series samples to write
+   */
+  void updateTableMetrics(NameIdentifier nameIdentifier, List<MetricSample> metrics);
+
+  /**
+   * Persist job metrics.
+   *
+   * @param nameIdentifier job identifier
+   * @param metrics time-series samples to write
+   */
+  void updateJobMetrics(NameIdentifier nameIdentifier, List<MetricSample> metrics);
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
new file mode 100644
index 0000000000..e2c433f977
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/** Represents a calculator that can calculate statistics. */
+@DeveloperApi
+public interface StatisticsCalculator {
+  /** Name used for discovery via ServiceLoader. */
+  String name();
+
+  /** Prepare any external resources before first computation. */
+  void initialize(OptimizerEnv optimizerEnv);
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
new file mode 100644
index 0000000000..d3b0a59cf3
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Represents an updater that can update statistics. */
+@DeveloperApi
+public interface StatisticsUpdater extends Provider {
+  /**
+   * Persist table statistics.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @param tableStatistics list of table-level statistics
+   */
+  void updateTableStatistics(
+      NameIdentifier tableIdentifier, List<StatisticEntry<?>> tableStatistics);
+
+  /**
+   * Persist partition statistics.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @param partitionStatistics map of partition identifier (outer to inner) to statistic entry
+   */
+  void updatePartitionStatistics(
+      NameIdentifier tableIdentifier,
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics);
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
new file mode 100644
index 0000000000..d10aa2cecb
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Represents a provider that supports bulk job statistics calculation. */
+@DeveloperApi
+public interface SupportsCalculateBulkJobStatistics extends SupportsCalculateJobStatistics {
+  /**
+   * Calculate job-level statistics for all identifiers discoverable by this calculator.
+   *
+   * @return map of job identifier to its statistics; empty when none are produced
+   */
+  Map<NameIdentifier, List<StatisticEntry<?>>> calculateAllJobStatistics();
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
new file mode 100644
index 0000000000..30072fc662
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+
+/** Represents a provider that supports bulk table statistics calculation. */
+@DeveloperApi
+public interface SupportsCalculateBulkTableStatistics extends SupportsCalculateTableStatistics {
+  /**
+   * Calculate table-level and partition-level statistics for all identifiers discoverable by this
+   * calculator.
+   *
+   * @return map of table identifier to its statistics bundle; empty when none are produced
+   */
+  Map<NameIdentifier, TableStatisticsBundle> calculateBulkTableStatistics();
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
new file mode 100644
index 0000000000..4a49591138
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Represents a provider that supports job statistics. */
+@DeveloperApi
+public interface SupportsCalculateJobStatistics extends StatisticsCalculator {
+  /**
+   * Calculate job-level statistics to be persisted.
+   *
+   * @param jobIdentifier job identifier
+   * @return list of statistics; empty when none are produced
+   */
+  List<StatisticEntry<?>> calculateJobStatistics(NameIdentifier jobIdentifier);
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
new file mode 100644
index 0000000000..586af4b5e1
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.api.updater;
+
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.annotation.DeveloperApi;
+import org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+
+/** Represents a provider that supports table statistics. */
+@DeveloperApi
+public interface SupportsCalculateTableStatistics extends StatisticsCalculator {
+  /**
+   * Calculate table-level and partition-level statistics to be persisted.
+   *
+   * @param tableIdentifier catalog/schema/table identifier
+   * @return statistics bundle; contains table statistics and partition statistics
+   */
+  TableStatisticsBundle calculateTableStatistics(NameIdentifier tableIdentifier);
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/MetricSampleImpl.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/MetricSampleImpl.java
new file mode 100644
index 0000000000..858c1729d9
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/MetricSampleImpl.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Immutable {@link MetricSample} implementation that binds a statistic to an event timestamp. */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class MetricSampleImpl implements MetricSample {
+  private final long timestamp;
+  @NonNull private final StatisticEntry<?> statistic;
+
+  @Override
+  public String toString() {
+    return "{" + timestamp + ": " + statistic + " }";
+  }
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionMetricSampleImpl.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionMetricSampleImpl.java
new file mode 100644
index 0000000000..a2c8542e83
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionMetricSampleImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionMetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/** Metric sample tied to a specific partition. */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class PartitionMetricSampleImpl implements PartitionMetricSample {
+  private final long timestamp;
+  @NonNull private final StatisticEntry<?> statistic;
+  @NonNull private final PartitionPath partition;
+
+  @Override
+  public String toString() {
+    return "{" + timestamp + ": " + statistic + ", partition=" + partition + " }";
+  }
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index b0890957ac..8207b3716f 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -53,6 +53,10 @@ public class OptimizerConfig extends Config {
   private static final String TABLE_META_PROVIDER = RECOMMENDER_PREFIX + "tableMetaProvider";
   private static final String JOB_SUBMITTER = RECOMMENDER_PREFIX + "jobSubmitter";
 
+  private static final String UPDATER_PREFIX = OPTIMIZER_PREFIX + "updater.";
+  private static final String STATISTICS_UPDATER = UPDATER_PREFIX + "statisticsUpdater";
+  private static final String METRICS_UPDATER = UPDATER_PREFIX + "metricsUpdater";
+
   public static final ConfigEntry<String> STATISTICS_PROVIDER_CONFIG =
       new ConfigBuilder(STATISTICS_PROVIDER)
           .doc(
@@ -97,6 +101,20 @@ public class OptimizerConfig extends Config {
           .stringConf()
           .createWithDefault(NoopJobSubmitter.NAME);
 
+  public static final ConfigEntry<String> STATISTICS_UPDATER_CONFIG =
+      new ConfigBuilder(STATISTICS_UPDATER)
+          .doc("The statistics updater implementation name (matches Provider.name()).")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
+  public static final ConfigEntry<String> METRICS_UPDATER_CONFIG =
+      new ConfigBuilder(METRICS_UPDATER)
+          .doc("The metrics updater implementation name (matches Provider.name()).")
+          .version(ConfigConstants.VERSION_1_2_0)
+          .stringConf()
+          .create();
+
   public static final ConfigEntry<String> GRAVITINO_URI_CONFIG =
       new ConfigBuilder(GRAVITINO_URI)
           .doc("The URI of the Gravitino server.")
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
new file mode 100644
index 0000000000..2ccdd294f2
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/InstanceLoaderUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Streams;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.stream.Collectors;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
+
+public class InstanceLoaderUtils {
+
+  public static <T extends StatisticsCalculator> T createStatisticsCalculatorInstance(
+      String calculatorName) {
+    ServiceLoader<StatisticsCalculator> loader = ServiceLoader.load(StatisticsCalculator.class);
+    List<Class<? extends T>> providers =
+        Streams.stream(loader.iterator())
+            .filter(p -> p.name().equalsIgnoreCase(calculatorName))
+            .map(p -> (Class<? extends T>) p.getClass())
+            .collect(Collectors.toList());
+
+    if (providers.isEmpty()) {
+      throw new IllegalArgumentException(
+          "No "
+              + StatisticsCalculator.class.getSimpleName()
+              + " class found for: "
+              + calculatorName);
+    } else if (providers.size() > 1) {
+      throw new IllegalArgumentException(
+          "Multiple "
+              + StatisticsCalculator.class.getSimpleName()
+              + " found for: "
+              + calculatorName);
+    } else {
+      Class<? extends T> providerClz = Iterables.getOnlyElement(providers);
+      try {
+        return providerClz.getDeclaredConstructor().newInstance();
+      } catch (Exception e) {
+        throw new RuntimeException(
+            "Failed to instantiate StatisticsCalculator: "
+                + calculatorName
+                + ", class: "
+                + providerClz.getName(),
+            e);
+      }
+    }
+  }
+}
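
The lookup in `createStatisticsCalculatorInstance` filters the discovered providers by a case-insensitive name and requires exactly one match. A minimal standalone sketch of that selection rule follows. It assumes a hypothetical `Named` interface and an in-memory list in place of `ServiceLoader`; neither is part of this commit.

```java
import java.util.List;
import java.util.stream.Collectors;

public class SelectByNameSketch {
  /** Hypothetical stand-in for a provider that exposes a name. */
  public interface Named {
    String name();
  }

  /** Returns the single candidate whose name matches, ignoring case. */
  public static <T extends Named> T selectByName(List<T> candidates, String wanted) {
    List<T> matches =
        candidates.stream()
            .filter(c -> c.name().equalsIgnoreCase(wanted))
            .collect(Collectors.toList());
    if (matches.isEmpty()) {
      // No candidate carries the requested name.
      throw new IllegalArgumentException("No provider found for: " + wanted);
    }
    if (matches.size() > 1) {
      // Ambiguous: two or more candidates share the name.
      throw new IllegalArgumentException("Multiple providers found for: " + wanted);
    }
    return matches.get(0);
  }

  public static void main(String[] args) {
    Named tableStats = () -> "table-stats";
    Named jobStats = () -> "job-stats";
    System.out.println(selectByName(List.of(tableStats, jobStats), "TABLE-STATS").name());
  }
}
```

Rejecting duplicates rather than taking the first match makes a misconfigured classpath fail loudly instead of picking a provider at random.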
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
index 48b422c21e..a60b75257d 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
@@ -29,6 +29,8 @@ import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
 
 /** Helper for loading optimizer providers via {@link ServiceLoader}. */
 public class ProviderUtils {
@@ -89,4 +91,12 @@ public class ProviderUtils {
   public static JobSubmitter createJobSubmitterInstance(String provider) {
     return createProviderInstance(JobSubmitter.class, provider);
   }
+
+  public static StatisticsUpdater createStatisticsUpdaterInstance(String provider) {
+    return createProviderInstance(StatisticsUpdater.class, provider);
+  }
+
+  public static MetricsUpdater createMetricsUpdaterInstance(String provider) {
+    return createProviderInstance(MetricsUpdater.class, provider);
+  }
 }
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueUtils.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueUtils.java
new file mode 100644
index 0000000000..970c526fdc
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Optional;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Type.Name;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class StatisticValueUtils {
+
+  public static Optional<StatisticValue<?>> avg(List<StatisticValue<?>> values) {
+    if (values.isEmpty()) {
+      return Optional.empty();
+    }
+    Preconditions.checkArgument(values.stream().allMatch(StatisticValueUtils::isNumber));
+
+    Optional<StatisticValue<?>> sum = sum(values);
+    if (sum.isEmpty()) {
+      return Optional.empty();
+    }
+    return Optional.of(StatisticValueUtils.div(sum.get(), values.size()));
+  }
+
+  public static Optional<StatisticValue<?>> sum(List<StatisticValue<?>> values) {
+    if (values.isEmpty()) {
+      return Optional.empty();
+    }
+    Type type = getValueType(values);
+    Name longName = Types.LongType.get().name();
+    Name doubleName = Types.DoubleType.get().name();
+    if (type.name().equals(longName)) {
+      long longSum = 0L;
+      for (StatisticValue<?> value : values) {
+        longSum += ((Long) value.value()).longValue();
+      }
+      return Optional.of(StatisticValues.longValue(longSum));
+    } else if (type.name().equals(doubleName)) {
+      double doubleSum = 0.0;
+      for (StatisticValue<?> value : values) {
+        doubleSum += ((Number) value.value()).doubleValue();
+      }
+      return Optional.of(StatisticValues.doubleValue(doubleSum));
+    } else {
+      throw new IllegalArgumentException("Unsupported number type: " + type.name());
+    }
+  }
+
+  public static StatisticValue<?> div(StatisticValue<?> value, long divisor) {
+    Preconditions.checkArgument(isNumber(value), "Value must be a number");
+    Preconditions.checkArgument(divisor != 0, "Divisor cannot be zero");
+    Type type = value.dataType();
+    Name longName = Types.LongType.get().name();
+    Name doubleName = Types.DoubleType.get().name();
+    if (type.name().equals(longName)) {
+      long longValue = ((Long) value.value()).longValue();
+      return StatisticValues.doubleValue(((double) longValue) / divisor);
+    } else if (type.name().equals(doubleName)) {
+      double doubleValue = ((Number) value.value()).doubleValue();
+      return StatisticValues.doubleValue(doubleValue / divisor);
+    } else {
+      throw new IllegalArgumentException("Unsupported number type: " + type.name());
+    }
+  }
+
+  public static String toString(StatisticValue<?> value) {
+    Preconditions.checkArgument(value != null, "StatisticValue cannot be null");
+    try {
+      return JsonUtils.anyFieldMapper().writeValueAsString(value);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException("Failed to serialize StatisticValue: " + value, e);
+    }
+  }
+
+  public static StatisticValue<?> fromString(String valueStr) {
+    Preconditions.checkArgument(valueStr != null, "StatisticValue string cannot be null");
+    try {
+      return JsonUtils.anyFieldMapper().readValue(valueStr, StatisticValue.class);
+    } catch (JsonProcessingException e) {
+      throw new IllegalArgumentException(
+          "Failed to deserialize StatisticValue from: " + valueStr, e);
+    }
+  }
+
+  private static Type getValueType(List<StatisticValue<?>> values) {
+    Type type = values.get(0).dataType();
+    Preconditions.checkArgument(
+        values.stream().allMatch(v -> v.dataType().name().equals(type.name())));
+    return type;
+  }
+
+  private static boolean isNumber(StatisticValue<?> value) {
+    Type type = value.dataType();
+    return type == Types.LongType.get() || type == Types.DoubleType.get();
+  }
+}
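
As written, `avg` sums the values and then calls `div`, which returns a double even when the inputs are longs, so the average of integer statistics is not truncated. The sketch below mirrors that behaviour with plain `Long` values. The class and method names are hypothetical, and it does not use Gravitino's `StatisticValue` types.

```java
import java.util.List;
import java.util.OptionalDouble;

public class AvgSketch {
  /** Averages long values, returning a double so fractional results survive. */
  public static OptionalDouble avgOfLongs(List<Long> values) {
    if (values.isEmpty()) {
      // Mirrors the empty-input case: there is nothing to average.
      return OptionalDouble.empty();
    }
    long sum = 0L;
    for (long v : values) {
      sum += v;
    }
    // Cast before dividing; long / int would drop the fractional part.
    return OptionalDouble.of(((double) sum) / values.size());
  }

  public static void main(String[] args) {
    System.out.println(avgOfLongs(List.of(1L, 2L)).getAsDouble()); // prints 1.5
    System.out.println(avgOfLongs(List.of()).isPresent()); // prints false
  }
}
```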
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
index 0602e7f286..a41398463e 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/Recommender.java
@@ -88,23 +88,22 @@ public class Recommender implements AutoCloseable {
    */
   public Recommender(OptimizerEnv optimizerEnv) {
     OptimizerConfig config = optimizerEnv.config();
-    StrategyProvider strategyProvider = loadStrategyProvider(config);
-    StatisticsProvider statisticsProvider = loadStatisticsProvider(config);
-    TableMetadataProvider tableMetadataProvider = loadTableMetadataProvider(config);
-    JobSubmitter jobSubmitter = loadJobSubmitter(config);
-
     this.optimizerEnv = optimizerEnv;
-    this.strategyProvider = strategyProvider;
-    this.statisticsProvider = statisticsProvider;
-    this.tableMetadataProvider = tableMetadataProvider;
-    this.jobSubmitter = jobSubmitter;
-
+    this.strategyProvider = loadStrategyProvider(config);
     this.strategyProvider.initialize(optimizerEnv);
+    closeableGroup.register(strategyProvider, StrategyProvider.class.getSimpleName());
+
+    this.statisticsProvider = loadStatisticsProvider(config);
     this.statisticsProvider.initialize(optimizerEnv);
+    closeableGroup.register(statisticsProvider, StatisticsProvider.class.getSimpleName());
+
+    this.tableMetadataProvider = loadTableMetadataProvider(config);
     this.tableMetadataProvider.initialize(optimizerEnv);
-    this.jobSubmitter.initialize(optimizerEnv);
+    closeableGroup.register(tableMetadataProvider, TableMetadataProvider.class.getSimpleName());
 
-    addToCloseableGroup();
+    this.jobSubmitter = loadJobSubmitter(config);
+    this.jobSubmitter.initialize(optimizerEnv);
+    closeableGroup.register(jobSubmitter, JobSubmitter.class.getSimpleName());
   }
 
   @VisibleForTesting
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticEntryImpl.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticEntryImpl.java
new file mode 100644
index 0000000000..d042a94abf
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticEntryImpl.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.experimental.Accessors;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.stats.StatisticValue;
+
+/**
+ * Immutable {@link StatisticEntry} implementation used by the updater runtime.
+ *
+ * @param <T> underlying value type
+ */
+@Accessors(fluent = true)
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class StatisticEntryImpl<T> implements StatisticEntry<T> {
+  private final String name;
+  private final StatisticValue<T> value;
+
+  @Override
+  public String toString() {
+    return "{ " + name + " : " + value.value() + '}';
+  }
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/UpdateType.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/UpdateType.java
new file mode 100644
index 0000000000..55572fa9a9
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/UpdateType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+/** Supported updater actions. */
+public enum UpdateType {
+  /** Persist metrics derived from statistics. */
+  METRICS,
+  /** Persist statistics directly into the catalog. */
+  STATISTICS;
+
+  /**
+   * Parses a case-insensitive update type value.
+   *
+   * @param value string representation such as "statistics" or "metrics"
+   * @return matching {@link UpdateType}
+   */
+  public static UpdateType fromString(String value) {
+    for (UpdateType m : UpdateType.values()) {
+      if (m.name().equalsIgnoreCase(value)) {
+        return m;
+      }
+    }
+    throw new IllegalArgumentException(
+        "Invalid update type: " + value + ". Allowed values: statistics, metrics");
+  }
+}
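
A short usage sketch of the case-insensitive parsing in `UpdateType.fromString`. To stay self-contained it uses a hypothetical local enum `Kind` that copies the two constants and the loop; it is an illustration only, not the committed class.

```java
public class UpdateTypeParseSketch {
  /** Hypothetical local copy of the two update kinds, for illustration only. */
  public enum Kind {
    METRICS,
    STATISTICS;

    /** Matches the constant name ignoring case; rejects anything else. */
    public static Kind fromString(String value) {
      for (Kind k : values()) {
        if (k.name().equalsIgnoreCase(value)) {
          return k;
        }
      }
      throw new IllegalArgumentException("Invalid update type: " + value);
    }
  }

  public static void main(String[] args) {
    // Mixed-case input resolves to the matching constant.
    System.out.println(Kind.fromString("Metrics"));
    try {
      Kind.fromString("stats"); // abbreviations are not accepted
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Because `equalsIgnoreCase(null)` returns false, a null argument falls through to the same `IllegalArgumentException` rather than a `NullPointerException`.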
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/Updater.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/Updater.java
new file mode 100644
index 0000000000..3acbe2b188
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/Updater.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkJobStatistics;
+import org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateJobStatistics;
+import org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.CloseableGroup;
+import org.apache.gravitino.maintenance.optimizer.common.MetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionMetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.common.util.InstanceLoaderUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils;
+import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point that wires together the statistics calculator and updater providers to persist
+ * optimizer statistics or metrics.
+ *
+ * <p>Usage:
+ *
+ * <ol>
+ *   <li>Configure {@link OptimizerConfig#STATISTICS_UPDATER_CONFIG} and {@link
+ *       OptimizerConfig#METRICS_UPDATER_CONFIG} with provider names.
+ *   <li>Instantiate {@link Updater} with an {@link OptimizerEnv} to initialize the providers.
+ *   <li>Call {@link #update(String, List, UpdateType)} for specific identifiers or {@link
+ *       #updateAll(String, UpdateType)} for bulk refresh.
+ *   <li>Call {@link #close()} to release provider resources.
+ * </ol>
+ */
+public class Updater implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(Updater.class);
+
+  private StatisticsUpdater statisticsUpdater;
+  private MetricsUpdater metricsUpdater;
+  private OptimizerEnv optimizerEnv;
+  private final CloseableGroup closeableGroup = new CloseableGroup();
+
+  public Updater(OptimizerEnv optimizerEnv) {
+    this.optimizerEnv = optimizerEnv;
+    this.statisticsUpdater = loadStatisticsUpdater(optimizerEnv.config());
+    statisticsUpdater.initialize(optimizerEnv);
+    closeableGroup.register(statisticsUpdater, StatisticsUpdater.class.getSimpleName());
+
+    this.metricsUpdater = loadMetricsUpdater(optimizerEnv.config());
+    metricsUpdater.initialize(optimizerEnv);
+    closeableGroup.register(metricsUpdater, MetricsUpdater.class.getSimpleName());
+  }
+
+  /**
+   * Updates statistics or metrics for the provided identifiers.
+   *
+   * <p>This is the main entry point for updating a bounded set of targets. The updater resolves the
+   * {@link StatisticsCalculator} by name, calculates table and partition statistics, and persists
+   * either raw statistics or derived metrics based on {@code updateType}. If the calculator
+   * implements {@link SupportsCalculateJobStatistics} and {@code updateType} is {@link
+   * UpdateType#METRICS}, job metrics are also emitted.
+   *
+   * @param statisticsCalculatorName The provider name of the statistics calculator.
+   * @param nameIdentifiers The identifiers to update (table and/or job).
+   * @param updateType The target update type: statistics or metrics.
+   */
+  public void update(
+      String statisticsCalculatorName,
+      List<NameIdentifier> nameIdentifiers,
+      UpdateType updateType) {
+    StatisticsCalculator calculator = getStatisticsCalculator(statisticsCalculatorName);
+    long tableRecords = 0;
+    long partitionRecords = 0;
+    long jobRecords = 0;
+    for (NameIdentifier nameIdentifier : nameIdentifiers) {
+      if (calculator instanceof SupportsCalculateTableStatistics) {
+        SupportsCalculateTableStatistics supportTableStatistics =
+            ((SupportsCalculateTableStatistics) calculator);
+        TableStatisticsBundle bundle =
+            supportTableStatistics.calculateTableStatistics(nameIdentifier);
+        List<StatisticEntry<?>> statistics = bundle != null ? bundle.tableStatistics() : List.of();
+        Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+            bundle != null ? bundle.partitionStatistics() : Map.of();
+        tableRecords += countStatistics(statistics);
+        partitionRecords += countPartitionStatistics(partitionStatistics);
+        LOG.info(
+            "Updating table statistics/metrics: calculator={}, updateType={}, identifier={}",
+            statisticsCalculatorName,
+            updateType,
+            nameIdentifier);
+        updateTable(statistics, nameIdentifier, updateType);
+        updatePartition(partitionStatistics, nameIdentifier, updateType);
+      }
+      if (calculator instanceof SupportsCalculateJobStatistics
+          && UpdateType.METRICS.equals(updateType)) {
+        SupportsCalculateJobStatistics supportJobStatistics =
+            ((SupportsCalculateJobStatistics) calculator);
+        List<StatisticEntry<?>> statistics =
+            supportJobStatistics.calculateJobStatistics(nameIdentifier);
+        jobRecords += countStatistics(statistics);
+        LOG.info(
+            "Updating job metrics: calculator={}, identifier={}",
+            statisticsCalculatorName,
+            nameIdentifier);
+        updateJob(statistics, nameIdentifier);
+      }
+    }
+    System.out.println(
+        String.format(
+            "SUMMARY: %s totalRecords=%d tableRecords=%d partitionRecords=%d jobRecords=%d",
+            updateType.name().toLowerCase(Locale.ROOT),
+            tableRecords + partitionRecords + jobRecords,
+            tableRecords,
+            partitionRecords,
+            jobRecords));
+  }
+
+  /**
+   * Updates statistics or metrics for all identifiers returned by the calculator.
+   *
+   * <p>This is the main entry point for batch refreshes. The updater asks the {@link
+   * StatisticsCalculator} for all table statistics (and optionally job statistics) and persists
+   * them according to {@code updateType}. If the calculator implements {@link
+   * SupportsCalculateBulkJobStatistics} and {@code updateType} is {@link UpdateType#METRICS}, job
+   * metrics are also emitted.
+   *
+   * @param statisticsCalculatorName The provider name of the statistics calculator.
+   * @param updateType The target update type: statistics or metrics.
+   */
+  public void updateAll(String statisticsCalculatorName, UpdateType updateType) {
+    StatisticsCalculator calculator = getStatisticsCalculator(statisticsCalculatorName);
+    long tableRecords = 0;
+    long partitionRecords = 0;
+    long jobRecords = 0;
+
+    if (calculator instanceof SupportsCalculateBulkTableStatistics supportBulkTableStatistics) {
+      Map<NameIdentifier, TableStatisticsBundle> allTableStatistics =
+          supportBulkTableStatistics.calculateBulkTableStatistics();
+      if (allTableStatistics == null) {
+        allTableStatistics = Map.of();
+      }
+
+      tableRecords += countAllTableStatistics(allTableStatistics);
+      partitionRecords += countAllPartitionStatistics(allTableStatistics);
+      allTableStatistics.forEach(
+          (identifier, bundle) -> {
+            List<StatisticEntry<?>> statistics =
+                bundle != null ? bundle.tableStatistics() : List.of();
+            Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+                bundle != null ? bundle.partitionStatistics() : Map.of();
+            updateTable(statistics, identifier, updateType);
+            updatePartition(partitionStatistics, identifier, updateType);
+          });
+    }
+
+    if (calculator instanceof SupportsCalculateBulkJobStatistics supportJobStatistics
+        && UpdateType.METRICS.equals(updateType)) {
+      Map<NameIdentifier, List<StatisticEntry<?>>> allJobStatistics =
+          supportJobStatistics.calculateAllJobStatistics();
+      if (allJobStatistics == null) {
+        allJobStatistics = Map.of();
+      }
+      jobRecords += countAllStatistics(allJobStatistics);
+      allJobStatistics.forEach((identifier, statistics) -> updateJob(statistics, identifier));
+    }
+    System.out.println(
+        String.format(
+            "SUMMARY: %s totalRecords=%d tableRecords=%d partitionRecords=%d jobRecords=%d",
+            updateType.name().toLowerCase(Locale.ROOT),
+            tableRecords + partitionRecords + jobRecords,
+            tableRecords,
+            partitionRecords,
+            jobRecords));
+  }
+
+  @VisibleForTesting
+  public MetricsUpdater getMetricsUpdater() {
+    return metricsUpdater;
+  }
+
+  @Override
+  public void close() throws Exception {
+    closeableGroup.close();
+  }
+
+  private void updateTable(
+      List<StatisticEntry<?>> statistics, NameIdentifier tableIdentifier, UpdateType updateType) {
+    switch (updateType) {
+      case STATISTICS:
+        updateTableStatistics(statistics, tableIdentifier);
+        break;
+      case METRICS:
+        updateTableMetrics(statistics, tableIdentifier);
+        break;
+    }
+  }
+
+  private void updatePartition(
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics,
+      NameIdentifier tableIdentifier,
+      UpdateType updateType) {
+    switch (updateType) {
+      case STATISTICS:
+        updatePartitionStatistics(partitionStatistics, tableIdentifier);
+        break;
+      case METRICS:
+        updatePartitionMetrics(partitionStatistics, tableIdentifier);
+        break;
+    }
+  }
+
+  private void updateTableStatistics(
+      List<StatisticEntry<?>> statistics, NameIdentifier tableIdentifier) {
+    LOG.info(
+        "Persisting table statistics: identifier={}, count={}, details={}",
+        tableIdentifier,
+        statistics != null ? statistics.size() : 0,
+        summarize(statistics));
+    statisticsUpdater.updateTableStatistics(tableIdentifier, statistics);
+  }
+
+  private void updatePartitionStatistics(
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics,
+      NameIdentifier tableIdentifier) {
+    if (partitionStatistics == null || partitionStatistics.isEmpty()) {
+      LOG.info(
+          "Persist partition statistics skipped: identifier={}, reason=empty partitions",
+          tableIdentifier);
+      return;
+    }
+    LOG.info(
+        "Persisting partition statistics: identifier={}, partitions={}, names={}, sample={}",
+        tableIdentifier,
+        partitionStatistics.size(),
+        partitionNames(partitionStatistics),
+        summarize(partitionStatistics.values().stream().flatMap(Collection::stream).toList()));
+    statisticsUpdater.updatePartitionStatistics(tableIdentifier, partitionStatistics);
+  }
+
+  private void updateTableMetrics(
+      List<StatisticEntry<?>> statistics, NameIdentifier tableIdentifier) {
+    long timestampSeconds = System.currentTimeMillis() / 1000;
+    LOG.info(
+        "Persisting table metrics: identifier={}, count={}, details={}",
+        tableIdentifier,
+        statistics != null ? statistics.size() : 0,
+        summarize(statistics));
+    metricsUpdater.updateTableMetrics(tableIdentifier, toMetrics(statistics, timestampSeconds));
+  }
+
+  private void updatePartitionMetrics(
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics,
+      NameIdentifier tableIdentifier) {
+    if (partitionStatistics == null || partitionStatistics.isEmpty()) {
+      LOG.info(
+          "Persist partition metrics skipped: identifier={}, reason=empty partitions",
+          tableIdentifier);
+      return;
+    }
+    long timestampSeconds = System.currentTimeMillis() / 1000;
+    List<MetricSample> partitionMetrics = toPartitionMetrics(partitionStatistics, timestampSeconds);
+    LOG.info(
+        "Persisting partition metrics: identifier={}, partitions={}, names={}, details={}",
+        tableIdentifier,
+        partitionStatistics.size(),
+        partitionNames(partitionStatistics),
+        summarize(partitionStatistics.values().stream().flatMap(Collection::stream).toList()));
+    metricsUpdater.updateTableMetrics(tableIdentifier, partitionMetrics);
+  }
+
+  private void updateJob(List<StatisticEntry<?>> statistics, NameIdentifier jobIdentifier) {
+    long timestampSeconds = System.currentTimeMillis() / 1000;
+
+    LOG.info(
+        "Persisting job metrics: identifier={}, count={}, details={}",
+        jobIdentifier,
+        statistics != null ? statistics.size() : 0,
+        summarize(statistics));
+    metricsUpdater.updateJobMetrics(jobIdentifier, toMetrics(statistics, timestampSeconds));
+  }
+
+  private String summarize(List<StatisticEntry<?>> statistics) {
+    if (statistics == null || statistics.isEmpty()) {
+      return "[]";
+    }
+    int limit = Math.min(statistics.size(), 20);
+    String summary =
+        statistics.stream()
+            .limit(limit)
+            .map(stat -> stat.name() + "=" + stat.value().value())
+            .collect(Collectors.joining(", ", "[", "]"));
+    if (statistics.size() > limit) {
+      summary = summary + " ... (" + statistics.size() + " total)";
+    }
+    return summary;
+  }
+
+  private List<MetricSample> toMetrics(List<StatisticEntry<?>> statistics, long timestamp) {
+    List<MetricSample> metrics = new ArrayList<>();
+    if (statistics != null) {
+      statistics.forEach(stat -> metrics.add(new MetricSampleImpl(timestamp, stat)));
+    }
+    return metrics;
+  }
+
+  private List<MetricSample> toPartitionMetrics(
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics, long timestamp) {
+    List<MetricSample> metrics = new ArrayList<>();
+    if (partitionStatistics != null) {
+      partitionStatistics.forEach(
+          (partitionPath, statisticEntries) ->
+              statisticEntries.forEach(
+                  stat ->
+                      metrics.add(new PartitionMetricSampleImpl(timestamp, stat, partitionPath))));
+    }
+    return metrics;
+  }
+
+  private StatisticsCalculator getStatisticsCalculator(String statisticsCalculatorName) {
+    StatisticsCalculator calculator =
+        InstanceLoaderUtils.createStatisticsCalculatorInstance(statisticsCalculatorName);
+    calculator.initialize(optimizerEnv);
+    return calculator;
+  }
+
+  private StatisticsUpdater loadStatisticsUpdater(OptimizerConfig config) {
+    String statisticsUpdaterName = config.get(OptimizerConfig.STATISTICS_UPDATER_CONFIG);
+    if (statisticsUpdaterName == null || statisticsUpdaterName.isBlank()) {
+      throw new IllegalArgumentException(
+          "Statistics updater provider name is required. Set "
+              + OptimizerConfig.STATISTICS_UPDATER_CONFIG.getKey()
+              + " to a valid provider name.");
+    }
+    return ProviderUtils.createStatisticsUpdaterInstance(statisticsUpdaterName);
+  }
+
+  private String partitionNames(Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+    return partitionStatistics.keySet().stream()
+        .map(PartitionUtils::encodePartitionPath)
+        .collect(Collectors.joining(", ", "[", "]"));
+  }
+
+  private MetricsUpdater loadMetricsUpdater(OptimizerConfig config) {
+    String metricsUpdaterName = config.get(OptimizerConfig.METRICS_UPDATER_CONFIG);
+    if (metricsUpdaterName == null || metricsUpdaterName.isBlank()) {
+      throw new IllegalArgumentException(
+          "Metrics updater provider name is required. Set "
+              + OptimizerConfig.METRICS_UPDATER_CONFIG.getKey()
+              + " to a valid provider name.");
+    }
+    return ProviderUtils.createMetricsUpdaterInstance(metricsUpdaterName);
+  }
+
+  private long countStatistics(List<StatisticEntry<?>> statistics) {
+    return statistics == null ? 0 : statistics.size();
+  }
+
+  private long countPartitionStatistics(
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+    if (partitionStatistics == null) {
+      return 0;
+    }
+    return partitionStatistics.values().stream().mapToLong(this::countStatistics).sum();
+  }
+
+  private long countAllStatistics(Map<NameIdentifier, List<StatisticEntry<?>>> statisticsByTable) {
+    if (statisticsByTable == null) {
+      return 0;
+    }
+    return statisticsByTable.values().stream().mapToLong(this::countStatistics).sum();
+  }
+
+  private long countAllTableStatistics(
+      Map<NameIdentifier, TableStatisticsBundle> statisticsByTable) {
+    if (statisticsByTable == null) {
+      return 0;
+    }
+    return statisticsByTable.values().stream()
+        .mapToLong(bundle -> countStatistics(bundle.tableStatistics()))
+        .sum();
+  }
+
+  private long countAllPartitionStatistics(
+      Map<NameIdentifier, TableStatisticsBundle> partitionStatisticsByTable) {
+    if (partitionStatisticsByTable == null) {
+      return 0;
+    }
+    return partitionStatisticsByTable.values().stream()
+        .mapToLong(bundle -> countPartitionStatistics(bundle.partitionStatistics()))
+        .sum();
+  }
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecord.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecord.java
new file mode 100644
index 0000000000..84da12fdcb
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecord.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage;
+
+/** Serializable metric record used by {@link MetricsRepository}. */
+public interface MetricRecord {
+  /** Timestamp in epoch seconds. */
+  long getTimestamp();
+
+  /** Encoded metric value. */
+  String getValue();
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecordImpl.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecordImpl.java
new file mode 100644
index 0000000000..4d5cda98a0
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricRecordImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+/** Immutable {@link MetricRecord} implementation backed by timestamp and encoded value. */
+@AllArgsConstructor
+@EqualsAndHashCode
+@Getter
+public final class MetricRecordImpl implements MetricRecord {
+  private final long timestamp;
+  private final String value;
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
new file mode 100644
index 0000000000..c921164bf4
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+
+/** SPI for persisting metrics produced by the optimizer updater. */
+public interface MetricsRepository extends AutoCloseable {
+
+  /** Initialize the storage backend with configuration properties. */
+  void initialize(Map<String, String> properties);
+
+  /** Persist a table metric (optionally scoped to a partition). */
+  void storeTableMetric(
+      NameIdentifier nameIdentifier,
+      String metricName,
+      Optional<String> partition,
+      MetricRecord metric);
+
+  /** Load table-level metrics within a time window [fromTimestamp, toTimestamp). */
+  Map<String, List<MetricRecord>> getTableMetrics(
+      NameIdentifier nameIdentifier, long fromTimestamp, long toTimestamp);
+
+  /** Load partition-level metrics within a time window [fromTimestamp, toTimestamp). */
+  Map<String, List<MetricRecord>> getPartitionMetrics(
+      NameIdentifier nameIdentifier, String partition, long fromTimestamp, long toTimestamp);
+
+  /** Delete table metrics older than the supplied timestamp (epoch seconds), exclusive. */
+  int cleanupTableMetricsBefore(long timestamp);
+
+  /** Persist a job metric. */
+  void storeJobMetric(NameIdentifier nameIdentifier, String metricName, MetricRecord metric);
+
+  /** Load job metrics within a time window [fromTimestamp, toTimestamp). */
+  Map<String, List<MetricRecord>> getJobMetrics(
+      NameIdentifier nameIdentifier, long fromTimestamp, long toTimestamp);
+
+  /** Delete job metrics older than the supplied timestamp (epoch seconds), exclusive. */
+  int cleanupJobMetricsBefore(long timestamp);
+}
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/util/ToStatistic.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/util/ToStatistic.java
new file mode 100644
index 0000000000..4a13628a93
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/util/ToStatistic.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater.util;
+
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+
+/**
+ * Represents an object that can be converted into statistics entries.
+ *
+ * <p>This interface is intended for updater components that need to expose their statistics in a
+ * common format.
+ */
+public interface ToStatistic {
+  /**
+   * Converts this object into a list of statistics entries.
+   *
+   * @return list of statistics entries; empty when no statistics are produced
+   */
+  List<StatisticEntry<?>> toStatistic();
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
index 2855194509..bc90dd9e35 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -68,6 +68,4 @@ public class TestProviderUtils {
     Assertions.assertNotNull(tableMetadataProvider);
     Assertions.assertTrue(tableMetadataProvider instanceof GravitinoTableMetadataProvider);
   }
-
-  // Updater/monitor providers removed for recommender-only scope.
 }
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestStatisticValueUtils.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestStatisticValueUtils.java
new file mode 100644
index 0000000000..574cb8768d
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestStatisticValueUtils.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.common.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class TestStatisticValueUtils {
+
+  @Test
+  void avgReturnsEmptyForEmptyList() {
+    Assertions.assertTrue(StatisticValueUtils.avg(List.of()).isEmpty());
+  }
+
+  @Test
+  void avgCalculatesCorrectAverageForLongValues() {
+    List<StatisticValue<?>> values =
+        List.of(
+            StatisticValues.longValue(10L),
+            StatisticValues.longValue(20L),
+            StatisticValues.longValue(30L));
+    StatisticValue<?> result = StatisticValueUtils.avg(values).orElseThrow();
+    Assertions.assertEquals(20.0, result.value());
+  }
+
+  @Test
+  void avgCalculatesCorrectAverageForDoubleValues() {
+    List<StatisticValue<?>> values =
+        List.of(
+            StatisticValues.doubleValue(10.5),
+            StatisticValues.doubleValue(20.5),
+            StatisticValues.doubleValue(30.5));
+    StatisticValue<?> result = StatisticValueUtils.avg(values).orElseThrow();
+    Assertions.assertEquals(20.5, result.value());
+  }
+
+  @Test
+  void sumReturnsEmptyForEmptyList() {
+    Assertions.assertTrue(StatisticValueUtils.sum(List.of()).isEmpty());
+  }
+
+  @Test
+  void sumCalculatesCorrectSumForLongValues() {
+    List<StatisticValue<?>> values =
+        List.of(
+            StatisticValues.longValue(10L),
+            StatisticValues.longValue(20L),
+            StatisticValues.longValue(30L));
+    StatisticValue<?> result = StatisticValueUtils.sum(values).orElseThrow();
+    Assertions.assertEquals(60L, result.value());
+  }
+
+  @Test
+  void sumCalculatesCorrectSumForDoubleValues() {
+    List<StatisticValue<?>> values =
+        List.of(
+            StatisticValues.doubleValue(10.5),
+            StatisticValues.doubleValue(20.5),
+            StatisticValues.doubleValue(30.5));
+    StatisticValue<?> result = StatisticValueUtils.sum(values).orElseThrow();
+    Assertions.assertEquals(61.5, result.value());
+  }
+
+  @Test
+  void divThrowsExceptionForZeroDivisor() {
+    StatisticValue<?> value = StatisticValues.longValue(10L);
+    IllegalArgumentException exception =
+        Assertions.assertThrowsExactly(
+            IllegalArgumentException.class, () -> StatisticValueUtils.div(value, 0));
+    Assertions.assertEquals("Divisor cannot be zero", exception.getMessage());
+  }
+
+  @Test
+  void divCalculatesCorrectDivisionForLongValues() {
+    StatisticValue<?> value = StatisticValues.longValue(10L);
+    StatisticValue<?> result = StatisticValueUtils.div(value, 2);
+    Assertions.assertEquals(5.0, result.value());
+  }
+
+  @Test
+  void divCalculatesCorrectDivisionForDoubleValues() {
+    StatisticValue<?> value = StatisticValues.doubleValue(10.5);
+    StatisticValue<?> result = StatisticValueUtils.div(value, 2);
+    Assertions.assertEquals(5.25, result.value());
+  }
+
+  @Test
+  void avgThrowsExceptionForNonNumberValues() {
+    List<StatisticValue<?>> values =
+        List.of(
+            StatisticValues.longValue(10L),
+            StatisticValues.doubleValue(20.5),
+            StatisticValues.stringValue("invalid"));
+    Assertions.assertThrowsExactly(
+        IllegalArgumentException.class, () -> StatisticValueUtils.avg(values));
+  }
+
+  @Test
+  void testSerDeserEquals() {
+    StatisticValue<?> value = StatisticValues.longValue(100L);
+    String result = StatisticValueUtils.toString(value);
+    StatisticValue<?> deserializeValue = StatisticValueUtils.fromString(result);
+    assertEquals(value, deserializeValue);
+
+    StatisticValue<?> doubleValue = StatisticValues.doubleValue(100.5);
+    result = StatisticValueUtils.toString(doubleValue);
+    deserializeValue = StatisticValueUtils.fromString(result);
+    assertEquals(doubleValue, deserializeValue);
+  }
+
+  @Test
+  void toStringSerializesStatisticValueCorrectly() {
+    StatisticValue<?> value = StatisticValues.longValue(100L);
+    String result = StatisticValueUtils.toString(value);
+    assertEquals("100", result);
+
+    value = StatisticValues.doubleValue(100.5);
+    result = StatisticValueUtils.toString(value);
+    assertEquals("100.5", result);
+  }
+
+  @Test
+  void fromStringDeserializesStatisticValueCorrectly() {
+    String valueStr = "100";
+    StatisticValue<?> result = StatisticValueUtils.fromString(valueStr);
+    assertTrue(result instanceof StatisticValues.LongValue);
+    assertEquals(100L, ((StatisticValues.LongValue) result).value());
+
+    String doubleValueStr = "100.5";
+    StatisticValue<?> doubleResult = StatisticValueUtils.fromString(doubleValueStr);
+    assertTrue(doubleResult instanceof StatisticValues.DoubleValue);
+    assertEquals(100.5, ((StatisticValues.DoubleValue) doubleResult).value());
+  }
+
+  @Test
+  void toStringThrowsExceptionForNullValue() {
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> StatisticValueUtils.toString(null));
+    assertEquals("StatisticValue cannot be null", exception.getMessage());
+  }
+
+  @Test
+  void fromStringThrowsExceptionForNullString() {
+    IllegalArgumentException exception =
+        assertThrows(IllegalArgumentException.class, () -> StatisticValueUtils.fromString(null));
+    assertEquals("StatisticValue string cannot be null", exception.getMessage());
+  }
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/MetricsUpdaterForTest.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/MetricsUpdaterForTest.java
new file mode 100644
index 0000000000..5f02504306
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/MetricsUpdaterForTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+public class MetricsUpdaterForTest implements MetricsUpdater {
+
+  public static final String NAME = "test-metrics-updater";
+  private static final List<MetricsUpdaterForTest> INSTANCES =
+      Collections.synchronizedList(new ArrayList<>());
+  private final AtomicInteger tableUpdates = new AtomicInteger();
+  private final AtomicInteger jobUpdates = new AtomicInteger();
+  private final AtomicInteger closeCalls = new AtomicInteger();
+
+  public MetricsUpdaterForTest() {
+    INSTANCES.add(this);
+  }
+
+  public static List<MetricsUpdaterForTest> instances() {
+    return new ArrayList<>(INSTANCES);
+  }
+
+  public static void reset() {
+    INSTANCES.clear();
+  }
+
+  public int tableUpdates() {
+    return tableUpdates.get();
+  }
+
+  public int jobUpdates() {
+    return jobUpdates.get();
+  }
+
+  public int closeCalls() {
+    return closeCalls.get();
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public void updateTableMetrics(NameIdentifier nameIdentifier, List<MetricSample> metrics) {
+    tableUpdates.incrementAndGet();
+  }
+
+  @Override
+  public void updateJobMetrics(NameIdentifier nameIdentifier, List<MetricSample> metrics) {
+    jobUpdates.incrementAndGet();
+  }
+
+  @Override
+  public void close() {
+    closeCalls.incrementAndGet();
+  }
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsCalculatorForTest.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsCalculatorForTest.java
new file mode 100644
index 0000000000..3736deff07
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsCalculatorForTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.TableStatisticsBundle;
+import org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkJobStatistics;
+import org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateBulkTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.stats.StatisticValue;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class StatisticsCalculatorForTest
+    implements SupportsCalculateBulkTableStatistics, SupportsCalculateBulkJobStatistics {
+
+  public static final String NAME = "test-statistics-calculator";
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public TableStatisticsBundle calculateTableStatistics(NameIdentifier tableIdentifier) {
+    List<StatisticEntry<?>> tableStatistics = List.of(entry("row_count", 10L));
+    Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+        Map.of(
+            PartitionPath.of(List.of(new TestPartitionEntry("p1", "v1"))),
+            List.of(entry("row_count", 3L)),
+            PartitionPath.of(List.of(new TestPartitionEntry("p2", "v2"))),
+            List.of(entry("row_count", 7L)));
+    return new TableStatisticsBundle(tableStatistics, partitionStatistics);
+  }
+
+  @Override
+  public List<StatisticEntry<?>> calculateJobStatistics(NameIdentifier jobIdentifier) {
+    return List.of(entry("output_rows", 5L));
+  }
+
+  @Override
+  public Map<NameIdentifier, TableStatisticsBundle> calculateBulkTableStatistics() {
+    NameIdentifier identifier = NameIdentifier.of("catalog", "schema", "table");
+    return Map.of(identifier, calculateTableStatistics(identifier));
+  }
+
+  @Override
+  public Map<NameIdentifier, List<StatisticEntry<?>>> calculateAllJobStatistics() {
+    NameIdentifier identifier = NameIdentifier.of("job", "sample");
+    return Map.of(identifier, calculateJobStatistics(identifier));
+  }
+
+  private static StatisticEntry<?> entry(String name, long value) {
+    StatisticValue statisticValue = StatisticValues.longValue(value);
+    return new StatisticEntryImpl(name, statisticValue);
+  }
+
+  private static final class TestPartitionEntry implements PartitionEntry {
+    private final String name;
+    private final String value;
+
+    private TestPartitionEntry(String name, String value) {
+      this.name = name;
+      this.value = value;
+    }
+
+    @Override
+    public String partitionName() {
+      return name;
+    }
+
+    @Override
+    public String partitionValue() {
+      return value;
+    }
+  }
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsUpdaterForTest.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsUpdaterForTest.java
new file mode 100644
index 0000000000..a2e19a3d15
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/StatisticsUpdaterForTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+public class StatisticsUpdaterForTest implements StatisticsUpdater {
+
+  public static final String NAME = "test-statistics-updater";
+  private static final List<StatisticsUpdaterForTest> INSTANCES =
+      Collections.synchronizedList(new ArrayList<>());
+  private final AtomicInteger tableUpdates = new AtomicInteger();
+  private final AtomicInteger partitionUpdates = new AtomicInteger();
+  private final AtomicInteger closeCalls = new AtomicInteger();
+
+  public StatisticsUpdaterForTest() {
+    INSTANCES.add(this);
+  }
+
+  public static List<StatisticsUpdaterForTest> instances() {
+    return new ArrayList<>(INSTANCES);
+  }
+
+  public static void reset() {
+    INSTANCES.clear();
+  }
+
+  public int tableUpdates() {
+    return tableUpdates.get();
+  }
+
+  public int partitionUpdates() {
+    return partitionUpdates.get();
+  }
+
+  public int closeCalls() {
+    return closeCalls.get();
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public void updateTableStatistics(
+      NameIdentifier tableIdentifier, List<StatisticEntry<?>> tableStatistics) {
+    tableUpdates.incrementAndGet();
+  }
+
+  @Override
+  public void updatePartitionStatistics(
+      NameIdentifier tableIdentifier,
+      Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) {
+    partitionUpdates.incrementAndGet();
+  }
+
+  @Override
+  public void close() {
+    closeCalls.incrementAndGet();
+  }
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/TestUpdater.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/TestUpdater.java
new file mode 100644
index 0000000000..f7eb63b8c8
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/TestUpdater.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.updater;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestUpdater {
+
+  @AfterEach
+  void tearDown() {
+    StatisticsUpdaterForTest.reset();
+    MetricsUpdaterForTest.reset();
+  }
+
+  @Test
+  void testUpdateStatistics() {
+    OptimizerEnv optimizerEnv =
+        mockOptimizerEnv(StatisticsUpdaterForTest.NAME, MetricsUpdaterForTest.NAME);
+    NameIdentifier identifier = NameIdentifier.of("catalog", "schema", "table");
+
+    createUpdater(optimizerEnv)
+        .update(StatisticsCalculatorForTest.NAME, List.of(identifier), UpdateType.STATISTICS);
+
+    StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+    MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+    assertNotNull(statisticsUpdater);
+    assertNotNull(metricsUpdater);
+    assertEquals(1, statisticsUpdater.tableUpdates());
+    assertEquals(1, statisticsUpdater.partitionUpdates());
+    assertEquals(0, metricsUpdater.tableUpdates());
+    assertEquals(0, metricsUpdater.jobUpdates());
+  }
+
+  @Test
+  void testUpdateMetricsWithJobStatistics() {
+    OptimizerEnv optimizerEnv =
+        mockOptimizerEnv(StatisticsUpdaterForTest.NAME, MetricsUpdaterForTest.NAME);
+    NameIdentifier identifier = NameIdentifier.of("catalog", "schema", "table");
+
+    createUpdater(optimizerEnv)
+        .update(StatisticsCalculatorForTest.NAME, List.of(identifier), UpdateType.METRICS);
+
+    StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+    MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+    assertNotNull(statisticsUpdater);
+    assertNotNull(metricsUpdater);
+    assertEquals(0, statisticsUpdater.tableUpdates());
+    assertEquals(0, statisticsUpdater.partitionUpdates());
+    assertEquals(2, metricsUpdater.tableUpdates());
+    assertEquals(1, metricsUpdater.jobUpdates());
+  }
+
+  @Test
+  void testUpdateAllStatistics() {
+    OptimizerEnv optimizerEnv =
+        mockOptimizerEnv(StatisticsUpdaterForTest.NAME, MetricsUpdaterForTest.NAME);
+
+    createUpdater(optimizerEnv).updateAll(StatisticsCalculatorForTest.NAME, UpdateType.STATISTICS);
+
+    StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+    MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+    assertNotNull(statisticsUpdater);
+    assertNotNull(metricsUpdater);
+    assertEquals(1, statisticsUpdater.tableUpdates());
+    assertEquals(1, statisticsUpdater.partitionUpdates());
+    assertEquals(0, metricsUpdater.tableUpdates());
+    assertEquals(0, metricsUpdater.jobUpdates());
+  }
+
+  @Test
+  void testUpdateAllMetricsWithJobStatistics() {
+    OptimizerEnv optimizerEnv =
+        mockOptimizerEnv(StatisticsUpdaterForTest.NAME, MetricsUpdaterForTest.NAME);
+
+    createUpdater(optimizerEnv).updateAll(StatisticsCalculatorForTest.NAME, UpdateType.METRICS);
+
+    StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+    MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+    assertNotNull(statisticsUpdater);
+    assertNotNull(metricsUpdater);
+    assertEquals(0, statisticsUpdater.tableUpdates());
+    assertEquals(0, statisticsUpdater.partitionUpdates());
+    assertEquals(2, metricsUpdater.tableUpdates());
+    assertEquals(1, metricsUpdater.jobUpdates());
+  }
+
+  @Test
+  void testCloseClosesProviders() throws Exception {
+    OptimizerEnv optimizerEnv =
+        mockOptimizerEnv(StatisticsUpdaterForTest.NAME, MetricsUpdaterForTest.NAME);
+
+    Updater updater = createUpdater(optimizerEnv);
+    updater.close();
+
+    StatisticsUpdaterForTest statisticsUpdater = selectStatisticsUpdater();
+    MetricsUpdaterForTest metricsUpdater = selectMetricsUpdater();
+    assertNotNull(statisticsUpdater);
+    assertNotNull(metricsUpdater);
+    assertEquals(1, statisticsUpdater.closeCalls());
+    assertEquals(1, metricsUpdater.closeCalls());
+  }
+
+  private Updater createUpdater(OptimizerEnv optimizerEnv) {
+    return new Updater(optimizerEnv);
+  }
+
+  private StatisticsUpdaterForTest selectStatisticsUpdater() {
+    return StatisticsUpdaterForTest.instances().stream()
+        .max(
+            Comparator.comparingInt(
+                updater ->
+                    updater.tableUpdates() + updater.partitionUpdates() + updater.closeCalls()))
+        .orElse(null);
+  }
+
+  private MetricsUpdaterForTest selectMetricsUpdater() {
+    return MetricsUpdaterForTest.instances().stream()
+        .max(
+            Comparator.comparingInt(
+                updater -> updater.tableUpdates() + updater.jobUpdates() + updater.closeCalls()))
+        .orElse(null);
+  }
+
+  private OptimizerEnv mockOptimizerEnv(String statisticsUpdater, String metricsUpdater) {
+    OptimizerConfig config = Mockito.mock(OptimizerConfig.class);
+    Mockito.when(config.get(OptimizerConfig.STATISTICS_UPDATER_CONFIG))
+        .thenReturn(statisticsUpdater);
+    Mockito.when(config.get(OptimizerConfig.METRICS_UPDATER_CONFIG)).thenReturn(metricsUpdater);
+    OptimizerEnv optimizerEnv = Mockito.mock(OptimizerEnv.class);
+    Mockito.when(optimizerEnv.config()).thenReturn(config);
+    return optimizerEnv;
+  }
+}
diff --git a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
new file mode 100644
index 0000000000..1948a23487
--- /dev/null
+++ b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
+org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
diff --git a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
new file mode 100644
index 0000000000..2e0ab7cd28
--- /dev/null
+++ b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.gravitino.maintenance.optimizer.updater.StatisticsCalculatorForTest
diff --git a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater
new file mode 100644
index 0000000000..ee1202f65e
--- /dev/null
+++ b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
