This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 004d51452c [#10193] test(optimizer): add optimizer integration tests 
(#10191)
004d51452c is described below

commit 004d51452c11c4a9f6caad180a8c7c81af4ef7ba
Author: FANNG <[email protected]>
AuthorDate: Thu Mar 5 16:28:37 2026 +0900

    [#10193] test(optimizer): add optimizer integration tests (#10191)
    
    ### What changes were proposed in this pull request?
    
    - Ported optimizer integration tests from recommender to add-it and
    adapted
        them to the current APIs/configs.
    - Updated IT environment setup to create tables via lakehouse-iceberg
    catalog
    (memory backend) instead of relying on legacy reflective/internal paths.
    - Reworked RecommenderIT to use public recommender APIs and a test
    recording
        JobSubmitter (session-isolated) instead of reflection.
    - Removed remaining reflection-based field injection in updater unit
    tests by
        adding package-private test hooks in:
          - GravitinoMetricsUpdater
          - GravitinoStatisticsUpdater
    - Added/updated related test service registration and assertions
    accordingly.
    
      ### Why are the changes needed?
    
    - add-it branch has API/config differences from recommender; direct copy
    of
        ITs is not compatible.
    - Reflection-based tests are brittle and tightly coupled to internals,
    causing
        maintenance and stability issues.
    - Session-isolated recording in IT avoids shared static-state
    interference and
        makes tests safer for parallel runs.
    
      Fix: #10193
    
      ### Does this PR introduce any user-facing change?
    
      - No user-facing API changes.
      - No new user-facing property keys for production behavior.
      - Changes are limited to test code/test wiring and testability hooks.
    
      ### How was this patch tested?
    
      - Added/updated integration and unit tests to cover migrated paths and
        reflection removal.
      - Ran formatting:
          - ./gradlew :maintenance:optimizer:spotlessApply
      - Ran targeted tests (with required env vars):
          - :maintenance:optimizer:test --tests
    
    'org.apache.gravitino.maintenance.optimizer.integration.test.Recommender
            IT' -PskipIT=false -PskipDockerTests=false
          - :maintenance:optimizer:test --tests
    'org.apache.gravitino.maintenance.optimizer.integration.test.*' (targete
            d migrated IT classes)
          - :maintenance:optimizer:test --tests
    
    'org.apache.gravitino.maintenance.optimizer.updater.statistics.TestGravi
            tinoStatisticsUpdater' --tests
    
    'org.apache.gravitino.maintenance.optimizer.updater.metrics.TestGravitin
            oMetricsUpdater' -PskipIT=true -PskipDockerTests=false
      - Result: targeted IT/UT passed.
---
 maintenance/optimizer/build.gradle.kts             |   2 +
 .../updater/metrics/GravitinoMetricsUpdater.java   |  11 +
 .../statistics/GravitinoStatisticsUpdater.java     |   6 +
 .../test/AbstractGravitinoOptimizerEnvIT.java      | 195 ++++++++++++++
 .../test/DummyJobMetricsCalculator.java            |  49 ++++
 .../test/DummyTableStatisticsComputer.java         |  82 ++++++
 .../integration/test/GravitinoMetricsIT.java       | 180 +++++++++++++
 .../integration/test/GravitinoStatisticsIT.java    | 132 ++++++++++
 .../integration/test/GravitinoStrategyIT.java      |  88 +++++++
 .../integration/test/GravitinoTableMetaIT.java     |  68 +++++
 .../optimizer/integration/test/MonitorIT.java      | 264 +++++++++++++++++++
 .../optimizer/integration/test/RecommenderIT.java  | 281 +++++++++++++++++++++
 .../test/RecordingJobSubmitterForIT.java           |  79 ++++++
 .../optimizer/integration/test/UpdaterIT.java      | 155 ++++++++++++
 .../metrics/TestGravitinoMetricsUpdater.java       |  29 +--
 .../statistics/TestGravitinoStatisticsUpdater.java |  14 +-
 ...itino.maintenance.optimizer.api.common.Provider |   1 +
 ...ance.optimizer.api.updater.StatisticsCalculator |   2 +
 18 files changed, 1605 insertions(+), 33 deletions(-)

diff --git a/maintenance/optimizer/build.gradle.kts 
b/maintenance/optimizer/build.gradle.kts
index b0b9f865d9..df1c4473cd 100644
--- a/maintenance/optimizer/build.gradle.kts
+++ b/maintenance/optimizer/build.gradle.kts
@@ -88,6 +88,8 @@ dependencies {
   testImplementation(libs.testcontainers.junit.jupiter)
   testImplementation(libs.testcontainers.mysql)
   testImplementation(libs.testcontainers.postgresql)
+  testImplementation(project(":integration-test-common", "testArtifacts"))
+  testImplementation(project(":server"))
   testRuntimeOnly(libs.mysql.driver)
   testRuntimeOnly(libs.postgresql.driver)
   testAnnotationProcessor(libs.lombok)
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
index a6bd5874c2..7834efa6e2 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.updater.metrics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.List;
 import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
@@ -68,6 +69,16 @@ public class GravitinoMetricsUpdater implements 
MetricsUpdater {
     }
   }
 
+  @VisibleForTesting
+  void setMetricsRepositoryForTest(MetricsRepository metricsRepository) {
+    this.metricsStorage = metricsRepository;
+  }
+
+  @VisibleForTesting
+  MetricsRepository metricsRepositoryForTest() {
+    return metricsStorage;
+  }
+
   private void ensureInitialized() {
     Preconditions.checkState(
         metricsStorage != null,
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
index 63fa0e65fa..3b6f4e45c0 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
@@ -19,6 +19,7 @@
 
 package org.apache.gravitino.maintenance.optimizer.updater.statistics;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -153,4 +154,9 @@ public class GravitinoStatisticsUpdater implements 
StatisticsUpdater {
       gravitinoClient.close();
     }
   }
+
+  @VisibleForTesting
+  void setGravitinoClientForTest(GravitinoClient gravitinoClient) {
+    this.gravitinoClient = gravitinoClient;
+  }
 }
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/AbstractGravitinoOptimizerEnvIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/AbstractGravitinoOptimizerEnvIT.java
new file mode 100644
index 0000000000..0cb70f0094
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/AbstractGravitinoOptimizerEnvIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+
+// Set up the Gravitino server, metalake, Iceberg catalogs
+public abstract class AbstractGravitinoOptimizerEnvIT extends BaseIT {
+
+  protected static final String METALAKE_NAME = "test_metalake";
+  protected static final String GRAVITINO_CATALOG_NAME = "iceberg";
+  protected static final String TEST_SCHEMA = "test_schema";
+
+  protected Catalog catalogClient;
+  protected GravitinoMetalake metalakeClient;
+  protected OptimizerEnv optimizerEnv;
+  protected String icebergWarehouseLocation;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    super.startIntegrationTest();
+    this.icebergWarehouseLocation = createWarehouseLocation();
+    initMetalakeAndCatalog();
+    this.optimizerEnv = initOptimizerEnv();
+  }
+
+  private String createWarehouseLocation() throws IOException {
+    Path path = Files.createTempDirectory("gravitino-optimizer-warehouse-");
+    path.toFile().deleteOnExit();
+    return path.toUri().toString();
+  }
+
+  protected void createTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {Column.of("col_1", Types.IntegerType.get())},
+            "comment",
+            ImmutableMap.of());
+  }
+
+  protected NameIdentifier getTableIdentifier(String tableName) {
+    return NameIdentifier.of(GRAVITINO_CATALOG_NAME, TEST_SCHEMA, tableName);
+  }
+
+  protected void createPartitionTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {
+              Column.of("col1", Types.IntegerType.get(), "col1"),
+              Column.of("col2", Types.IntegerType.get(), "col2"),
+              Column.of("col3", Types.IntegerType.get(), "col3")
+            },
+            "comment",
+            ImmutableMap.of(),
+            new Transform[] {
+              Transforms.identity("col1"), Transforms.bucket(8, new String[] 
{"col2"})
+            });
+  }
+
+  protected void createPolicy(String policyName, Map<String, Object> rules, 
String policyType) {
+    PolicyContent content =
+        PolicyContents.custom(
+            rules,
+            ImmutableSet.of(MetadataObject.Type.TABLE),
+            Map.of(
+                GravitinoStrategy.STRATEGY_TYPE_KEY,
+                policyType,
+                GravitinoStrategy.JOB_TEMPLATE_NAME_KEY,
+                "template-name"));
+    metalakeClient.createPolicy(policyName, "custom", "comment", true, 
content);
+  }
+
+  protected void associatePoliciesToTable(String policyName, String tableName) 
{
+    Table table =
+        
catalogClient.asTableCatalog().loadTable(NameIdentifier.of(TEST_SCHEMA, 
tableName));
+    table.supportsPolicies().associatePolicies(new String[] {policyName}, new 
String[] {});
+  }
+
+  protected void associatePoliciesToSchema(String policyName, String 
schemaName) {
+    Schema schema = catalogClient.asSchemas().loadSchema(schemaName);
+    schema.supportsPolicies().associatePolicies(new String[] {policyName}, new 
String[] {});
+  }
+
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  protected OptimizerEnv initOptimizerEnv() {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(getGravitinoConfigs());
+    configs.putAll(getJdbcMetricsConfigs());
+    configs.putAll(getSpecifyConfigs());
+    return new OptimizerEnv(new OptimizerConfig(configs));
+  }
+
+  private Map<String, String> getJdbcMetricsConfigs() {
+    String jdbcUrl =
+        String.format(
+            
"jdbc:h2:mem:gravitino-optimizer-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL",
+            System.nanoTime());
+
+    return Map.of(
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_URL,
+        jdbcUrl,
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_DRIVER,
+        "org.h2.Driver");
+  }
+
+  private Map<String, String> getGravitinoConfigs() {
+    int gravitinoPort = getGravitinoServerPort();
+    String uri = String.format("http://127.0.0.1:%d";, gravitinoPort);
+    return ImmutableMap.of(
+        OptimizerConfig.GRAVITINO_URI,
+        uri,
+        OptimizerConfig.GRAVITINO_METALAKE,
+        METALAKE_NAME,
+        OptimizerConfig.GRAVITINO_DEFAULT_CATALOG,
+        GRAVITINO_CATALOG_NAME);
+  }
+
+  private void initMetalakeAndCatalog() {
+    this.metalakeClient = client.createMetalake(METALAKE_NAME, "", new 
HashMap<>());
+    this.catalogClient = createGravitinoIcebergCatalog();
+
+    if (!catalogClient.asSchemas().schemaExists(TEST_SCHEMA)) {
+      catalogClient.asSchemas().createSchema(TEST_SCHEMA, "comment", 
ImmutableMap.of());
+    }
+  }
+
+  private Catalog createGravitinoIcebergCatalog() {
+    return metalakeClient.createCatalog(
+        GRAVITINO_CATALOG_NAME,
+        Catalog.Type.RELATIONAL,
+        "lakehouse-iceberg",
+        "comment",
+        ImmutableMap.of(
+            IcebergConstants.URI,
+            "memory://gravitino-optimizer",
+            IcebergConstants.CATALOG_BACKEND,
+            "memory",
+            IcebergConstants.WAREHOUSE,
+            icebergWarehouseLocation));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyJobMetricsCalculator.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyJobMetricsCalculator.java
new file mode 100644
index 0000000000..289a1600ed
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyJobMetricsCalculator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateJobMetrics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class DummyJobMetricsCalculator implements SupportsCalculateJobMetrics {
+
+  public static final String DUMMY_JOB_METRICS = "dummy-job-metrics";
+  public static final String JOB_STAT_NAME = "dummy-job-stat-name";
+
+  @Override
+  public String name() {
+    return DUMMY_JOB_METRICS;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public List<MetricPoint> calculateJobMetrics(NameIdentifier jobIdentifier) {
+    long timestampSeconds = System.currentTimeMillis() / 1000;
+    return List.of(
+        MetricPoint.forJob(
+            jobIdentifier, JOB_STAT_NAME, StatisticValues.longValue(1L), 
timestampSeconds));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyTableStatisticsComputer.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyTableStatisticsComputer.java
new file mode 100644
index 0000000000..02383659aa
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyTableStatisticsComputer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import 
org.apache.gravitino.maintenance.optimizer.api.common.TableAndPartitionStatistics;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateTableMetrics;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class DummyTableStatisticsComputer
+    implements SupportsCalculateTableStatistics, SupportsCalculateTableMetrics 
{
+
+  public static final String DUMMY_TABLE_STAT = "dummy-table-stat";
+  public static final String TABLE_STAT_NAME = "custom-dummy-table-stat-name";
+
+  @Override
+  public String name() {
+    return DUMMY_TABLE_STAT;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public TableAndPartitionStatistics calculateTableStatistics(NameIdentifier 
tableIdentifier) {
+    List<StatisticEntry<?>> tableStatistics =
+        List.of(new StatisticEntryImpl<>(TABLE_STAT_NAME, 
StatisticValues.longValue(1L)));
+    Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+        Map.of(
+            PartitionPath.of(getPartitionEntries()),
+            List.of(new StatisticEntryImpl<>(TABLE_STAT_NAME, 
StatisticValues.longValue(2L))));
+    return new TableAndPartitionStatistics(tableStatistics, 
partitionStatistics);
+  }
+
+  @Override
+  public List<MetricPoint> calculateTableMetrics(NameIdentifier 
tableIdentifier) {
+    long timestampSeconds = System.currentTimeMillis() / 1000;
+    PartitionPath partitionPath = PartitionPath.of(getPartitionEntries());
+    return List.of(
+        MetricPoint.forTable(
+            tableIdentifier, TABLE_STAT_NAME, StatisticValues.longValue(1L), 
timestampSeconds),
+        MetricPoint.forPartition(
+            tableIdentifier,
+            partitionPath,
+            TABLE_STAT_NAME,
+            StatisticValues.longValue(2L),
+            timestampSeconds));
+  }
+
+  @VisibleForTesting
+  public static List<PartitionEntry> getPartitionEntries() {
+    return List.of(new PartitionEntryImpl("p1", "v1"));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java
new file mode 100644
index 0000000000..4d10adff3e
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class GravitinoMetricsIT {
+
+  private GravitinoMetricsUpdater updater;
+  private GravitinoMetricsProvider provider;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv optimizerEnv = createOptimizerEnv();
+    updater = new GravitinoMetricsUpdater();
+    updater.initialize(optimizerEnv);
+    provider = new GravitinoMetricsProvider();
+    provider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  public void tearDown() throws Exception {
+    if (updater != null) {
+      updater.close();
+    }
+    if (provider != null) {
+      provider.close();
+    }
+  }
+
+  @Test
+  void testTableMetrics() {
+    NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "schema", 
"table");
+
+    PartitionPath partition1 =
+        PartitionPath.of(
+            Arrays.asList(new PartitionEntryImpl("p1", "v1"), new 
PartitionEntryImpl("p2", "v2")));
+    PartitionPath partition2 =
+        PartitionPath.of(
+            Arrays.asList(new PartitionEntryImpl("p1", "v11"), new 
PartitionEntryImpl("p2", "v2")));
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forTable(tableIdentifier, "a", 
StatisticValues.longValue(10), 1000),
+            MetricPoint.forTable(tableIdentifier, "b", 
StatisticValues.longValue(1000), 1000),
+            MetricPoint.forPartition(
+                tableIdentifier, partition1, "b", 
StatisticValues.longValue(1003), 1000),
+            MetricPoint.forPartition(
+                tableIdentifier, partition2, "b", 
StatisticValues.longValue(1004), 1000),
+            MetricPoint.forTable(tableIdentifier, "a", 
StatisticValues.longValue(100L), 1001)));
+
+    List<MetricPoint> tableMetrics = provider.tableMetrics(tableIdentifier, 
1000, 1002);
+    Map<String, List<MetricPoint>> tableMetricsByName = 
metricsByName(tableMetrics);
+
+    Assertions.assertEquals(2, tableMetricsByName.size());
+
+    List<MetricPoint> aMetrics = sortByTimestamp(tableMetricsByName.get("a"));
+    Assertions.assertEquals(2, aMetrics.size());
+    Assertions.assertEquals(10L, ((Number) 
aMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(1000L, aMetrics.get(0).timestampSeconds());
+    Assertions.assertEquals(100L, ((Number) 
aMetrics.get(1).value().value()).longValue());
+    Assertions.assertEquals(1001L, aMetrics.get(1).timestampSeconds());
+
+    List<MetricPoint> bMetrics = tableMetricsByName.get("b");
+    Assertions.assertEquals(1, bMetrics.size());
+    Assertions.assertEquals(1000L, ((Number) 
bMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(1000L, bMetrics.get(0).timestampSeconds());
+
+    List<MetricPoint> partitionMetrics1 =
+        provider.partitionMetrics(tableIdentifier, partition1, 1000, 1002);
+    Assertions.assertEquals(1, partitionMetrics1.size());
+    MetricPoint metric1 = partitionMetrics1.get(0);
+    Assertions.assertEquals("b", metric1.metricName());
+    Assertions.assertEquals(
+        partition1, 
metric1.partitionPath().orElseThrow(IllegalStateException::new));
+    Assertions.assertEquals(1003L, ((Number) 
metric1.value().value()).longValue());
+    Assertions.assertEquals(1000L, metric1.timestampSeconds());
+
+    List<MetricPoint> partitionMetrics2 =
+        provider.partitionMetrics(tableIdentifier, partition2, 1000, 1002);
+    Assertions.assertEquals(1, partitionMetrics2.size());
+    MetricPoint metric2 = partitionMetrics2.get(0);
+    Assertions.assertEquals("b", metric2.metricName());
+    Assertions.assertEquals(
+        partition2, 
metric2.partitionPath().orElseThrow(IllegalStateException::new));
+    Assertions.assertEquals(1004L, ((Number) 
metric2.value().value()).longValue());
+    Assertions.assertEquals(1000L, metric2.timestampSeconds());
+  }
+
+  @Test
+  void testJobMetrics() {
+    NameIdentifier jobIdentifier = NameIdentifier.of("job1");
+
+    updater.updateJobMetrics(
+        Arrays.asList(
+            MetricPoint.forJob(jobIdentifier, "x", 
StatisticValues.longValue(20), 2000),
+            MetricPoint.forJob(jobIdentifier, "y", 
StatisticValues.longValue(2000), 2000),
+            MetricPoint.forJob(jobIdentifier, "x", 
StatisticValues.longValue(200L), 2001)));
+
+    List<MetricPoint> jobMetrics = provider.jobMetrics(jobIdentifier, 2000, 
2002);
+    Map<String, List<MetricPoint>> jobMetricsByName = 
metricsByName(jobMetrics);
+
+    Assertions.assertEquals(2, jobMetricsByName.size());
+
+    List<MetricPoint> xMetrics = sortByTimestamp(jobMetricsByName.get("x"));
+    Assertions.assertEquals(2, xMetrics.size());
+    Assertions.assertEquals(20L, ((Number) 
xMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(2000L, xMetrics.get(0).timestampSeconds());
+    Assertions.assertEquals(200L, ((Number) 
xMetrics.get(1).value().value()).longValue());
+    Assertions.assertEquals(2001L, xMetrics.get(1).timestampSeconds());
+
+    List<MetricPoint> yMetrics = jobMetricsByName.get("y");
+    Assertions.assertEquals(1, yMetrics.size());
+    Assertions.assertEquals(2000L, ((Number) 
yMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(2000L, yMetrics.get(0).timestampSeconds());
+  }
+
+  private static Map<String, List<MetricPoint>> 
metricsByName(List<MetricPoint> points) {
+    return 
points.stream().collect(Collectors.groupingBy(MetricPoint::metricName));
+  }
+
+  private static List<MetricPoint> sortByTimestamp(List<MetricPoint> points) {
+    return 
points.stream().sorted(Comparator.comparingLong(MetricPoint::timestampSeconds)).toList();
+  }
+
+  private OptimizerEnv createOptimizerEnv() {
+    String jdbcUrl =
+        String.format(
+            
"jdbc:h2:mem:gravitino-metrics-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL", 
System.nanoTime());
+
+    Map<String, String> config =
+        Map.of(
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_URL,
+            jdbcUrl,
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_DRIVER,
+            "org.h2.Driver");
+    return new OptimizerEnv(new OptimizerConfig(config));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java
new file mode 100644
index 0000000000..2ff4f8a5f0
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import 
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class GravitinoStatisticsIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private static final String TEST_TABLE = "test_stats_table";
+  private static final String TEST_PARTITION_TABLE = 
"test_stats_partition_table";
+  private static final String STATISTICS_PREFIX = "custom-";
+  private static final String DATAFILE_SIZE_MSE = STATISTICS_PREFIX + 
"datafile_size_mse";
+
+  private GravitinoStatisticsUpdater statisticsUpdater;
+  private GravitinoStatisticsProvider statisticsProvider;
+
+  @BeforeAll
+  void init() {
+    this.statisticsUpdater = new GravitinoStatisticsUpdater();
+    statisticsUpdater.initialize(optimizerEnv);
+    this.statisticsProvider = new GravitinoStatisticsProvider();
+    statisticsProvider.initialize(optimizerEnv);
+    createTable(TEST_TABLE);
+    createPartitionTable(TEST_PARTITION_TABLE);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (statisticsProvider != null) {
+      statisticsProvider.close();
+    }
+    if (statisticsUpdater != null) {
+      statisticsUpdater.close();
+    }
+  }
+
+  @Test
+  void testTableStatisticsUpdaterAndProvider() {
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(TEST_TABLE),
+        Arrays.asList(
+            new StatisticEntryImpl<>(
+                STATISTICS_PREFIX + "row_count", 
StatisticValues.longValue(1000)),
+            new StatisticEntryImpl<>(
+                STATISTICS_PREFIX + "size_in_bytes", 
StatisticValues.longValue(1000000)),
+            new StatisticEntryImpl<>(DATAFILE_SIZE_MSE, 
StatisticValues.doubleValue(10000.1))));
+
+    List<StatisticEntry<?>> stats =
+        statisticsProvider.tableStatistics(getTableIdentifier(TEST_TABLE));
+    Assertions.assertEquals(3, stats.size());
+    stats.forEach(
+        stat -> {
+          if (stat.name().equals(STATISTICS_PREFIX + "row_count")) {
+            Assertions.assertEquals(1000L, ((Number) 
stat.value().value()).longValue());
+          } else if (stat.name().equals(STATISTICS_PREFIX + "size_in_bytes")) {
+            Assertions.assertEquals(1000000L, ((Number) 
stat.value().value()).longValue());
+          } else if (stat.name().equals(DATAFILE_SIZE_MSE)) {
+            Assertions.assertEquals(10000.1, ((Number) 
stat.value().value()).doubleValue());
+          } else {
+            Assertions.fail("Unexpected statistic name: " + stat.name());
+          }
+        });
+  }
+
+  @Test
+  void testTablePartitionStatisticsUpdaterAndProvider() {
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(TEST_PARTITION_TABLE),
+        Arrays.asList(
+            new StatisticEntryImpl<>(
+                STATISTICS_PREFIX + "size_in_bytes", 
StatisticValues.longValue(1000000))));
+    statisticsUpdater.updatePartitionStatistics(
+        getTableIdentifier(TEST_PARTITION_TABLE),
+        Map.of(
+            PartitionPath.of(
+                Arrays.asList(
+                    new PartitionEntryImpl("col1", "1"), new 
PartitionEntryImpl("col2", "2"))),
+            List.of(
+                new StatisticEntryImpl<>(
+                    STATISTICS_PREFIX + "partition_row_count", 
StatisticValues.longValue(500)),
+                new StatisticEntryImpl<>(
+                    STATISTICS_PREFIX + "partition_size_in_bytes",
+                    StatisticValues.longValue(500000)))));
+
+    Map<PartitionPath, List<StatisticEntry<?>>> stats =
+        
statisticsProvider.partitionStatistics(getTableIdentifier(TEST_PARTITION_TABLE));
+    Assertions.assertEquals(1, stats.size());
+    List<StatisticEntry<?>> partitionStats =
+        
stats.values().stream().findFirst().orElseThrow(IllegalStateException::new);
+    Assertions.assertEquals(2, partitionStats.size());
+    partitionStats.forEach(
+        stat -> {
+          if (stat.name().equals(STATISTICS_PREFIX + "partition_row_count")) {
+            Assertions.assertEquals(500L, ((Number) 
stat.value().value()).longValue());
+          } else if (stat.name().equals(STATISTICS_PREFIX + 
"partition_size_in_bytes")) {
+            Assertions.assertEquals(500000L, ((Number) 
stat.value().value()).longValue());
+          } else {
+            Assertions.fail("Unexpected statistic name: " + stat.name());
+          }
+        });
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java
new file mode 100644
index 0000000000..fb0382a818
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class GravitinoStrategyIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private GravitinoStrategyProvider strategyProvider;
+
+  @BeforeAll
+  void init() {
+    this.strategyProvider = new GravitinoStrategyProvider();
+    strategyProvider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (strategyProvider != null) {
+      strategyProvider.close();
+    }
+  }
+
+  @Test
+  void testGravitinoStrategyProviderGetStrategy() {
+    createPolicy("test_policy", ImmutableMap.of("rule1", "value1"), "test");
+
+    Strategy strategy = strategyProvider.strategy("test_policy");
+    Assertions.assertNotNull(strategy);
+    Assertions.assertEquals("test", strategy.strategyType());
+    Assertions.assertEquals("template-name", strategy.jobTemplateName());
+    Assertions.assertEquals(ImmutableMap.of("rule1", "value1"), 
strategy.rules());
+  }
+
+  @Test
+  void testGravitinoStrategyProviderGetTableStrategies() {
+    String tableName = "test_get_table_policy";
+    createTable(tableName);
+    createPolicy("policy1", ImmutableMap.of("rule1", "value1"), "test");
+    createPolicy("policy2", ImmutableMap.of("rule2", "value2"), "test");
+    associatePoliciesToTable("policy1", tableName);
+    associatePoliciesToTable("policy2", tableName);
+
+    List<Strategy> strategies = 
strategyProvider.strategies(getTableIdentifier(tableName));
+    Assertions.assertNotNull(strategies);
+    Assertions.assertEquals(2, strategies.size());
+
+    strategies.stream()
+        .forEach(
+            strategy -> {
+              if (strategy.name().equals("policy1")) {
+                Assertions.assertEquals("test", strategy.strategyType());
+                Assertions.assertEquals("template-name", 
strategy.jobTemplateName());
+                Assertions.assertEquals(ImmutableMap.of("rule1", "value1"), 
strategy.rules());
+              } else if (strategy.name().equals("policy2")) {
+                Assertions.assertEquals("test", strategy.strategyType());
+                Assertions.assertEquals("template-name", 
strategy.jobTemplateName());
+                Assertions.assertEquals(ImmutableMap.of("rule2", "value2"), 
strategy.rules());
+              } else {
+                Assertions.fail("Unexpected strategy name: " + 
strategy.name());
+              }
+            });
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java
new file mode 100644
index 0000000000..2d0af79d91
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import 
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class GravitinoTableMetaIT extends AbstractGravitinoOptimizerEnvIT {
+  private GravitinoTableMetadataProvider tableMetadataProvider;
+
+  @BeforeAll
+  void init() {
+    this.tableMetadataProvider = new GravitinoTableMetadataProvider();
+    tableMetadataProvider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (tableMetadataProvider != null) {
+      tableMetadataProvider.close();
+    }
+  }
+
+  @Test
+  void testTableMetadataProviderTableAndPartitioningMetadata() {
+    String tableName = "test_table_metadata";
+    createPartitionTable(tableName);
+    Table table = 
tableMetadataProvider.tableMetadata(getTableIdentifier(tableName));
+    Assertions.assertNotNull(table);
+
+    // check table name, column ,partition information
+    Assertions.assertEquals(tableName, table.name());
+    Assertions.assertEquals(3, table.columns().length);
+    Column[] columns = table.columns();
+    Assertions.assertEquals("col1", columns[0].name());
+    Assertions.assertEquals("col2", columns[1].name());
+    Assertions.assertEquals("col3", columns[2].name());
+
+    Assertions.assertEquals(2, table.partitioning().length);
+    Transform[] partitioning = table.partitioning();
+    Assertions.assertEquals(Transforms.NAME_OF_IDENTITY, 
partitioning[0].name());
+    Assertions.assertEquals(Transforms.NAME_OF_BUCKET, partitioning[1].name());
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java
new file mode 100644
index 0000000000..426da0ec89
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.monitor.Monitor;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class MonitorIT {
+
+  private GravitinoMetricsUpdater updater;
+  private Monitor monitor;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv env = createOptimizerEnv();
+    this.updater = new GravitinoMetricsUpdater();
+    updater.initialize(env);
+    this.monitor = new Monitor(env);
+    MonitorCallbackForTest.reset();
+  }
+
+  @AfterAll
+  public void closeResources() throws Exception {
+    if (monitor != null) {
+      monitor.close();
+    }
+    if (updater != null) {
+      updater.close();
+    }
+  }
+
+  @Test
+  void testTableMetrics() {
+    long actionTime = 10;
+    long rangeSeconds = 2;
+    NameIdentifier tableIdentifier = NameIdentifier.of("db", "table");
+    NameIdentifier job1 = TableJobRelationProviderForTest.JOB1;
+    NameIdentifier job2 = TableJobRelationProviderForTest.JOB2;
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forTable(tableIdentifier, "storage", 
StatisticValues.longValue(10), 8),
+            MetricPoint.forTable(tableIdentifier, "s3_cost", 
StatisticValues.longValue(1000), 9),
+            MetricPoint.forTable(tableIdentifier, "s3_cost", 
StatisticValues.longValue(1003), 10),
+            MetricPoint.forTable(tableIdentifier, "storage", 
StatisticValues.longValue(100L), 11)));
+
+    List<MetricPoint> jobMetrics =
+        Arrays.asList(
+            MetricPoint.forJob(job1, "job_runtime", 
StatisticValues.longValue(8), 8),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(9), 
9),
+            MetricPoint.forJob(job1, "job_cost", 
StatisticValues.longValue(10), 10),
+            MetricPoint.forJob(job1, "job_cost", 
StatisticValues.longValue(11), 100),
+            MetricPoint.forJob(job1, "job_runtime", 
StatisticValues.longValue(12L), 11),
+            MetricPoint.forJob(job2, "job_runtime", 
StatisticValues.longValue(8), 8),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(9), 
9),
+            MetricPoint.forJob(job2, "job_cost", 
StatisticValues.longValue(10), 10),
+            MetricPoint.forJob(job2, "job_cost", 
StatisticValues.longValue(11), 100),
+            MetricPoint.forJob(job2, "job_runtime", 
StatisticValues.longValue(12L), 11));
+    updater.updateJobMetrics(jobMetrics);
+
+    List<EvaluationResult> results =
+        monitor.evaluateMetrics(tableIdentifier, actionTime, rangeSeconds, 
Optional.empty());
+    Assertions.assertEquals(3, results.size());
+
+    EvaluationResult tableResult = findByScope(results, DataScope.Type.TABLE, 
tableIdentifier);
+
+    List<MetricSample> storageBefore = 
sortedSamples(tableResult.beforeMetrics().get("storage"));
+    Assertions.assertEquals(1, storageBefore.size());
+    Assertions.assertEquals(8L, storageBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(10L, ((Number) 
storageBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> s3Before = 
sortedSamples(tableResult.beforeMetrics().get("s3_cost"));
+    Assertions.assertEquals(1, s3Before.size());
+    Assertions.assertEquals(9L, s3Before.get(0).timestampSeconds());
+    Assertions.assertEquals(1000L, ((Number) 
s3Before.get(0).value().value()).longValue());
+
+    List<MetricSample> storageAfter = 
sortedSamples(tableResult.afterMetrics().get("storage"));
+    Assertions.assertEquals(1, storageAfter.size());
+    Assertions.assertEquals(11L, storageAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(100L, ((Number) 
storageAfter.get(0).value().value()).longValue());
+
+    List<MetricSample> s3After = 
sortedSamples(tableResult.afterMetrics().get("s3_cost"));
+    Assertions.assertEquals(1, s3After.size());
+    Assertions.assertEquals(10L, s3After.get(0).timestampSeconds());
+    Assertions.assertEquals(1003L, ((Number) 
s3After.get(0).value().value()).longValue());
+
+    EvaluationResult jobResult1 = findByScope(results, DataScope.Type.JOB, 
job1);
+    checkJobMetrics(jobResult1.beforeMetrics(), jobResult1.afterMetrics());
+
+    EvaluationResult jobResult2 = findByScope(results, DataScope.Type.JOB, 
job2);
+    checkJobMetrics(jobResult2.beforeMetrics(), jobResult2.afterMetrics());
+  }
+
+  @Test
+  void testPartitionMetrics() {
+    long actionTime = 10;
+    long rangeSeconds = 2;
+    NameIdentifier tableIdentifier = NameIdentifier.of("db", "partitionTable");
+    PartitionPath partitionPath =
+        
PartitionUtils.decodePartitionPath("[{\"country\":\"US\"},{\"region\":\"CA\"}]");
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forPartition(
+                tableIdentifier, partitionPath, "storage", 
StatisticValues.longValue(10), 8),
+            MetricPoint.forPartition(
+                tableIdentifier, partitionPath, "storage", 
StatisticValues.longValue(20), 11),
+            MetricPoint.forPartition(
+                tableIdentifier, partitionPath, "s3_cost", 
StatisticValues.longValue(5), 10)));
+
+    List<EvaluationResult> results =
+        monitor.evaluateMetrics(
+            tableIdentifier, actionTime, rangeSeconds, 
Optional.of(partitionPath));
+
+    EvaluationResult partitionResult =
+        results.stream()
+            .filter(result -> result.scope().type() == 
DataScope.Type.PARTITION)
+            .findFirst()
+            .orElseThrow(IllegalStateException::new);
+
+    Assertions.assertEquals(partitionPath, 
partitionResult.scope().partition().orElseThrow());
+
+    List<MetricSample> storageBefore =
+        sortedSamples(partitionResult.beforeMetrics().get("storage"));
+    Assertions.assertEquals(1, storageBefore.size());
+    Assertions.assertEquals(8L, storageBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(10L, ((Number) 
storageBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> storageAfter = 
sortedSamples(partitionResult.afterMetrics().get("storage"));
+    Assertions.assertEquals(1, storageAfter.size());
+    Assertions.assertEquals(11L, storageAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(20L, ((Number) 
storageAfter.get(0).value().value()).longValue());
+
+    List<MetricSample> s3After = 
sortedSamples(partitionResult.afterMetrics().get("s3_cost"));
+    Assertions.assertEquals(1, s3After.size());
+    Assertions.assertEquals(10L, s3After.get(0).timestampSeconds());
+    Assertions.assertEquals(5L, ((Number) 
s3After.get(0).value().value()).longValue());
+  }
+
+  @Test
+  void testMonitorCallbacks() {
+    MonitorCallbackForTest.reset();
+    NameIdentifier tableIdentifier = NameIdentifier.of("db", "table");
+
+    monitor.evaluateMetrics(tableIdentifier, 10, 1, Optional.empty());
+
+    Assertions.assertEquals(3, MonitorCallbackForTest.INVOCATIONS.get());
+    Assertions.assertTrue(
+        MonitorCallbackForTest.RESULTS.stream()
+            .anyMatch(result -> result.scope().type() == 
DataScope.Type.TABLE));
+  }
+
+  private void checkJobMetrics(
+      Map<String, List<MetricSample>> jobBeforeMetrics,
+      Map<String, List<MetricSample>> jobAfterMetrics) {
+    List<MetricSample> runtimeBefore = 
sortedSamples(jobBeforeMetrics.get("job_runtime"));
+    Assertions.assertEquals(1, runtimeBefore.size());
+    Assertions.assertEquals(8L, runtimeBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(8L, ((Number) 
runtimeBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> costBefore = 
sortedSamples(jobBeforeMetrics.get("job_cost"));
+    Assertions.assertEquals(1, costBefore.size());
+    Assertions.assertEquals(9L, costBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(9L, ((Number) 
costBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> runtimeAfter = 
sortedSamples(jobAfterMetrics.get("job_runtime"));
+    Assertions.assertEquals(1, runtimeAfter.size());
+    Assertions.assertEquals(11L, runtimeAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(12L, ((Number) 
runtimeAfter.get(0).value().value()).longValue());
+
+    List<MetricSample> costAfter = 
sortedSamples(jobAfterMetrics.get("job_cost"));
+    Assertions.assertEquals(1, costAfter.size());
+    Assertions.assertEquals(10L, costAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(10L, ((Number) 
costAfter.get(0).value().value()).longValue());
+  }
+
+  private static EvaluationResult findByScope(
+      List<EvaluationResult> results, DataScope.Type type, NameIdentifier 
identifier) {
+    return results.stream()
+        .filter(result -> result.scope().type() == type)
+        .filter(result -> result.scope().identifier().equals(identifier))
+        .findFirst()
+        .orElseThrow(IllegalStateException::new);
+  }
+
+  private static List<MetricSample> sortedSamples(List<MetricSample> samples) {
+    if (samples == null) {
+      return List.of();
+    }
+    return samples.stream()
+        .sorted(Comparator.comparingLong(MetricSample::timestampSeconds))
+        .toList();
+  }
+
+  private OptimizerEnv createOptimizerEnv() {
+    String jdbcUrl =
+        String.format(
+            
"jdbc:h2:mem:gravitino-monitor-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL", 
System.nanoTime());
+
+    Map<String, String> configs =
+        ImmutableMap.<String, String>builder()
+            .put(OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(), 
MetricsEvaluatorForTest.NAME)
+            .put(
+                OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+                TableJobRelationProviderForTest.NAME)
+            .put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(), 
MonitorCallbackForTest.NAME)
+            .put(
+                OptimizerConfig.OPTIMIZER_PREFIX
+                    + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                    + GenericJdbcMetricsRepository.JDBC_URL,
+                jdbcUrl)
+            .put(
+                OptimizerConfig.OPTIMIZER_PREFIX
+                    + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                    + GenericJdbcMetricsRepository.JDBC_DRIVER,
+                "org.h2.Driver")
+            .build();
+
+    return new OptimizerEnv(new OptimizerConfig(configs));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java
new file mode 100644
index 0000000000..a896eb2295
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import 
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.recommender.Recommender;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import 
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/*
+ * 1. update table stats
+ * 2. add policy
+ * 3. run recommender to get optimize result
+ */
+public class RecommenderIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private static final String STATISTICS_PREFIX = "custom-";
+  private static final String DATAFILE_MSE = STATISTICS_PREFIX + 
"datafile_size_mse";
+  private static final String DELETE_FILE_NUM = STATISTICS_PREFIX + 
"position_delete_file_number";
+  private static final String SESSION_ID = "recommender-it-" + 
UUID.randomUUID();
+
+  private StatisticsUpdater statisticsUpdater;
+
+  @Override
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of(
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + "strategyHandler."
+            + CompactionStrategyHandler.NAME
+            + ".className",
+        CompactionStrategyHandler.class.getName(),
+        OptimizerConfig.JOB_SUBMITTER_CONFIG.getKey(),
+        RecordingJobSubmitterForIT.NAME,
+        RecordingJobSubmitterForIT.SESSION_ID_KEY,
+        SESSION_ID);
+  }
+
+  @BeforeAll
+  void init() {
+    this.statisticsUpdater = new GravitinoStatisticsUpdater();
+    statisticsUpdater.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (statisticsUpdater != null) {
+      statisticsUpdater.close();
+    }
+  }
+
+  @Test
+  void testRecommendNonPartitionTable() throws Exception {
+    String policyForDelete = "policyForDelete";
+    String policyForSmallFile = "policyForSmallFile";
+
+    String tableWithSmallFile = "tableWithSmallFile";
+    String tableWithDeleteFile = "tableWithDeleteFile";
+    String tableWithoutCompaction = "tableWithoutCompaction";
+
+    createTable(tableWithSmallFile);
+    createTable(tableWithDeleteFile);
+    createTable(tableWithoutCompaction);
+
+    createPolicy(
+        policyForSmallFile,
+        ImmutableMap.of(
+            "min_datafile_mse",
+            1000,
+            StrategyUtils.TRIGGER_EXPR,
+            DATAFILE_MSE + " > min_datafile_mse || " + DELETE_FILE_NUM + " > 
0",
+            StrategyUtils.SCORE_EXPR,
+            DATAFILE_MSE + " + " + DELETE_FILE_NUM + " * 10"),
+        CompactionStrategyHandler.NAME);
+
+    createPolicy(
+        policyForDelete,
+        ImmutableMap.of(
+            "min_datafile_mse",
+            1000,
+            StrategyUtils.TRIGGER_EXPR,
+            DATAFILE_MSE + " > min_datafile_mse || " + DELETE_FILE_NUM + " > 
1",
+            StrategyUtils.SCORE_EXPR,
+            DATAFILE_MSE + "/100 + " + DELETE_FILE_NUM + " * 100"),
+        CompactionStrategyHandler.NAME);
+
+    associatePoliciesToSchema(policyForSmallFile, TEST_SCHEMA);
+    associatePoliciesToSchema(policyForDelete, TEST_SCHEMA);
+
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(tableWithSmallFile),
+        Arrays.asList(
+            new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(2)),
+            new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(10000.1))));
+
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(tableWithDeleteFile),
+        Arrays.asList(
+            new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(100)),
+            new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(100.1))));
+
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(tableWithoutCompaction),
+        Arrays.asList(
+            new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(0)),
+            new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(0))));
+
+    try (Recommender recommender = new Recommender(optimizerEnv)) {
+      List<JobExecutionContext> jobs =
+          recommendForOneStrategy(
+              recommender,
+              Arrays.asList(
+                  getTableIdentifier(tableWithSmallFile),
+                  getTableIdentifier(tableWithDeleteFile),
+                  getTableIdentifier(tableWithoutCompaction)),
+              policyForSmallFile);
+      Assertions.assertEquals(2, jobs.size());
+      Assertions.assertEquals(tableWithSmallFile, 
jobs.get(0).nameIdentifier().name());
+      Assertions.assertEquals(tableWithDeleteFile, 
jobs.get(1).nameIdentifier().name());
+
+      jobs =
+          recommendForOneStrategy(
+              recommender,
+              Arrays.asList(
+                  getTableIdentifier(tableWithSmallFile),
+                  getTableIdentifier(tableWithDeleteFile),
+                  getTableIdentifier(tableWithoutCompaction)),
+              policyForDelete);
+      Assertions.assertEquals(2, jobs.size());
+      Assertions.assertEquals(tableWithDeleteFile, 
jobs.get(0).nameIdentifier().name());
+      Assertions.assertEquals(tableWithSmallFile, 
jobs.get(1).nameIdentifier().name());
+    }
+  }
+
+  @Test
+  void testCompactionPartitionTable() throws Exception {
+    String tableName = "partitionTable";
+    String tableName2 = "partitionTable2";
+    String policyName = "policyForPartition";
+
+    createPartitionTable(tableName);
+    createPartitionTable(tableName2);
+
+    createPolicy(
+        policyName,
+        ImmutableMap.of(
+            StrategyUtils.TRIGGER_EXPR,
+            "true",
+            StrategyUtils.SCORE_EXPR,
+            DELETE_FILE_NUM + " * 100 + " + DATAFILE_MSE),
+        CompactionStrategyHandler.NAME);
+    associatePoliciesToTable(policyName, tableName);
+    associatePoliciesToTable(policyName, tableName2);
+
+    List<PartitionEntry> partition1 =
+        Arrays.asList(new PartitionEntryImpl("col1", "1"), new 
PartitionEntryImpl("col2", "3"));
+    List<PartitionEntry> partition2 =
+        Arrays.asList(new PartitionEntryImpl("col1", "2"), new 
PartitionEntryImpl("col2", "4"));
+    List<PartitionEntry> partition3 =
+        Arrays.asList(new PartitionEntryImpl("col1", "10"), new 
PartitionEntryImpl("col2", "5"));
+
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(tableName),
+        Arrays.asList(
+            new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(1)),
+            new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(10.0))));
+    statisticsUpdater.updatePartitionStatistics(
+        getTableIdentifier(tableName),
+        Map.of(
+            PartitionPath.of(partition1),
+            List.of(
+                new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(5)),
+                new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(1000.0))),
+            PartitionPath.of(partition2),
+            List.of(
+                new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(1)),
+                new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(100.0)))));
+
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(tableName2),
+        Arrays.asList(
+            new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(1)),
+            new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(10.0))));
+    statisticsUpdater.updatePartitionStatistics(
+        getTableIdentifier(tableName2),
+        Map.of(
+            PartitionPath.of(partition3),
+            List.of(
+                new StatisticEntryImpl<>(DELETE_FILE_NUM, 
StatisticValues.longValue(2)),
+                new StatisticEntryImpl<>(DATAFILE_MSE, 
StatisticValues.doubleValue(500.0)))));
+
+    try (Recommender recommender = new Recommender(optimizerEnv)) {
+      List<JobExecutionContext> jobs =
+          recommendForOneStrategy(
+              recommender,
+              Arrays.asList(getTableIdentifier(tableName), 
getTableIdentifier(tableName2)),
+              policyName);
+
+      Assertions.assertEquals(2, jobs.size());
+      jobs.forEach(job -> Assertions.assertTrue(job instanceof 
CompactionJobContext));
+
+      Map<String, CompactionJobContext> jobByTable =
+          jobs.stream()
+              .map(job -> (CompactionJobContext) job)
+              .collect(Collectors.toMap(ctx -> ctx.nameIdentifier().name(), 
Function.identity()));
+
+      CompactionJobContext jobContext1 = jobByTable.get(tableName);
+      Assertions.assertNotNull(jobContext1);
+      Assertions.assertFalse(jobContext1.getPartitions().isEmpty());
+      Assertions.assertEquals(2, jobContext1.getPartitions().size());
+      Assertions.assertEquals(
+          Arrays.asList("col1=1", "col2=3"),
+          jobContext1.getPartitions().get(0).entries().stream()
+              .map(p -> p.partitionName() + "=" + p.partitionValue())
+              .toList());
+      Assertions.assertEquals(
+          Arrays.asList("col1=2", "col2=4"),
+          jobContext1.getPartitions().get(1).entries().stream()
+              .map(p -> p.partitionName() + "=" + p.partitionValue())
+              .toList());
+
+      CompactionJobContext jobContext2 = jobByTable.get(tableName2);
+      Assertions.assertNotNull(jobContext2);
+      Assertions.assertFalse(jobContext2.getPartitions().isEmpty());
+      Assertions.assertEquals(1, jobContext2.getPartitions().size());
+      Assertions.assertEquals(
+          Arrays.asList("col1=10", "col2=5"),
+          jobContext2.getPartitions().get(0).entries().stream()
+              .map(p -> p.partitionName() + "=" + p.partitionValue())
+              .toList());
+    }
+  }
+
+  private List<JobExecutionContext> recommendForOneStrategy(
+      Recommender recommender, List<NameIdentifier> identifiers, String 
strategyName) {
+    RecordingJobSubmitterForIT.reset(SESSION_ID);
+    try {
+      recommender.submitForStrategyName(identifiers, strategyName, 
Integer.MAX_VALUE);
+      return RecordingJobSubmitterForIT.submittedContexts(SESSION_ID);
+    } finally {
+      RecordingJobSubmitterForIT.clear(SESSION_ID);
+    }
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecordingJobSubmitterForIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecordingJobSubmitterForIT.java
new file mode 100644
index 0000000000..06eb3c02e1
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecordingJobSubmitterForIT.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import 
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+
+public class RecordingJobSubmitterForIT implements JobSubmitter {
+  public static final String NAME = "recording-job-submitter-it";
+  public static final String SESSION_ID_KEY =
+      OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "recording-session-id";
+
+  private static final Map<String, List<JobExecutionContext>> 
SUBMITTED_CONTEXTS_BY_SESSION =
+      new ConcurrentHashMap<>();
+
+  private String sessionId;
+
+  public static void reset(String sessionId) {
+    SUBMITTED_CONTEXTS_BY_SESSION.put(sessionId, new CopyOnWriteArrayList<>());
+  }
+
+  public static List<JobExecutionContext> submittedContexts(String sessionId) {
+    return List.copyOf(
+        SUBMITTED_CONTEXTS_BY_SESSION.getOrDefault(sessionId, new 
CopyOnWriteArrayList<>()));
+  }
+
+  public static void clear(String sessionId) {
+    SUBMITTED_CONTEXTS_BY_SESSION.remove(sessionId);
+  }
+
+  @Override
+  public String submitJob(String jobTemplateName, JobExecutionContext 
jobExecutionContext) {
+    List<JobExecutionContext> submittedContexts =
+        SUBMITTED_CONTEXTS_BY_SESSION.computeIfAbsent(
+            sessionId, key -> new CopyOnWriteArrayList<>());
+    submittedContexts.add(jobExecutionContext);
+    return "it-job-" + submittedContexts.size();
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {
+    this.sessionId = optimizerEnv.config().getAllConfig().get(SESSION_ID_KEY);
+    if (sessionId == null || sessionId.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Missing test session id config for RecordingJobSubmitterForIT: " + 
SESSION_ID_KEY);
+    }
+  }
+
+  @Override
+  public void close() throws Exception {}
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java
new file mode 100644
index 0000000000..e84937189a
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.apache.gravitino.maintenance.optimizer.updater.UpdateType;
+import org.apache.gravitino.maintenance.optimizer.updater.Updater;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class UpdaterIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private Updater updater;
+  private GravitinoStatisticsProvider statisticsProvider;
+  private GravitinoMetricsProvider metricsProvider;
+
+  @Override
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  @BeforeAll
+  void init() {
+    this.updater = new Updater(optimizerEnv);
+    this.statisticsProvider = new GravitinoStatisticsProvider();
+    statisticsProvider.initialize(optimizerEnv);
+    this.metricsProvider = new GravitinoMetricsProvider();
+    metricsProvider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (metricsProvider != null) {
+      metricsProvider.close();
+    }
+    if (statisticsProvider != null) {
+      statisticsProvider.close();
+    }
+    if (updater != null) {
+      updater.close();
+    }
+  }
+
+  @Test
+  void testUpdateTableStatistics() {
+    String tableName = "update-stats";
+    createTable(tableName);
+    NameIdentifier tableIdentifier = getTableIdentifier(tableName);
+    updater.update(
+        DummyTableStatisticsComputer.DUMMY_TABLE_STAT,
+        Arrays.asList(tableIdentifier),
+        UpdateType.STATISTICS);
+
+    List<StatisticEntry<?>> tableStats = 
statisticsProvider.tableStatistics(tableIdentifier);
+    Assertions.assertEquals(1, tableStats.size());
+    Assertions.assertEquals(DummyTableStatisticsComputer.TABLE_STAT_NAME, 
tableStats.get(0).name());
+    Assertions.assertEquals(1L, ((Number) 
tableStats.get(0).value().value()).longValue());
+
+    Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+        statisticsProvider.partitionStatistics(tableIdentifier);
+    Assertions.assertEquals(1, partitionStats.size());
+    List<StatisticEntry<?>> partitionEntries =
+        
partitionStats.values().stream().findFirst().orElseThrow(IllegalStateException::new);
+    Assertions.assertEquals(1, partitionEntries.size());
+    Assertions.assertEquals(
+        DummyTableStatisticsComputer.TABLE_STAT_NAME, 
partitionEntries.get(0).name());
+    Assertions.assertEquals(2L, ((Number) 
partitionEntries.get(0).value().value()).longValue());
+    Assertions.assertEquals(
+        PartitionUtils.encodePartitionPath(
+            
PartitionPath.of(DummyTableStatisticsComputer.getPartitionEntries())),
+        PartitionUtils.encodePartitionPath(
+            
partitionStats.keySet().stream().findFirst().orElseThrow(IllegalStateException::new)));
+  }
+
+  @Test
+  void testUpdateTableMetrics() {
+    String tableName = "update-metrics";
+    createTable(tableName);
+    NameIdentifier tableIdentifier = getTableIdentifier(tableName);
+    updater.update(
+        DummyTableStatisticsComputer.DUMMY_TABLE_STAT,
+        Arrays.asList(tableIdentifier),
+        UpdateType.METRICS);
+
+    List<MetricPoint> tableMetrics =
+        metricsProvider.tableMetrics(tableIdentifier, 0, Long.MAX_VALUE);
+    Assertions.assertEquals(1, tableMetrics.size());
+    MetricPoint tableMetric = tableMetrics.get(0);
+    Assertions.assertEquals(DummyTableStatisticsComputer.TABLE_STAT_NAME, 
tableMetric.metricName());
+    long tableDiff = System.currentTimeMillis() / 1000 - 
tableMetric.timestampSeconds();
+    Assertions.assertTrue(tableDiff >= 0 && tableDiff <= 10000);
+    Assertions.assertEquals(1L, ((Number) 
tableMetric.value().value()).longValue());
+
+    PartitionPath expectedPartition =
+        PartitionPath.of(DummyTableStatisticsComputer.getPartitionEntries());
+    List<MetricPoint> partitionMetrics =
+        metricsProvider.partitionMetrics(tableIdentifier, expectedPartition, 
0, Long.MAX_VALUE);
+    Assertions.assertEquals(1, partitionMetrics.size());
+    MetricPoint partitionMetric = partitionMetrics.get(0);
+    Assertions.assertEquals(
+        DummyTableStatisticsComputer.TABLE_STAT_NAME, 
partitionMetric.metricName());
+    long partitionDiff = System.currentTimeMillis() / 1000 - 
partitionMetric.timestampSeconds();
+    Assertions.assertTrue(partitionDiff >= 0 && partitionDiff <= 10000);
+    Assertions.assertEquals(2L, ((Number) 
partitionMetric.value().value()).longValue());
+    Assertions.assertEquals(
+        PartitionUtils.encodePartitionPath(expectedPartition),
+        
PartitionUtils.encodePartitionPath(partitionMetric.partitionPath().orElseThrow()));
+  }
+
+  @Test
+  void testUpdateJobMetrics() {
+    String jobName = "update-job-metrics";
+    NameIdentifier jobIdentifier = NameIdentifier.of(jobName);
+    updater.update(
+        DummyJobMetricsCalculator.DUMMY_JOB_METRICS,
+        Arrays.asList(jobIdentifier),
+        UpdateType.METRICS);
+
+    List<MetricPoint> jobMetrics = metricsProvider.jobMetrics(jobIdentifier, 
0, Long.MAX_VALUE);
+    Assertions.assertEquals(1, jobMetrics.size());
+    MetricPoint jobMetric = jobMetrics.get(0);
+    Assertions.assertEquals(DummyJobMetricsCalculator.JOB_STAT_NAME, 
jobMetric.metricName());
+    long diff = System.currentTimeMillis() / 1000 - 
jobMetric.timestampSeconds();
+    Assertions.assertTrue(diff >= 0 && diff <= 10000);
+    Assertions.assertEquals(1L, ((Number) 
jobMetric.value().value()).longValue());
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
index e68bf72573..673892ecd5 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
@@ -19,7 +19,6 @@
 
 package org.apache.gravitino.maintenance.optimizer.updater.metrics;
 
-import java.lang.reflect.Field;
 import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.NameIdentifier;
@@ -62,7 +61,7 @@ class TestGravitinoMetricsUpdater {
   void testUpdateTableAndPartitionMetricsPassThroughRequests() throws 
Exception {
     GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
     MetricsRepository repository = Mockito.mock(MetricsRepository.class);
-    setMetricsRepository(updater, repository);
+    updater.setMetricsRepositoryForTest(repository);
     NameIdentifier tableId = NameIdentifier.of("catalog", "db", "table");
     List<MetricPoint> inputMetrics =
         List.of(
@@ -97,7 +96,7 @@ class TestGravitinoMetricsUpdater {
   void testUpdateJobMetricsPassThroughRequests() throws Exception {
     GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
     MetricsRepository repository = Mockito.mock(MetricsRepository.class);
-    setMetricsRepository(updater, repository);
+    updater.setMetricsRepositoryForTest(repository);
     NameIdentifier jobId = NameIdentifier.of("catalog", "db", "job");
     List<MetricPoint> inputMetrics =
         List.of(MetricPoint.forJob(jobId, "duration", 
StatisticValues.longValue(20L), 200L));
@@ -120,7 +119,7 @@ class TestGravitinoMetricsUpdater {
   void testUpdateTableAndPartitionMetricsRejectsJobScope() throws Exception {
     GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
     MetricsRepository repository = Mockito.mock(MetricsRepository.class);
-    setMetricsRepository(updater, repository);
+    updater.setMetricsRepositoryForTest(repository);
     NameIdentifier jobId = NameIdentifier.of("catalog", "db", "job");
 
     Assertions.assertThrows(
@@ -135,7 +134,7 @@ class TestGravitinoMetricsUpdater {
   void testUpdateJobMetricsRejectsTableScope() throws Exception {
     GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
     MetricsRepository repository = Mockito.mock(MetricsRepository.class);
-    setMetricsRepository(updater, repository);
+    updater.setMetricsRepositoryForTest(repository);
     NameIdentifier tableId = NameIdentifier.of("catalog", "db", "table");
 
     Assertions.assertThrows(
@@ -151,7 +150,7 @@ class TestGravitinoMetricsUpdater {
   void testCloseDelegatesToRepository() throws Exception {
     GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
     MetricsRepository repository = Mockito.mock(MetricsRepository.class);
-    setMetricsRepository(updater, repository);
+    updater.setMetricsRepositoryForTest(repository);
 
     updater.close();
 
@@ -177,7 +176,7 @@ class TestGravitinoMetricsUpdater {
                     "org.h2.Driver",
                     OptimizerConfig.OPTIMIZER_PREFIX + "jdbcMetrics." + 
"testOnBorrow",
                     "false"))));
-    MetricsRepository repository = getMetricsRepository(updater);
+    MetricsRepository repository = updater.metricsRepositoryForTest();
     Assertions.assertInstanceOf(GenericJdbcMetricsRepository.class, 
repository);
     updater.close();
   }
@@ -196,25 +195,11 @@ class TestGravitinoMetricsUpdater {
                 "gravitino.optimizer.jdbcMetrics.jdbcPassword",
                 ""));
     updater.initialize(new OptimizerEnv(config));
-    MetricsRepository repository = getMetricsRepository(updater);
+    MetricsRepository repository = updater.metricsRepositoryForTest();
     Assertions.assertInstanceOf(GenericJdbcMetricsRepository.class, 
repository);
     updater.close();
   }
 
-  private void setMetricsRepository(GravitinoMetricsUpdater updater, 
MetricsRepository repository)
-      throws ReflectiveOperationException {
-    Field field = 
GravitinoMetricsUpdater.class.getDeclaredField("metricsStorage");
-    field.setAccessible(true);
-    field.set(updater, repository);
-  }
-
-  private MetricsRepository getMetricsRepository(GravitinoMetricsUpdater 
updater)
-      throws ReflectiveOperationException {
-    Field field = 
GravitinoMetricsUpdater.class.getDeclaredField("metricsStorage");
-    field.setAccessible(true);
-    return (MetricsRepository) field.get(updater);
-  }
-
   private PartitionPath parsePartitionPath(String partition) {
     String[] entries = partition.split("/");
     List<PartitionEntry> partitionEntries = new 
java.util.ArrayList<>(entries.length);
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
index 65e5ea4b6c..f856cc7c03 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
@@ -19,7 +19,6 @@
 
 package org.apache.gravitino.maintenance.optimizer.updater.statistics;
 
-import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,7 +56,7 @@ class TestGravitinoStatisticsUpdater {
   void testUpdateTableStatisticsDuplicateNameLastWins() throws Exception {
     GravitinoStatisticsUpdater updater = new GravitinoStatisticsUpdater();
     GravitinoClient client = Mockito.mock(GravitinoClient.class, 
Mockito.RETURNS_DEEP_STUBS);
-    setClient(updater, client);
+    updater.setGravitinoClientForTest(client);
     NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "db", 
"table");
 
     updater.updateTableStatistics(
@@ -82,7 +81,7 @@ class TestGravitinoStatisticsUpdater {
   void testUpdatePartitionStatisticsNullPartitionPathFails() throws Exception {
     GravitinoStatisticsUpdater updater = new GravitinoStatisticsUpdater();
     GravitinoClient client = Mockito.mock(GravitinoClient.class, 
Mockito.RETURNS_DEEP_STUBS);
-    setClient(updater, client);
+    updater.setGravitinoClientForTest(client);
 
     Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = new 
HashMap<>();
     partitionStatistics.put(null, List.of(stat("s1", 1L)));
@@ -101,7 +100,7 @@ class TestGravitinoStatisticsUpdater {
   void testUpdatePartitionStatisticsDuplicateNameLastWins() throws Exception {
     GravitinoStatisticsUpdater updater = new GravitinoStatisticsUpdater();
     GravitinoClient client = Mockito.mock(GravitinoClient.class, 
Mockito.RETURNS_DEEP_STUBS);
-    setClient(updater, client);
+    updater.setGravitinoClientForTest(client);
     NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "db", 
"table");
     PartitionPath partitionPath = PartitionPath.of(List.of(new 
PartitionEntryImpl("p", "1")));
 
@@ -128,11 +127,4 @@ class TestGravitinoStatisticsUpdater {
   private StatisticEntry<?> stat(String name, long value) {
     return new StatisticEntryImpl<>(name, StatisticValues.longValue(value));
   }
-
-  private void setClient(GravitinoStatisticsUpdater updater, GravitinoClient 
client)
-      throws ReflectiveOperationException {
-    Field field = 
GravitinoStatisticsUpdater.class.getDeclaredField("gravitinoClient");
-    field.setAccessible(true);
-    field.set(updater, client);
-  }
 }
diff --git 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
 
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 3084fcc280..0b1189aba0 100644
--- 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ 
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -25,3 +25,4 @@ 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTe
 
org.apache.gravitino.maintenance.optimizer.recommender.StrategyProviderForCmdTest
 
org.apache.gravitino.maintenance.optimizer.recommender.StatisticsProviderForCmdTest
 
org.apache.gravitino.maintenance.optimizer.recommender.TableMetadataProviderForCmdTest
+org.apache.gravitino.maintenance.optimizer.integration.test.RecordingJobSubmitterForIT
diff --git 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
 
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
index 2e0ab7cd28..2e3a914a1c 100644
--- 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
+++ 
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
@@ -16,3 +16,5 @@
 # under the License.
 
 org.apache.gravitino.maintenance.optimizer.updater.StatisticsCalculatorForTest
+org.apache.gravitino.maintenance.optimizer.integration.test.DummyTableStatisticsComputer
+org.apache.gravitino.maintenance.optimizer.integration.test.DummyJobMetricsCalculator


Reply via email to