Copilot commented on code in PR #10191:
URL: https://github.com/apache/gravitino/pull/10191#discussion_r2882909711


##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.apache.gravitino.maintenance.optimizer.updater.UpdateType;
+import org.apache.gravitino.maintenance.optimizer.updater.Updater;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class UpdaterIT extends GravitinoOptimizerEnvIT {
+
+  private Updater updater;
+  private GravitinoStatisticsProvider statisticsProvider;
+  private GravitinoMetricsProvider metricsProvider;
+
+  @Override
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  @BeforeAll
+  void init() {
+    this.updater = new Updater(optimizerEnv);
+    this.statisticsProvider = new GravitinoStatisticsProvider();
+    statisticsProvider.initialize(optimizerEnv);
+    this.metricsProvider = new GravitinoMetricsProvider();
+    metricsProvider.initialize(optimizerEnv);
+  }

Review Comment:
   The test initializes several AutoCloseable components in `@BeforeAll` 
(Updater, GravitinoStatisticsProvider, GravitinoMetricsProvider) but never 
closes them. This can leak threads/connections and keep the H2 metrics DB file 
locked; add an `@AfterAll` to close the updater and both providers (and 
consider closing in reverse init order).
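   The reverse-order close the comment asks for can be sketched as below. This is an illustrative pattern, not code from the PR; the `ReverseCloser` helper and the resource names are hypothetical:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   public class ReverseCloser {

     // Resources are registered in init order; closeAll() pops them in reverse.
     private final Deque<AutoCloseable> resources = new ArrayDeque<>();

     public <T extends AutoCloseable> T register(T resource) {
       resources.push(resource); // last registered ends up on top
       return resource;
     }

     /** Close every registered resource in reverse init order, continuing on failure. */
     public void closeAll() {
       while (!resources.isEmpty()) {
         try {
           resources.pop().close();
         } catch (Exception e) {
           // In an @AfterAll we would log and continue so one close failure
           // does not mask the others.
           System.err.println("close failed: " + e);
         }
       }
     }

     public static void main(String[] args) {
       StringBuilder order = new StringBuilder();
       ReverseCloser closer = new ReverseCloser();
       // Mirror the init order from the test: updater, statistics, metrics.
       closer.register(() -> order.append("updater "));
       closer.register(() -> order.append("statistics "));
       closer.register(() -> order.append("metrics "));
       closer.closeAll();
       System.out.println(order.toString().trim()); // prints "metrics statistics updater"
     }
   }
   ```

   In the test itself an `@AfterAll` would simply call `metricsProvider.close()`, `statisticsProvider.close()`, then `updater.close()`, catching per-resource exceptions.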



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class GravitinoTableMetaIT extends GravitinoOptimizerEnvIT {
+  private GravitinoTableMetadataProvider tableMetadataProvider;
+
+  @BeforeAll
+  void init() {
+    this.tableMetadataProvider = new GravitinoTableMetadataProvider();
+    tableMetadataProvider.initialize(optimizerEnv);
+  }

Review Comment:
   GravitinoTableMetadataProvider is initialized in `@BeforeAll` but not 
closed. Since it is a Provider/AutoCloseable (and likely holds a 
GravitinoClient), add an `@AfterAll` to close tableMetadataProvider to avoid 
resource leaks.



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class GravitinoStrategyIT extends GravitinoOptimizerEnvIT {
+
+  private GravitinoStrategyProvider strategyProvider;
+
+  @BeforeAll
+  void init() {
+    this.strategyProvider = new GravitinoStrategyProvider();
+    strategyProvider.initialize(optimizerEnv);
+  }

Review Comment:
   GravitinoStrategyProvider is a Provider (AutoCloseable) and is initialized 
in `@BeforeAll` but never closed. Add an `@AfterAll` to close strategyProvider 
to avoid leaking the underlying GravitinoClient.



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoOptimizerEnvIT.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+
+// Set up the Gravitino server, metalake, Iceberg catalogs
+public class GravitinoOptimizerEnvIT extends BaseIT {
+
+  protected static final String METALAKE_NAME = "test_metalake";
+  protected static final String GRAVITINO_CATALOG_NAME = "iceberg";
+  protected static final String TEST_SCHEMA = "test_schema";
+
+  protected Catalog catalogClient;
+  protected GravitinoMetalake metalakeClient;
+  protected OptimizerEnv optimizerEnv;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    super.startIntegrationTest();
+    initMetalakeAndCatalog();
+    this.optimizerEnv = initOptimizerEnv();
+  }
+
+  protected void createTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {Column.of("col_1", Types.IntegerType.get())},
+            "comment",
+            ImmutableMap.of());
+  }
+
+  protected NameIdentifier getTableIdentifier(String tableName) {
+    return NameIdentifier.of(GRAVITINO_CATALOG_NAME, TEST_SCHEMA, tableName);
+  }
+
+  protected void createPartitionTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {
+              Column.of("col1", Types.IntegerType.get(), "col1"),
+              Column.of("col2", Types.IntegerType.get(), "col2"),
+              Column.of("col3", Types.IntegerType.get(), "col3")
+            },
+            "comment",
+            ImmutableMap.of(),
+            new Transform[] {
+              Transforms.identity("col1"), Transforms.bucket(8, new String[] {"col2"})
+            });
+  }
+
+  protected void createPolicy(String policyName, Map<String, Object> rules, String policyType) {
+    PolicyContent content =
+        PolicyContents.custom(
+            rules,
+            ImmutableSet.of(MetadataObject.Type.TABLE),
+            Map.of(
+                GravitinoStrategy.STRATEGY_TYPE_KEY,
+                policyType,
+                GravitinoStrategy.JOB_TEMPLATE_NAME_KEY,
+                "template-name"));
+    metalakeClient.createPolicy(policyName, "custom", "comment", true, content);
+  }
+
+  protected void associatePoliciesToTable(String policyName, String tableName) {
+    Table table =
+        catalogClient.asTableCatalog().loadTable(NameIdentifier.of(TEST_SCHEMA, tableName));
+    table.supportsPolicies().associatePolicies(new String[] {policyName}, new String[] {});
+  }
+
+  protected void associatePoliciesToSchema(String policyName, String schemaName) {
+    Schema schema = catalogClient.asSchemas().loadSchema(schemaName);
+    schema.supportsPolicies().associatePolicies(new String[] {policyName}, new String[] {});
+  }
+
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  protected OptimizerEnv initOptimizerEnv() {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(getGravitinoConfigs());
+    configs.putAll(getJdbcMetricsConfigs());
+    configs.putAll(getSpecifyConfigs());
+    return new OptimizerEnv(new OptimizerConfig(configs));
+  }
+
+  private Map<String, String> getJdbcMetricsConfigs() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:file:/tmp/gravitino-optimizer-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL;AUTO_SERVER=TRUE",
+            System.nanoTime());
+
+    return Map.of(
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_URL,
+        jdbcUrl,
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_DRIVER,
+        "org.h2.Driver");
+  }
+
+  private Map<String, String> getGravitinoConfigs() {
+    int gravitinoPort = getGravitinoServerPort();
+    String uri = String.format("http://127.0.0.1:%d", gravitinoPort);
+    return ImmutableMap.of(
+        OptimizerConfig.GRAVITINO_URI,
+        uri,
+        OptimizerConfig.GRAVITINO_METALAKE,
+        METALAKE_NAME,
+        OptimizerConfig.GRAVITINO_DEFAULT_CATALOG,
+        GRAVITINO_CATALOG_NAME);
+  }
+
+  private void initMetalakeAndCatalog() {
+    this.metalakeClient = client.createMetalake(METALAKE_NAME, "", new HashMap<>());
+    this.catalogClient = createGravitinoIcebergCatalog();
+
+    if (!catalogClient.asSchemas().schemaExists(TEST_SCHEMA)) {
+      catalogClient.asSchemas().createSchema(TEST_SCHEMA, "comment", ImmutableMap.of());
+    }
+  }
+
+  private Catalog createGravitinoIcebergCatalog() {
+    return metalakeClient.createCatalog(
+        GRAVITINO_CATALOG_NAME,
+        Catalog.Type.RELATIONAL,
+        "lakehouse-iceberg",
+        "comment",
+        ImmutableMap.of(
+            IcebergConstants.URI,
+            "memory://gravitino-optimizer",
+            IcebergConstants.CATALOG_BACKEND,
+            "memory",
+            IcebergConstants.WAREHOUSE,
+            "file:///tmp/gravitino-optimizer/"));

Review Comment:
   The Iceberg warehouse is hard-coded to file:///tmp/gravitino-optimizer/. 
This can cause cross-test interference and leaves data on disk; consider using 
a per-test temp directory (e.g., Files.createTempDirectory) and pointing 
IcebergConstants.WAREHOUSE to that location.
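   A minimal sketch of the per-test warehouse directory this suggests; the `WarehouseDirs` helper name is illustrative, not from the PR:

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class WarehouseDirs {

     // Create an isolated Iceberg warehouse location per test run instead of
     // the shared, hard-coded file:///tmp/gravitino-optimizer/ path.
     public static String newWarehouseUri() throws IOException {
       Path dir = Files.createTempDirectory("gravitino-optimizer-warehouse-");
       // Best-effort cleanup on JVM exit; an explicit @AfterAll delete is stricter.
       dir.toFile().deleteOnExit();
       return dir.toUri().toString(); // file:///... URI usable as the warehouse value
     }

     public static void main(String[] args) throws IOException {
       System.out.println(newWarehouseUri());
     }
   }
   ```

   The hard-coded `"file:///tmp/gravitino-optimizer/"` value in `createGravitinoIcebergCatalog()` would then be replaced by the URI returned here.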



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoOptimizerEnvIT.java:
##########
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+
+// Set up the Gravitino server, metalake, Iceberg catalogs
+public class GravitinoOptimizerEnvIT extends BaseIT {
+
+  protected static final String METALAKE_NAME = "test_metalake";
+  protected static final String GRAVITINO_CATALOG_NAME = "iceberg";
+  protected static final String TEST_SCHEMA = "test_schema";
+
+  protected Catalog catalogClient;
+  protected GravitinoMetalake metalakeClient;
+  protected OptimizerEnv optimizerEnv;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    super.startIntegrationTest();
+    initMetalakeAndCatalog();
+    this.optimizerEnv = initOptimizerEnv();
+  }
+
+  protected void createTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {Column.of("col_1", Types.IntegerType.get())},
+            "comment",
+            ImmutableMap.of());
+  }
+
+  protected NameIdentifier getTableIdentifier(String tableName) {
+    return NameIdentifier.of(GRAVITINO_CATALOG_NAME, TEST_SCHEMA, tableName);
+  }
+
+  protected void createPartitionTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {
+              Column.of("col1", Types.IntegerType.get(), "col1"),
+              Column.of("col2", Types.IntegerType.get(), "col2"),
+              Column.of("col3", Types.IntegerType.get(), "col3")
+            },
+            "comment",
+            ImmutableMap.of(),
+            new Transform[] {
+              Transforms.identity("col1"), Transforms.bucket(8, new String[] {"col2"})
+            });
+  }
+
+  protected void createPolicy(String policyName, Map<String, Object> rules, String policyType) {
+    PolicyContent content =
+        PolicyContents.custom(
+            rules,
+            ImmutableSet.of(MetadataObject.Type.TABLE),
+            Map.of(
+                GravitinoStrategy.STRATEGY_TYPE_KEY,
+                policyType,
+                GravitinoStrategy.JOB_TEMPLATE_NAME_KEY,
+                "template-name"));
+    metalakeClient.createPolicy(policyName, "custom", "comment", true, content);
+  }
+
+  protected void associatePoliciesToTable(String policyName, String tableName) {
+    Table table =
+        catalogClient.asTableCatalog().loadTable(NameIdentifier.of(TEST_SCHEMA, tableName));
+    table.supportsPolicies().associatePolicies(new String[] {policyName}, new String[] {});
+  }
+
+  protected void associatePoliciesToSchema(String policyName, String schemaName) {
+    Schema schema = catalogClient.asSchemas().loadSchema(schemaName);
+    schema.supportsPolicies().associatePolicies(new String[] {policyName}, new String[] {});
+  }
+
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  protected OptimizerEnv initOptimizerEnv() {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(getGravitinoConfigs());
+    configs.putAll(getJdbcMetricsConfigs());
+    configs.putAll(getSpecifyConfigs());
+    return new OptimizerEnv(new OptimizerConfig(configs));
+  }
+
+  private Map<String, String> getJdbcMetricsConfigs() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:file:/tmp/gravitino-optimizer-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL;AUTO_SERVER=TRUE",
+            System.nanoTime());
+

Review Comment:
   The test uses a hard-coded /tmp H2 file path for metrics storage. This is 
less portable (non-Unix environments/permissions) and leaves files behind; 
consider creating a temp directory via java.io.tmpdir / 
Files.createTempDirectory (as done in TestH2GenericJdbcMetricsRepositoryIT) and 
building the JDBC URL from that path.
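   A sketch of building the H2 URL from a temp directory, as suggested; the `H2MetricsUrls` class name is illustrative, and TestH2GenericJdbcMetricsRepositoryIT's exact approach may differ:

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class H2MetricsUrls {

     // Build the H2 file URL from a fresh temp directory rather than a
     // hard-coded /tmp path; the trailing settings mirror the ones in the diff.
     public static String newJdbcUrl() throws IOException {
       Path dir = Files.createTempDirectory("gravitino-optimizer-it-");
       return String.format(
           "jdbc:h2:file:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL;AUTO_SERVER=TRUE",
           dir.resolve("metrics").toAbsolutePath());
     }

     public static void main(String[] args) throws IOException {
       System.out.println(newJdbcUrl());
     }
   }
   ```

   This keeps each run's database file isolated and in a location the platform's temp-dir cleanup can reclaim.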



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.monitor.Monitor;
+import org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
+import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class MonitorIT {
+
+  private GravitinoMetricsUpdater updater;
+  private Monitor monitor;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv env = createOptimizerEnv();
+    this.updater = new GravitinoMetricsUpdater();
+    updater.initialize(env);
+    this.monitor = new Monitor(env);
+    MonitorCallbackForTest.reset();
+  }

Review Comment:
   This test class initializes GravitinoMetricsUpdater and Monitor in 
`@BeforeAll` but never closes them. Please add an `@AfterAll` to call 
updater.close() (and close any other AutoCloseable resources used by Monitor if 
applicable) to avoid leaking the underlying JDBC repository and file handles.



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java:
##########
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.monitor.Monitor;
+import org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
+import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class MonitorIT {
+
+  private GravitinoMetricsUpdater updater;
+  private Monitor monitor;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv env = createOptimizerEnv();
+    this.updater = new GravitinoMetricsUpdater();
+    updater.initialize(env);
+    this.monitor = new Monitor(env);
+    MonitorCallbackForTest.reset();
+  }
+
+  @Test
+  void testTableMetrics() {
+    long actionTime = 10;
+    long rangeSeconds = 2;
+    NameIdentifier tableIdentifier = NameIdentifier.of("db", "table");
+    NameIdentifier job1 = TableJobRelationProviderForTest.JOB1;
+    NameIdentifier job2 = TableJobRelationProviderForTest.JOB2;
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forTable(tableIdentifier, "storage", StatisticValues.longValue(10), 8),
+            MetricPoint.forTable(tableIdentifier, "s3_cost", StatisticValues.longValue(1000), 9),
+            MetricPoint.forTable(tableIdentifier, "s3_cost", StatisticValues.longValue(1003), 10),
+            MetricPoint.forTable(tableIdentifier, "storage", StatisticValues.longValue(100L), 11)));
+
+    List<MetricPoint> jobMetrics =
+        Arrays.asList(
+            MetricPoint.forJob(job1, "job_runtime", StatisticValues.longValue(8), 8),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(9), 9),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(10), 10),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(11), 100),
+            MetricPoint.forJob(job1, "job_runtime", StatisticValues.longValue(12L), 11),
+            MetricPoint.forJob(job2, "job_runtime", StatisticValues.longValue(8), 8),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(9), 9),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(10), 10),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(11), 100),
+            MetricPoint.forJob(job2, "job_runtime", StatisticValues.longValue(12L), 11));
+    updater.updateJobMetrics(jobMetrics);
+
+    List<EvaluationResult> results =
+        monitor.evaluateMetrics(tableIdentifier, actionTime, rangeSeconds, Optional.empty());
+    Assertions.assertEquals(3, results.size());
+
+    EvaluationResult tableResult = findByScope(results, DataScope.Type.TABLE, tableIdentifier);
+
+    List<MetricSample> storageBefore = sortedSamples(tableResult.beforeMetrics().get("storage"));
+    Assertions.assertEquals(1, storageBefore.size());
+    Assertions.assertEquals(8L, storageBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(10L, ((Number) storageBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> s3Before = sortedSamples(tableResult.beforeMetrics().get("s3_cost"));
+    Assertions.assertEquals(1, s3Before.size());
+    Assertions.assertEquals(9L, s3Before.get(0).timestampSeconds());
+    Assertions.assertEquals(1000L, ((Number) s3Before.get(0).value().value()).longValue());
+
+    List<MetricSample> storageAfter = sortedSamples(tableResult.afterMetrics().get("storage"));
+    Assertions.assertEquals(1, storageAfter.size());
+    Assertions.assertEquals(11L, storageAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(100L, ((Number) storageAfter.get(0).value().value()).longValue());
+
+    List<MetricSample> s3After = sortedSamples(tableResult.afterMetrics().get("s3_cost"));
+    Assertions.assertEquals(1, s3After.size());
+    Assertions.assertEquals(10L, s3After.get(0).timestampSeconds());
+    Assertions.assertEquals(1003L, ((Number) s3After.get(0).value().value()).longValue());
+
+    EvaluationResult jobResult1 = findByScope(results, DataScope.Type.JOB, job1);
+    checkJobMetrics(jobResult1.beforeMetrics(), jobResult1.afterMetrics());
+
+    EvaluationResult jobResult2 = findByScope(results, DataScope.Type.JOB, job2);
+    checkJobMetrics(jobResult2.beforeMetrics(), jobResult2.afterMetrics());
+  }
+
+  @Test
+  void testPartitionMetrics() {
+    long actionTime = 10;
+    long rangeSeconds = 2;
+    NameIdentifier tableIdentifier = NameIdentifier.of("db", "partitionTable");
+    PartitionPath partitionPath =
+        PartitionUtils.decodePartitionPath("[{\"country\":\"US\"},{\"region\":\"CA\"}]");
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forPartition(
+                tableIdentifier, partitionPath, "storage", StatisticValues.longValue(10), 8),
+            MetricPoint.forPartition(
+                tableIdentifier, partitionPath, "storage", StatisticValues.longValue(20), 11),
+            MetricPoint.forPartition(
+                tableIdentifier, partitionPath, "s3_cost", StatisticValues.longValue(5), 10)));
+
+    List<EvaluationResult> results =
+        monitor.evaluateMetrics(
+            tableIdentifier, actionTime, rangeSeconds, Optional.of(partitionPath));
+
+    EvaluationResult partitionResult =
+        results.stream()
+            .filter(result -> result.scope().type() == DataScope.Type.PARTITION)
+            .findFirst()
+            .orElseThrow(IllegalStateException::new);
+
+    Assertions.assertEquals(partitionPath, 
partitionResult.scope().partition().orElseThrow());
+
+    List<MetricSample> storageBefore =
+        sortedSamples(partitionResult.beforeMetrics().get("storage"));
+    Assertions.assertEquals(1, storageBefore.size());
+    Assertions.assertEquals(8L, storageBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(10L, ((Number) 
storageBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> storageAfter = 
sortedSamples(partitionResult.afterMetrics().get("storage"));
+    Assertions.assertEquals(1, storageAfter.size());
+    Assertions.assertEquals(11L, storageAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(20L, ((Number) 
storageAfter.get(0).value().value()).longValue());
+
+    List<MetricSample> s3After = 
sortedSamples(partitionResult.afterMetrics().get("s3_cost"));
+    Assertions.assertEquals(1, s3After.size());
+    Assertions.assertEquals(10L, s3After.get(0).timestampSeconds());
+    Assertions.assertEquals(5L, ((Number) 
s3After.get(0).value().value()).longValue());
+  }
+
+  @Test
+  void testMonitorCallbacks() {
+    MonitorCallbackForTest.reset();
+    NameIdentifier tableIdentifier = NameIdentifier.of("db", "table");
+
+    monitor.evaluateMetrics(tableIdentifier, 10, 1, Optional.empty());
+
+    Assertions.assertEquals(3, MonitorCallbackForTest.INVOCATIONS.get());
+    Assertions.assertTrue(
+        MonitorCallbackForTest.RESULTS.stream()
+            .anyMatch(result -> result.scope().type() == DataScope.Type.TABLE));
+  }
+
+  private void checkJobMetrics(
+      Map<String, List<MetricSample>> jobBeforeMetrics,
+      Map<String, List<MetricSample>> jobAfterMetrics) {
+    List<MetricSample> runtimeBefore = sortedSamples(jobBeforeMetrics.get("job_runtime"));
+    Assertions.assertEquals(1, runtimeBefore.size());
+    Assertions.assertEquals(8L, runtimeBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(8L, ((Number) runtimeBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> costBefore = sortedSamples(jobBeforeMetrics.get("job_cost"));
+    Assertions.assertEquals(1, costBefore.size());
+    Assertions.assertEquals(9L, costBefore.get(0).timestampSeconds());
+    Assertions.assertEquals(9L, ((Number) costBefore.get(0).value().value()).longValue());
+
+    List<MetricSample> runtimeAfter = sortedSamples(jobAfterMetrics.get("job_runtime"));
+    Assertions.assertEquals(1, runtimeAfter.size());
+    Assertions.assertEquals(11L, runtimeAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(12L, ((Number) runtimeAfter.get(0).value().value()).longValue());
+
+    List<MetricSample> costAfter = sortedSamples(jobAfterMetrics.get("job_cost"));
+    Assertions.assertEquals(1, costAfter.size());
+    Assertions.assertEquals(10L, costAfter.get(0).timestampSeconds());
+    Assertions.assertEquals(10L, ((Number) costAfter.get(0).value().value()).longValue());
+  }
+
+  private static EvaluationResult findByScope(
+      List<EvaluationResult> results, DataScope.Type type, NameIdentifier identifier) {
+    return results.stream()
+        .filter(result -> result.scope().type() == type)
+        .filter(result -> result.scope().identifier().equals(identifier))
+        .findFirst()
+        .orElseThrow(IllegalStateException::new);
+  }
+
+  private static List<MetricSample> sortedSamples(List<MetricSample> samples) {
+    if (samples == null) {
+      return List.of();
+    }
+    return samples.stream()
+        .sorted(Comparator.comparingLong(MetricSample::timestampSeconds))
+        .toList();
+  }
+
+  private OptimizerEnv createOptimizerEnv() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:file:/tmp/gravitino-monitor-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL;AUTO_SERVER=TRUE",

Review Comment:
   This test uses a hard-coded /tmp H2 file path for the metrics repository. 
For portability and cleanup, prefer a per-test temp directory 
(Files.createTempDirectory) or an in-memory H2 URL, similar to optimizer 
storage ITs.
   ```suggestion
     @Test
     public void testCreateOptimizerEnvUsesInMemoryH2() {
       OptimizerEnv env = createOptimizerEnv();
       String key =
           OptimizerConfig.OPTIMIZER_PREFIX
               + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
               + GenericJdbcMetricsRepository.JDBC_URL;
       String jdbcUrl = env.config().get(key);
       Assertions.assertNotNull(jdbcUrl);
       Assertions.assertTrue(
           jdbcUrl.startsWith("jdbc:h2:mem:gravitino-monitor-it-"),
           "Expected in-memory H2 URL, but was: " + jdbcUrl);
     }
   
     private OptimizerEnv createOptimizerEnv() {
       String jdbcUrl =
        String.format(
            "jdbc:h2:mem:gravitino-monitor-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL",
   ```



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class GravitinoMetricsIT {
+
+  private GravitinoMetricsUpdater updater;
+  private GravitinoMetricsProvider provider;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv optimizerEnv = createOptimizerEnv();
+    updater = new GravitinoMetricsUpdater();
+    updater.initialize(optimizerEnv);
+    provider = new GravitinoMetricsProvider();
+    provider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  public void tearDown() throws Exception {
+    if (updater != null) {
+      updater.close();
+    }
+    if (provider != null) {
+      provider.close();
+    }
+  }
+
+  @Test
+  void testTableMetrics() {
+    NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "schema", "table");
+
+    PartitionPath partition1 =
+        PartitionPath.of(
+            Arrays.asList(new PartitionEntryImpl("p1", "v1"), new PartitionEntryImpl("p2", "v2")));
+    PartitionPath partition2 =
+        PartitionPath.of(
+            Arrays.asList(new PartitionEntryImpl("p1", "v11"), new PartitionEntryImpl("p2", "v2")));
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forTable(tableIdentifier, "a", StatisticValues.longValue(10), 1000),
+            MetricPoint.forTable(tableIdentifier, "b", StatisticValues.longValue(1000), 1000),
+            MetricPoint.forPartition(
+                tableIdentifier, partition1, "b", StatisticValues.longValue(1003), 1000),
+            MetricPoint.forPartition(
+                tableIdentifier, partition2, "b", StatisticValues.longValue(1004), 1000),
+            MetricPoint.forTable(tableIdentifier, "a", StatisticValues.longValue(100L), 1001)));
+
+    List<MetricPoint> tableMetrics = provider.tableMetrics(tableIdentifier, 1000, 1002);
+    Map<String, List<MetricPoint>> tableMetricsByName = metricsByName(tableMetrics);
+
+    Assertions.assertEquals(2, tableMetricsByName.size());
+
+    List<MetricPoint> aMetrics = sortByTimestamp(tableMetricsByName.get("a"));
+    Assertions.assertEquals(2, aMetrics.size());
+    Assertions.assertEquals(10L, ((Number) aMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(1000L, aMetrics.get(0).timestampSeconds());
+    Assertions.assertEquals(100L, ((Number) aMetrics.get(1).value().value()).longValue());
+    Assertions.assertEquals(1001L, aMetrics.get(1).timestampSeconds());
+
+    List<MetricPoint> bMetrics = tableMetricsByName.get("b");
+    Assertions.assertEquals(1, bMetrics.size());
+    Assertions.assertEquals(1000L, ((Number) bMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(1000L, bMetrics.get(0).timestampSeconds());
+
+    List<MetricPoint> partitionMetrics1 =
+        provider.partitionMetrics(tableIdentifier, partition1, 1000, 1002);
+    Assertions.assertEquals(1, partitionMetrics1.size());
+    MetricPoint metric1 = partitionMetrics1.get(0);
+    Assertions.assertEquals("b", metric1.metricName());
+    Assertions.assertEquals(
+        partition1, metric1.partitionPath().orElseThrow(IllegalStateException::new));
+    Assertions.assertEquals(1003L, ((Number) metric1.value().value()).longValue());
+    Assertions.assertEquals(1000L, metric1.timestampSeconds());
+
+    List<MetricPoint> partitionMetrics2 =
+        provider.partitionMetrics(tableIdentifier, partition2, 1000, 1002);
+    Assertions.assertEquals(1, partitionMetrics2.size());
+    MetricPoint metric2 = partitionMetrics2.get(0);
+    Assertions.assertEquals("b", metric2.metricName());
+    Assertions.assertEquals(
+        partition2, metric2.partitionPath().orElseThrow(IllegalStateException::new));
+    Assertions.assertEquals(1004L, ((Number) metric2.value().value()).longValue());
+    Assertions.assertEquals(1000L, metric2.timestampSeconds());
+  }
+
+  @Test
+  void testJobMetrics() {
+    NameIdentifier jobIdentifier = NameIdentifier.of("job1");
+
+    updater.updateJobMetrics(
+        Arrays.asList(
+            MetricPoint.forJob(jobIdentifier, "x", StatisticValues.longValue(20), 2000),
+            MetricPoint.forJob(jobIdentifier, "y", StatisticValues.longValue(2000), 2000),
+            MetricPoint.forJob(jobIdentifier, "x", StatisticValues.longValue(200L), 2001)));
+
+    List<MetricPoint> jobMetrics = provider.jobMetrics(jobIdentifier, 2000, 2002);
+    Map<String, List<MetricPoint>> jobMetricsByName = metricsByName(jobMetrics);
+
+    Assertions.assertEquals(2, jobMetricsByName.size());
+
+    List<MetricPoint> xMetrics = sortByTimestamp(jobMetricsByName.get("x"));
+    Assertions.assertEquals(2, xMetrics.size());
+    Assertions.assertEquals(20L, ((Number) xMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(2000L, xMetrics.get(0).timestampSeconds());
+    Assertions.assertEquals(200L, ((Number) xMetrics.get(1).value().value()).longValue());
+    Assertions.assertEquals(2001L, xMetrics.get(1).timestampSeconds());
+
+    List<MetricPoint> yMetrics = jobMetricsByName.get("y");
+    Assertions.assertEquals(1, yMetrics.size());
+    Assertions.assertEquals(2000L, ((Number) yMetrics.get(0).value().value()).longValue());
+    Assertions.assertEquals(2000L, yMetrics.get(0).timestampSeconds());
+  }
+
+  private static Map<String, List<MetricPoint>> metricsByName(List<MetricPoint> points) {
+    return points.stream().collect(Collectors.groupingBy(MetricPoint::metricName));
+  }
+
+  private static List<MetricPoint> sortByTimestamp(List<MetricPoint> points) {
+    return points.stream()
+        .sorted(Comparator.comparingLong(MetricPoint::timestampSeconds))
+        .toList();
+  }
+
+  private OptimizerEnv createOptimizerEnv() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:file:/tmp/gravitino-metrics-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL;AUTO_SERVER=TRUE",
+            System.nanoTime());
+

Review Comment:
   This test uses a hard-coded /tmp H2 file path, which can be problematic on 
non-Unix environments and leaves files behind. Consider creating a temp 
directory (Files.createTempDirectory) and building the jdbc:h2:file URL from 
that path, as other optimizer ITs do.
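
   A minimal, self-contained sketch of the suggested approach (class and method names here are illustrative, not from the PR): derive the `jdbc:h2:file` URL from a directory created with `Files.createTempDirectory`, so the database lands in the platform temp location instead of a hard-coded `/tmp` path.

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   public class TempDirH2Url {
     // Hypothetical helper: builds an H2 file URL under a per-run temp
     // directory instead of a hard-coded /tmp path, so the test also works
     // on non-Unix hosts and does not collide across runs.
     static String createJdbcUrl() throws IOException {
       Path dir = Files.createTempDirectory("gravitino-metrics-it-");
       // H2 creates the database file(s) under the given base name.
       return String.format(
           "jdbc:h2:file:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL;AUTO_SERVER=TRUE",
           dir.resolve("metrics"));
     }

     public static void main(String[] args) throws IOException {
       System.out.println(createJdbcUrl().startsWith("jdbc:h2:file:")); // prints true
     }
   }
   ```

   Registering the directory for deletion (or using JUnit's `@TempDir`) would also take care of cleanup after the run.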



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.recommender.Recommender;
+import org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
+import org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/*
+ * 1. update table stats
+ * 2. add policy
+ * 3. run recommender to get optimize result
+ */
+public class RecommenderIT extends GravitinoOptimizerEnvIT {
+
+  private static final String STATISTICS_PREFIX = "custom-";
+  private static final String DATAFILE_MSE = STATISTICS_PREFIX + "datafile_size_mse";
+  private static final String DELETE_FILE_NUM = STATISTICS_PREFIX + "position_delete_file_number";
+  private static final String SESSION_ID = "recommender-it-" + UUID.randomUUID();
+
+  private StatisticsUpdater statisticsUpdater;
+
+  @Override
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of(
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + "strategyHandler."
+            + CompactionStrategyHandler.NAME
+            + ".className",
+        CompactionStrategyHandler.class.getName(),
+        OptimizerConfig.JOB_SUBMITTER_CONFIG.getKey(),
+        RecordingJobSubmitterForIT.NAME,
+        RecordingJobSubmitterForIT.SESSION_ID_KEY,
+        SESSION_ID);
+  }
+
+  @BeforeAll
+  void init() {
+    this.statisticsUpdater = new GravitinoStatisticsUpdater();
+    statisticsUpdater.initialize(optimizerEnv);
+  }

Review Comment:
   This IT creates a GravitinoStatisticsUpdater in `@BeforeAll` but never 
closes it. Since it holds a GravitinoClient, please add an `@AfterAll` to call 
statisticsUpdater.close() to avoid leaking client resources across tests.
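
   The requested cleanup could look like the following runnable sketch. A stub stands in for `GravitinoStatisticsUpdater` (which is assumed to be `AutoCloseable`), and the plain `tearDown()` method would carry JUnit's `@AfterAll` annotation in the IT:

   ```java
   public class AfterAllCloseSketch {
     // Stand-in for the Gravitino updater; only models the close() contract.
     static class StatisticsUpdater implements AutoCloseable {
       boolean closed;
       @Override public void close() { closed = true; }
     }

     StatisticsUpdater statisticsUpdater = new StatisticsUpdater();

     // In RecommenderIT this method would be annotated with @AfterAll.
     void tearDown() throws Exception {
       if (statisticsUpdater != null) {
         statisticsUpdater.close(); // releases the underlying GravitinoClient
       }
     }

     public static void main(String[] args) throws Exception {
       AfterAllCloseSketch it = new AfterAllCloseSketch();
       it.tearDown();
       System.out.println(it.statisticsUpdater.closed); // prints true
     }
   }
   ```

   The null check keeps the teardown safe even when `@BeforeAll` failed before the field was assigned.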



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+public class GravitinoStatisticsIT extends GravitinoOptimizerEnvIT {
+
+  private static final String TEST_TABLE = "test_stats_table";
+  private static final String TEST_PARTITION_TABLE = "test_stats_partition_table";
+  private static final String STATISTICS_PREFIX = "custom-";
+  private static final String DATAFILE_SIZE_MSE = STATISTICS_PREFIX + "datafile_size_mse";
+
+  private GravitinoStatisticsUpdater statisticsUpdater;
+  private GravitinoStatisticsProvider statisticsProvider;
+
+  @BeforeAll
+  void init() {
+    this.statisticsUpdater = new GravitinoStatisticsUpdater();
+    statisticsUpdater.initialize(optimizerEnv);
+    this.statisticsProvider = new GravitinoStatisticsProvider();
+    statisticsProvider.initialize(optimizerEnv);
+    createTable(TEST_TABLE);
+    createPartitionTable(TEST_PARTITION_TABLE);
+  }

Review Comment:
   The test initializes GravitinoStatisticsUpdater/GravitinoStatisticsProvider 
in `@BeforeAll` but does not close either. Please add an `@AfterAll` to close 
both to prevent leaked connections/resources (consistent with other optimizer 
ITs like TestH2GenericJdbcMetricsRepositoryIT).
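
   Since two resources are involved here, the subtle part is closing the provider even when closing the updater throws. A runnable sketch of that pattern (stub type stands in for the Gravitino updater/provider; `tearDown()` would be the `@AfterAll` method):

   ```java
   public class CloseBothSketch {
     // Stand-in for GravitinoStatisticsUpdater / GravitinoStatisticsProvider.
     static class Resource implements AutoCloseable {
       boolean closed;
       @Override public void close() { closed = true; }
     }

     Resource statisticsUpdater = new Resource();
     Resource statisticsProvider = new Resource();

     // Would be annotated @AfterAll in the IT; try/finally guarantees the
     // provider is closed even if closing the updater throws.
     void tearDown() throws Exception {
       try {
         if (statisticsUpdater != null) {
           statisticsUpdater.close();
         }
       } finally {
         if (statisticsProvider != null) {
           statisticsProvider.close();
         }
       }
     }

     public static void main(String[] args) throws Exception {
       CloseBothSketch it = new CloseBothSketch();
       it.tearDown();
       System.out.println(it.statisticsUpdater.closed && it.statisticsProvider.closed); // prints true
     }
   }
   ```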



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
