This is an automated email from the ASF dual-hosted git repository.
diqiu50 pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.2 by this push:
new cdfd40e0c0 [Cherry-pick to branch-1.2] [#10193] test(optimizer): add
optimizer integration tests (#10191) (#10236)
cdfd40e0c0 is described below
commit cdfd40e0c0f4a2064259dfc28656cde50513ed96
Author: Qi Yu <[email protected]>
AuthorDate: Thu Mar 5 16:36:11 2026 +0800
[Cherry-pick to branch-1.2] [#10193] test(optimizer): add optimizer
integration tests (#10191) (#10236)
**Cherry-pick Information:**
- Original commit: 004d51452c11c4a9f6caad180a8c7c81af4ef7ba
- Target branch: `branch-1.2`
- Status: ✅ Clean cherry-pick (no conflicts)
Co-authored-by: FANNG <[email protected]>
---
maintenance/optimizer/build.gradle.kts | 2 +
.../updater/metrics/GravitinoMetricsUpdater.java | 11 +
.../statistics/GravitinoStatisticsUpdater.java | 6 +
.../test/AbstractGravitinoOptimizerEnvIT.java | 195 ++++++++++++++
.../test/DummyJobMetricsCalculator.java | 49 ++++
.../test/DummyTableStatisticsComputer.java | 82 ++++++
.../integration/test/GravitinoMetricsIT.java | 180 +++++++++++++
.../integration/test/GravitinoStatisticsIT.java | 132 ++++++++++
.../integration/test/GravitinoStrategyIT.java | 88 +++++++
.../integration/test/GravitinoTableMetaIT.java | 68 +++++
.../optimizer/integration/test/MonitorIT.java | 264 +++++++++++++++++++
.../optimizer/integration/test/RecommenderIT.java | 281 +++++++++++++++++++++
.../test/RecordingJobSubmitterForIT.java | 79 ++++++
.../optimizer/integration/test/UpdaterIT.java | 155 ++++++++++++
.../metrics/TestGravitinoMetricsUpdater.java | 29 +--
.../statistics/TestGravitinoStatisticsUpdater.java | 14 +-
...itino.maintenance.optimizer.api.common.Provider | 1 +
...ance.optimizer.api.updater.StatisticsCalculator | 2 +
18 files changed, 1605 insertions(+), 33 deletions(-)
diff --git a/maintenance/optimizer/build.gradle.kts
b/maintenance/optimizer/build.gradle.kts
index b0b9f865d9..df1c4473cd 100644
--- a/maintenance/optimizer/build.gradle.kts
+++ b/maintenance/optimizer/build.gradle.kts
@@ -88,6 +88,8 @@ dependencies {
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.testcontainers.postgresql)
+ testImplementation(project(":integration-test-common", "testArtifacts"))
+ testImplementation(project(":server"))
testRuntimeOnly(libs.mysql.driver)
testRuntimeOnly(libs.postgresql.driver)
testAnnotationProcessor(libs.lombok)
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
index a6bd5874c2..7834efa6e2 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.maintenance.optimizer.updater.metrics;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.List;
import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
@@ -68,6 +69,16 @@ public class GravitinoMetricsUpdater implements
MetricsUpdater {
}
}
+  /**
+   * Replaces the repository held in {@code metricsStorage}. Intended for tests only; no other
+   * initialization is performed and a previously assigned repository is not closed here.
+   *
+   * @param repository the repository to assign
+   */
+  @VisibleForTesting
+  void setMetricsRepositoryForTest(MetricsRepository repository) {
+    metricsStorage = repository;
+  }
+
+  /**
+   * Exposes the repository currently held by this updater. Intended for tests only.
+   *
+   * @return the current value of {@code metricsStorage}, which may be {@code null} if it has not
+   *     been set yet
+   */
+  @VisibleForTesting
+  MetricsRepository metricsRepositoryForTest() {
+    return this.metricsStorage;
+  }
+
private void ensureInitialized() {
Preconditions.checkState(
metricsStorage != null,
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
index 63fa0e65fa..3b6f4e45c0 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.maintenance.optimizer.updater.statistics;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.LinkedHashMap;
import java.util.List;
@@ -153,4 +154,9 @@ public class GravitinoStatisticsUpdater implements
StatisticsUpdater {
gravitinoClient.close();
}
}
+
+  /**
+   * Replaces the Gravitino client held in {@code gravitinoClient}. Intended for tests only; a
+   * previously assigned client is not closed here.
+   *
+   * @param client the client to assign
+   */
+  @VisibleForTesting
+  void setGravitinoClientForTest(GravitinoClient client) {
+    gravitinoClient = client;
+  }
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/AbstractGravitinoOptimizerEnvIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/AbstractGravitinoOptimizerEnvIT.java
new file mode 100644
index 0000000000..0cb70f0094
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/AbstractGravitinoOptimizerEnvIT.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategy;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.policy.PolicyContent;
+import org.apache.gravitino.policy.PolicyContents;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+/**
+ * Base class for optimizer integration tests.
+ *
+ * <p>On top of the Gravitino server started by {@link BaseIT}, it creates the metalake {@value
+ * #METALAKE_NAME}, a {@code lakehouse-iceberg} catalog {@value #GRAVITINO_CATALOG_NAME} that uses
+ * the {@code memory} backend with a temporary local warehouse, and the schema {@value
+ * #TEST_SCHEMA}. It also builds an {@link OptimizerEnv} that points at that server and at an
+ * in-memory H2 metrics database.
+ */
+public abstract class AbstractGravitinoOptimizerEnvIT extends BaseIT {
+
+  protected static final String METALAKE_NAME = "test_metalake";
+  protected static final String GRAVITINO_CATALOG_NAME = "iceberg";
+  protected static final String TEST_SCHEMA = "test_schema";
+
+  protected Catalog catalogClient;
+  protected GravitinoMetalake metalakeClient;
+  protected OptimizerEnv optimizerEnv;
+  protected String icebergWarehouseLocation;
+
+  // Local directory behind icebergWarehouseLocation; deleted in cleanUpWarehouse().
+  private Path icebergWarehouseDir;
+
+  @BeforeAll
+  @Override
+  public void startIntegrationTest() throws Exception {
+    super.startIntegrationTest();
+    this.icebergWarehouseLocation = createWarehouseLocation();
+    initMetalakeAndCatalog();
+    this.optimizerEnv = initOptimizerEnv();
+  }
+
+  /**
+   * Recursively deletes the temporary Iceberg warehouse directory.
+   *
+   * <p>{@code File#deleteOnExit} cannot remove a non-empty directory, so once the warehouse holds
+   * table files it needs an explicit cleanup. Failures are ignored so that cleanup never fails the
+   * test run.
+   */
+  @AfterAll
+  public void cleanUpWarehouse() {
+    if (icebergWarehouseDir == null) {
+      return;
+    }
+    try (Stream<Path> paths = Files.walk(icebergWarehouseDir)) {
+      // Children sort after their parents, so reverse order deletes files before directories.
+      paths
+          .sorted(Comparator.reverseOrder())
+          .forEach(AbstractGravitinoOptimizerEnvIT::deleteQuietly);
+    } catch (IOException | UncheckedIOException ignored) {
+      // Best-effort: a leftover temp directory must not fail the test run.
+    }
+  }
+
+  /** Deletes a single path if it exists, ignoring I/O failures (best-effort cleanup helper). */
+  private static void deleteQuietly(Path path) {
+    try {
+      Files.deleteIfExists(path);
+    } catch (IOException ignored) {
+      // Best-effort: keep deleting the remaining entries.
+    }
+  }
+
+  /** Creates a fresh temporary directory for the Iceberg warehouse and returns it as a URI. */
+  private String createWarehouseLocation() throws IOException {
+    this.icebergWarehouseDir = Files.createTempDirectory("gravitino-optimizer-warehouse-");
+    return icebergWarehouseDir.toUri().toString();
+  }
+
+  /**
+   * Creates an unpartitioned table {@code TEST_SCHEMA.tableName} with a single integer column
+   * {@code col_1}.
+   */
+  protected void createTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {Column.of("col_1", Types.IntegerType.get())},
+            "comment",
+            ImmutableMap.of());
+  }
+
+  /** Returns the three-level identifier {@code GRAVITINO_CATALOG_NAME.TEST_SCHEMA.tableName}. */
+  protected NameIdentifier getTableIdentifier(String tableName) {
+    return NameIdentifier.of(GRAVITINO_CATALOG_NAME, TEST_SCHEMA, tableName);
+  }
+
+  /**
+   * Creates {@code TEST_SCHEMA.tableName} with integer columns {@code col1..col3}, partitioned by
+   * {@code identity(col1)} and {@code bucket(8, col2)}.
+   */
+  protected void createPartitionTable(String tableName) {
+    catalogClient
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(TEST_SCHEMA, tableName),
+            new Column[] {
+              Column.of("col1", Types.IntegerType.get(), "col1"),
+              Column.of("col2", Types.IntegerType.get(), "col2"),
+              Column.of("col3", Types.IntegerType.get(), "col3")
+            },
+            "comment",
+            ImmutableMap.of(),
+            new Transform[] {
+              Transforms.identity("col1"), Transforms.bucket(8, new String[] {"col2"})
+            });
+  }
+
+  /**
+   * Creates a {@code custom} policy applicable to tables.
+   *
+   * @param policyName name of the policy to create
+   * @param rules the policy rules
+   * @param policyType value stored under {@link GravitinoStrategy#STRATEGY_TYPE_KEY}; the job
+   *     template name is always {@code "template-name"}
+   */
+  protected void createPolicy(String policyName, Map<String, Object> rules, String policyType) {
+    PolicyContent content =
+        PolicyContents.custom(
+            rules,
+            ImmutableSet.of(MetadataObject.Type.TABLE),
+            Map.of(
+                GravitinoStrategy.STRATEGY_TYPE_KEY,
+                policyType,
+                GravitinoStrategy.JOB_TEMPLATE_NAME_KEY,
+                "template-name"));
+    metalakeClient.createPolicy(policyName, "custom", "comment", true, content);
+  }
+
+  /** Associates the named policy with the table {@code TEST_SCHEMA.tableName}. */
+  protected void associatePoliciesToTable(String policyName, String tableName) {
+    Table table =
+        catalogClient.asTableCatalog().loadTable(NameIdentifier.of(TEST_SCHEMA, tableName));
+    table.supportsPolicies().associatePolicies(new String[] {policyName}, new String[] {});
+  }
+
+  /** Associates the named policy with the schema {@code schemaName} of the test catalog. */
+  protected void associatePoliciesToSchema(String policyName, String schemaName) {
+    Schema schema = catalogClient.asSchemas().loadSchema(schemaName);
+    schema.supportsPolicies().associatePolicies(new String[] {policyName}, new String[] {});
+  }
+
+  /**
+   * Hook for subclasses to contribute extra optimizer configuration. These entries are applied
+   * last in {@link #initOptimizerEnv()}, so they override the Gravitino and JDBC defaults.
+   */
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  /** Builds the optimizer environment from Gravitino, JDBC-metrics and subclass configs. */
+  protected OptimizerEnv initOptimizerEnv() {
+    Map<String, String> configs = new HashMap<>();
+    configs.putAll(getGravitinoConfigs());
+    configs.putAll(getJdbcMetricsConfigs());
+    configs.putAll(getSpecifyConfigs());
+    return new OptimizerEnv(new OptimizerConfig(configs));
+  }
+
+  /**
+   * Points the JDBC metrics repository at a uniquely named in-memory H2 database (MySQL mode).
+   * {@code DB_CLOSE_DELAY=-1} keeps it alive after connections close.
+   */
+  private Map<String, String> getJdbcMetricsConfigs() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:mem:gravitino-optimizer-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL",
+            System.nanoTime());
+
+    return Map.of(
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_URL,
+        jdbcUrl,
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+            + GenericJdbcMetricsRepository.JDBC_DRIVER,
+        "org.h2.Driver");
+  }
+
+  /** Points the optimizer at the local test server, metalake and default catalog. */
+  private Map<String, String> getGravitinoConfigs() {
+    int gravitinoPort = getGravitinoServerPort();
+    String uri = String.format("http://127.0.0.1:%d", gravitinoPort);
+    return ImmutableMap.of(
+        OptimizerConfig.GRAVITINO_URI,
+        uri,
+        OptimizerConfig.GRAVITINO_METALAKE,
+        METALAKE_NAME,
+        OptimizerConfig.GRAVITINO_DEFAULT_CATALOG,
+        GRAVITINO_CATALOG_NAME);
+  }
+
+  /** Creates the metalake and the Iceberg catalog, plus {@code TEST_SCHEMA} if it is missing. */
+  private void initMetalakeAndCatalog() {
+    this.metalakeClient = client.createMetalake(METALAKE_NAME, "", new HashMap<>());
+    this.catalogClient = createGravitinoIcebergCatalog();
+
+    if (!catalogClient.asSchemas().schemaExists(TEST_SCHEMA)) {
+      catalogClient.asSchemas().createSchema(TEST_SCHEMA, "comment", ImmutableMap.of());
+    }
+  }
+
+  /** Creates a relational {@code lakehouse-iceberg} catalog with the {@code memory} backend. */
+  private Catalog createGravitinoIcebergCatalog() {
+    return metalakeClient.createCatalog(
+        GRAVITINO_CATALOG_NAME,
+        Catalog.Type.RELATIONAL,
+        "lakehouse-iceberg",
+        "comment",
+        ImmutableMap.of(
+            IcebergConstants.URI,
+            "memory://gravitino-optimizer",
+            IcebergConstants.CATALOG_BACKEND,
+            "memory",
+            IcebergConstants.WAREHOUSE,
+            icebergWarehouseLocation));
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyJobMetricsCalculator.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyJobMetricsCalculator.java
new file mode 100644
index 0000000000..289a1600ed
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyJobMetricsCalculator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateJobMetrics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.stats.StatisticValues;
+
+public class DummyJobMetricsCalculator implements SupportsCalculateJobMetrics {
+
+ public static final String DUMMY_JOB_METRICS = "dummy-job-metrics";
+ public static final String JOB_STAT_NAME = "dummy-job-stat-name";
+
+ @Override
+ public String name() {
+ return DUMMY_JOB_METRICS;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ @Override
+ public List<MetricPoint> calculateJobMetrics(NameIdentifier jobIdentifier) {
+ long timestampSeconds = System.currentTimeMillis() / 1000;
+ return List.of(
+ MetricPoint.forJob(
+ jobIdentifier, JOB_STAT_NAME, StatisticValues.longValue(1L),
timestampSeconds));
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyTableStatisticsComputer.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyTableStatisticsComputer.java
new file mode 100644
index 0000000000..02383659aa
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/DummyTableStatisticsComputer.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import
org.apache.gravitino.maintenance.optimizer.api.common.TableAndPartitionStatistics;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateTableMetrics;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.SupportsCalculateTableStatistics;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.stats.StatisticValues;
+
+/**
+ * Test implementation of both table-statistics and table-metrics calculation. It always reports
+ * {@value #TABLE_STAT_NAME} as 1 for the table and as 2 for the single partition {@code p1=v1}
+ * returned by {@link #getPartitionEntries()}.
+ */
+public class DummyTableStatisticsComputer
+    implements SupportsCalculateTableStatistics, SupportsCalculateTableMetrics {
+
+  public static final String DUMMY_TABLE_STAT = "dummy-table-stat";
+  public static final String TABLE_STAT_NAME = "custom-dummy-table-stat-name";
+
+  private static final long TABLE_LEVEL_VALUE = 1L;
+  private static final long PARTITION_LEVEL_VALUE = 2L;
+
+  @Override
+  public String name() {
+    return DUMMY_TABLE_STAT;
+  }
+
+  /** No-op: this computer reads nothing from the environment. */
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public TableAndPartitionStatistics calculateTableStatistics(NameIdentifier tableIdentifier) {
+    StatisticEntry<?> tableEntry =
+        new StatisticEntryImpl<>(TABLE_STAT_NAME, StatisticValues.longValue(TABLE_LEVEL_VALUE));
+    StatisticEntry<?> partitionEntry =
+        new StatisticEntryImpl<>(
+            TABLE_STAT_NAME, StatisticValues.longValue(PARTITION_LEVEL_VALUE));
+    List<StatisticEntry<?>> tableStatistics = List.of(tableEntry);
+    Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics =
+        Map.of(PartitionPath.of(getPartitionEntries()), List.of(partitionEntry));
+    return new TableAndPartitionStatistics(tableStatistics, partitionStatistics);
+  }
+
+  @Override
+  public List<MetricPoint> calculateTableMetrics(NameIdentifier tableIdentifier) {
+    long nowSeconds = System.currentTimeMillis() / 1000;
+    MetricPoint tableMetric =
+        MetricPoint.forTable(
+            tableIdentifier,
+            TABLE_STAT_NAME,
+            StatisticValues.longValue(TABLE_LEVEL_VALUE),
+            nowSeconds);
+    MetricPoint partitionMetric =
+        MetricPoint.forPartition(
+            tableIdentifier,
+            PartitionPath.of(getPartitionEntries()),
+            TABLE_STAT_NAME,
+            StatisticValues.longValue(PARTITION_LEVEL_VALUE),
+            nowSeconds);
+    return List.of(tableMetric, partitionMetric);
+  }
+
+  /** Returns the single partition entry {@code p1=v1} used for all partition-level values. */
+  @VisibleForTesting
+  public static List<PartitionEntry> getPartitionEntries() {
+    PartitionEntry onlyEntry = new PartitionEntryImpl("p1", "v1");
+    return List.of(onlyEntry);
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java
new file mode 100644
index 0000000000..4d10adff3e
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoMetricsIT.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+/**
+ * Writes table, partition and job metrics through {@link GravitinoMetricsUpdater} and reads them
+ * back through {@link GravitinoMetricsProvider}. Both components share a fresh in-memory H2 metrics
+ * database; no Gravitino server is involved.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class GravitinoMetricsIT {
+
+  private GravitinoMetricsUpdater updater;
+  private GravitinoMetricsProvider provider;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv optimizerEnv = createOptimizerEnv();
+    updater = new GravitinoMetricsUpdater();
+    updater.initialize(optimizerEnv);
+    provider = new GravitinoMetricsProvider();
+    provider.initialize(optimizerEnv);
+  }
+
+  /**
+   * Closes the updater and the provider. The provider is closed in a {@code finally} block so it is
+   * released even when closing the updater throws.
+   */
+  @AfterAll
+  public void tearDown() throws Exception {
+    try {
+      if (updater != null) {
+        updater.close();
+      }
+    } finally {
+      if (provider != null) {
+        provider.close();
+      }
+    }
+  }
+
+  @Test
+  void testTableMetrics() {
+    NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "schema", "table");
+
+    PartitionPath partition1 =
+        PartitionPath.of(
+            Arrays.asList(new PartitionEntryImpl("p1", "v1"), new PartitionEntryImpl("p2", "v2")));
+    PartitionPath partition2 =
+        PartitionPath.of(
+            Arrays.asList(new PartitionEntryImpl("p1", "v11"), new PartitionEntryImpl("p2", "v2")));
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forTable(tableIdentifier, "a", StatisticValues.longValue(10), 1000),
+            MetricPoint.forTable(tableIdentifier, "b", StatisticValues.longValue(1000), 1000),
+            MetricPoint.forPartition(
+                tableIdentifier, partition1, "b", StatisticValues.longValue(1003), 1000),
+            MetricPoint.forPartition(
+                tableIdentifier, partition2, "b", StatisticValues.longValue(1004), 1000),
+            MetricPoint.forTable(tableIdentifier, "a", StatisticValues.longValue(100L), 1001)));
+
+    List<MetricPoint> tableMetrics = provider.tableMetrics(tableIdentifier, 1000, 1002);
+    Map<String, List<MetricPoint>> tableMetricsByName = metricsByName(tableMetrics);
+
+    Assertions.assertEquals(2, tableMetricsByName.size());
+
+    List<MetricPoint> aMetrics = sortByTimestamp(tableMetricsByName.get("a"));
+    Assertions.assertEquals(2, aMetrics.size());
+    Assertions.assertEquals(10L, longValueOf(aMetrics.get(0)));
+    Assertions.assertEquals(1000L, aMetrics.get(0).timestampSeconds());
+    Assertions.assertEquals(100L, longValueOf(aMetrics.get(1)));
+    Assertions.assertEquals(1001L, aMetrics.get(1).timestampSeconds());
+
+    List<MetricPoint> bMetrics = tableMetricsByName.get("b");
+    Assertions.assertEquals(1, bMetrics.size());
+    Assertions.assertEquals(1000L, longValueOf(bMetrics.get(0)));
+    Assertions.assertEquals(1000L, bMetrics.get(0).timestampSeconds());
+
+    List<MetricPoint> partitionMetrics1 =
+        provider.partitionMetrics(tableIdentifier, partition1, 1000, 1002);
+    Assertions.assertEquals(1, partitionMetrics1.size());
+    MetricPoint metric1 = partitionMetrics1.get(0);
+    Assertions.assertEquals("b", metric1.metricName());
+    Assertions.assertEquals(
+        partition1, metric1.partitionPath().orElseThrow(IllegalStateException::new));
+    Assertions.assertEquals(1003L, longValueOf(metric1));
+    Assertions.assertEquals(1000L, metric1.timestampSeconds());
+
+    List<MetricPoint> partitionMetrics2 =
+        provider.partitionMetrics(tableIdentifier, partition2, 1000, 1002);
+    Assertions.assertEquals(1, partitionMetrics2.size());
+    MetricPoint metric2 = partitionMetrics2.get(0);
+    Assertions.assertEquals("b", metric2.metricName());
+    Assertions.assertEquals(
+        partition2, metric2.partitionPath().orElseThrow(IllegalStateException::new));
+    Assertions.assertEquals(1004L, longValueOf(metric2));
+    Assertions.assertEquals(1000L, metric2.timestampSeconds());
+  }
+
+  @Test
+  void testJobMetrics() {
+    NameIdentifier jobIdentifier = NameIdentifier.of("job1");
+
+    updater.updateJobMetrics(
+        Arrays.asList(
+            MetricPoint.forJob(jobIdentifier, "x", StatisticValues.longValue(20), 2000),
+            MetricPoint.forJob(jobIdentifier, "y", StatisticValues.longValue(2000), 2000),
+            MetricPoint.forJob(jobIdentifier, "x", StatisticValues.longValue(200L), 2001)));
+
+    List<MetricPoint> jobMetrics = provider.jobMetrics(jobIdentifier, 2000, 2002);
+    Map<String, List<MetricPoint>> jobMetricsByName = metricsByName(jobMetrics);
+
+    Assertions.assertEquals(2, jobMetricsByName.size());
+
+    List<MetricPoint> xMetrics = sortByTimestamp(jobMetricsByName.get("x"));
+    Assertions.assertEquals(2, xMetrics.size());
+    Assertions.assertEquals(20L, longValueOf(xMetrics.get(0)));
+    Assertions.assertEquals(2000L, xMetrics.get(0).timestampSeconds());
+    Assertions.assertEquals(200L, longValueOf(xMetrics.get(1)));
+    Assertions.assertEquals(2001L, xMetrics.get(1).timestampSeconds());
+
+    List<MetricPoint> yMetrics = jobMetricsByName.get("y");
+    Assertions.assertEquals(1, yMetrics.size());
+    Assertions.assertEquals(2000L, longValueOf(yMetrics.get(0)));
+    Assertions.assertEquals(2000L, yMetrics.get(0).timestampSeconds());
+  }
+
+  /** Groups metric points by metric name. */
+  private static Map<String, List<MetricPoint>> metricsByName(List<MetricPoint> points) {
+    return points.stream().collect(Collectors.groupingBy(MetricPoint::metricName));
+  }
+
+  /** Returns a new list of the points ordered by ascending timestamp. */
+  private static List<MetricPoint> sortByTimestamp(List<MetricPoint> points) {
+    return points.stream().sorted(Comparator.comparingLong(MetricPoint::timestampSeconds)).toList();
+  }
+
+  /** Reads a metric point's numeric value as a {@code long}. */
+  private static long longValueOf(MetricPoint point) {
+    return ((Number) point.value().value()).longValue();
+  }
+
+  /** Builds an environment that only configures a uniquely named in-memory H2 metrics store. */
+  private OptimizerEnv createOptimizerEnv() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:mem:gravitino-metrics-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL", System.nanoTime());
+
+    Map<String, String> config =
+        Map.of(
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_URL,
+            jdbcUrl,
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_DRIVER,
+            "org.h2.Driver");
+    return new OptimizerEnv(new OptimizerConfig(config));
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java
new file mode 100644
index 0000000000..2ff4f8a5f0
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStatisticsIT.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Writes table and partition statistics through {@link GravitinoStatisticsUpdater} and reads them
+ * back through {@link GravitinoStatisticsProvider} against the test Gravitino server.
+ */
+public class GravitinoStatisticsIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private static final String TEST_TABLE = "test_stats_table";
+  private static final String TEST_PARTITION_TABLE = "test_stats_partition_table";
+  // Every statistic written by this test uses this name prefix (presumably the prefix Gravitino
+  // expects for user-defined statistics — confirm against the statistics API).
+  private static final String STATISTICS_PREFIX = "custom-";
+  private static final String DATAFILE_SIZE_MSE = STATISTICS_PREFIX + "datafile_size_mse";
+
+  private GravitinoStatisticsUpdater statisticsUpdater;
+  private GravitinoStatisticsProvider statisticsProvider;
+
+  @BeforeAll
+  void init() {
+    this.statisticsUpdater = new GravitinoStatisticsUpdater();
+    statisticsUpdater.initialize(optimizerEnv);
+    this.statisticsProvider = new GravitinoStatisticsProvider();
+    statisticsProvider.initialize(optimizerEnv);
+    createTable(TEST_TABLE);
+    createPartitionTable(TEST_PARTITION_TABLE);
+  }
+
+  /**
+   * Closes the provider and the updater. The updater is closed in a {@code finally} block so it is
+   * released even when closing the provider throws.
+   */
+  @AfterAll
+  void closeResources() throws Exception {
+    try {
+      if (statisticsProvider != null) {
+        statisticsProvider.close();
+      }
+    } finally {
+      if (statisticsUpdater != null) {
+        statisticsUpdater.close();
+      }
+    }
+  }
+
+  @Test
+  void testTableStatisticsUpdaterAndProvider() {
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(TEST_TABLE),
+        Arrays.asList(
+            new StatisticEntryImpl<>(
+                STATISTICS_PREFIX + "row_count", StatisticValues.longValue(1000)),
+            new StatisticEntryImpl<>(
+                STATISTICS_PREFIX + "size_in_bytes", StatisticValues.longValue(1000000)),
+            new StatisticEntryImpl<>(DATAFILE_SIZE_MSE, StatisticValues.doubleValue(10000.1))));
+
+    List<StatisticEntry<?>> stats =
+        statisticsProvider.tableStatistics(getTableIdentifier(TEST_TABLE));
+    Assertions.assertEquals(3, stats.size());
+    stats.forEach(
+        stat -> {
+          if (stat.name().equals(STATISTICS_PREFIX + "row_count")) {
+            Assertions.assertEquals(1000L, ((Number) stat.value().value()).longValue());
+          } else if (stat.name().equals(STATISTICS_PREFIX + "size_in_bytes")) {
+            Assertions.assertEquals(1000000L, ((Number) stat.value().value()).longValue());
+          } else if (stat.name().equals(DATAFILE_SIZE_MSE)) {
+            Assertions.assertEquals(10000.1, ((Number) stat.value().value()).doubleValue());
+          } else {
+            Assertions.fail("Unexpected statistic name: " + stat.name());
+          }
+        });
+  }
+
+  @Test
+  void testTablePartitionStatisticsUpdaterAndProvider() {
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(TEST_PARTITION_TABLE),
+        Arrays.asList(
+            new StatisticEntryImpl<>(
+                STATISTICS_PREFIX + "size_in_bytes", StatisticValues.longValue(1000000))));
+    statisticsUpdater.updatePartitionStatistics(
+        getTableIdentifier(TEST_PARTITION_TABLE),
+        Map.of(
+            PartitionPath.of(
+                Arrays.asList(
+                    new PartitionEntryImpl("col1", "1"), new PartitionEntryImpl("col2", "2"))),
+            List.of(
+                new StatisticEntryImpl<>(
+                    STATISTICS_PREFIX + "partition_row_count", StatisticValues.longValue(500)),
+                new StatisticEntryImpl<>(
+                    STATISTICS_PREFIX + "partition_size_in_bytes",
+                    StatisticValues.longValue(500000)))));
+
+    Map<PartitionPath, List<StatisticEntry<?>>> stats =
+        statisticsProvider.partitionStatistics(getTableIdentifier(TEST_PARTITION_TABLE));
+    Assertions.assertEquals(1, stats.size());
+    List<StatisticEntry<?>> partitionStats =
+        stats.values().stream().findFirst().orElseThrow(IllegalStateException::new);
+    Assertions.assertEquals(2, partitionStats.size());
+    partitionStats.forEach(
+        stat -> {
+          if (stat.name().equals(STATISTICS_PREFIX + "partition_row_count")) {
+            Assertions.assertEquals(500L, ((Number) stat.value().value()).longValue());
+          } else if (stat.name().equals(STATISTICS_PREFIX + "partition_size_in_bytes")) {
+            Assertions.assertEquals(500000L, ((Number) stat.value().value()).longValue());
+          } else {
+            Assertions.fail("Unexpected statistic name: " + stat.name());
+          }
+        });
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java
new file mode 100644
index 0000000000..fb0382a818
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoStrategyIT.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.List;
+import org.apache.gravitino.maintenance.optimizer.api.common.Strategy;
+import
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link GravitinoStrategyProvider}: policies created in Gravitino are read
+ * back as optimizer {@link Strategy} objects, either by policy name or by associated table.
+ */
+public class GravitinoStrategyIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private GravitinoStrategyProvider strategyProvider;
+
+  @BeforeAll
+  void init() {
+    this.strategyProvider = new GravitinoStrategyProvider();
+    strategyProvider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (strategyProvider != null) {
+      strategyProvider.close();
+    }
+  }
+
+  /** A policy looked up by name comes back with its type, job template name and rules. */
+  @Test
+  void testGravitinoStrategyProviderGetStrategy() {
+    createPolicy("test_policy", ImmutableMap.of("rule1", "value1"), "test");
+
+    Strategy strategy = strategyProvider.strategy("test_policy");
+    Assertions.assertNotNull(strategy);
+    Assertions.assertEquals("test", strategy.strategyType());
+    Assertions.assertEquals("template-name", strategy.jobTemplateName());
+    Assertions.assertEquals(ImmutableMap.of("rule1", "value1"), strategy.rules());
+  }
+
+  /**
+   * Every policy associated with a table is returned as a strategy. The returned names are
+   * compared as a whole so a missing or duplicated strategy fails the test.
+   */
+  @Test
+  void testGravitinoStrategyProviderGetTableStrategies() {
+    String tableName = "test_get_table_policy";
+    createTable(tableName);
+    createPolicy("policy1", ImmutableMap.of("rule1", "value1"), "test");
+    createPolicy("policy2", ImmutableMap.of("rule2", "value2"), "test");
+    associatePoliciesToTable("policy1", tableName);
+    associatePoliciesToTable("policy2", tableName);
+
+    List<Strategy> strategies = strategyProvider.strategies(getTableIdentifier(tableName));
+    Assertions.assertNotNull(strategies);
+    Assertions.assertEquals(
+        List.of("policy1", "policy2"), strategies.stream().map(Strategy::name).sorted().toList());
+
+    for (Strategy strategy : strategies) {
+      Assertions.assertEquals("test", strategy.strategyType());
+      Assertions.assertEquals("template-name", strategy.jobTemplateName());
+      // The name-list assertion above guarantees the only other name is "policy2".
+      if (strategy.name().equals("policy1")) {
+        Assertions.assertEquals(ImmutableMap.of("rule1", "value1"), strategy.rules());
+      } else {
+        Assertions.assertEquals(ImmutableMap.of("rule2", "value2"), strategy.rules());
+      }
+    }
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java
new file mode 100644
index 0000000000..2d0af79d91
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/GravitinoTableMetaIT.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import
org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/** Integration tests for {@link GravitinoTableMetadataProvider}. */
+public class GravitinoTableMetaIT extends AbstractGravitinoOptimizerEnvIT {
+  private GravitinoTableMetadataProvider tableMetadataProvider;
+
+  @BeforeAll
+  void init() {
+    tableMetadataProvider = new GravitinoTableMetadataProvider();
+    tableMetadataProvider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (tableMetadataProvider == null) {
+      return;
+    }
+    tableMetadataProvider.close();
+  }
+
+  /** Loads a partitioned table and checks its name, column names and partition transforms. */
+  @Test
+  void testTableMetadataProviderTableAndPartitioningMetadata() {
+    String tableName = "test_table_metadata";
+    createPartitionTable(tableName);
+
+    Table loaded = tableMetadataProvider.tableMetadata(getTableIdentifier(tableName));
+    Assertions.assertNotNull(loaded);
+    Assertions.assertEquals(tableName, loaded.name());
+
+    // Columns are checked by position.
+    Assertions.assertEquals(3, loaded.columns().length);
+    Column[] cols = loaded.columns();
+    String[] expectedColumnNames = {"col1", "col2", "col3"};
+    for (int i = 0; i < expectedColumnNames.length; i++) {
+      Assertions.assertEquals(expectedColumnNames[i], cols[i].name());
+    }
+
+    // Partitioning: an identity transform followed by a bucket transform.
+    Assertions.assertEquals(2, loaded.partitioning().length);
+    Transform[] transforms = loaded.partitioning();
+    Assertions.assertEquals(Transforms.NAME_OF_IDENTITY, transforms[0].name());
+    Assertions.assertEquals(Transforms.NAME_OF_BUCKET, transforms[1].name());
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java
new file mode 100644
index 0000000000..426da0ec89
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/MonitorIT.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.monitor.Monitor;
+import
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+/**
+ * Integration tests for {@link Monitor} using a {@link GravitinoMetricsUpdater} that writes into
+ * an in-memory H2 database.
+ *
+ * <p>Metrics are written around an action time, and the tests check how the monitor splits them
+ * into "before" and "after" samples for the table (or partition) and its related jobs.
+ */
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class MonitorIT {
+
+  private GravitinoMetricsUpdater updater;
+  private Monitor monitor;
+
+  @BeforeAll
+  public void setUp() {
+    OptimizerEnv optimizerEnv = createOptimizerEnv();
+    updater = new GravitinoMetricsUpdater();
+    updater.initialize(optimizerEnv);
+    monitor = new Monitor(optimizerEnv);
+    MonitorCallbackForTest.reset();
+  }
+
+  @AfterAll
+  public void closeResources() throws Exception {
+    if (monitor != null) {
+      monitor.close();
+    }
+    if (updater != null) {
+      updater.close();
+    }
+  }
+
+  @Test
+  void testTableMetrics() {
+    final long actionTime = 10;
+    final long rangeSeconds = 2;
+    NameIdentifier table = NameIdentifier.of("db", "table");
+    NameIdentifier job1 = TableJobRelationProviderForTest.JOB1;
+    NameIdentifier job2 = TableJobRelationProviderForTest.JOB2;
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forTable(table, "storage", StatisticValues.longValue(10), 8),
+            MetricPoint.forTable(table, "s3_cost", StatisticValues.longValue(1000), 9),
+            MetricPoint.forTable(table, "s3_cost", StatisticValues.longValue(1003), 10),
+            MetricPoint.forTable(table, "storage", StatisticValues.longValue(100L), 11)));
+
+    // Both jobs get the same series; the job_cost point at t=100 is not expected in the results.
+    updater.updateJobMetrics(
+        Arrays.asList(
+            MetricPoint.forJob(job1, "job_runtime", StatisticValues.longValue(8), 8),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(9), 9),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(10), 10),
+            MetricPoint.forJob(job1, "job_cost", StatisticValues.longValue(11), 100),
+            MetricPoint.forJob(job1, "job_runtime", StatisticValues.longValue(12L), 11),
+            MetricPoint.forJob(job2, "job_runtime", StatisticValues.longValue(8), 8),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(9), 9),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(10), 10),
+            MetricPoint.forJob(job2, "job_cost", StatisticValues.longValue(11), 100),
+            MetricPoint.forJob(job2, "job_runtime", StatisticValues.longValue(12L), 11)));
+
+    List<EvaluationResult> results =
+        monitor.evaluateMetrics(table, actionTime, rangeSeconds, Optional.empty());
+    Assertions.assertEquals(3, results.size());
+
+    EvaluationResult tableResult = findByScope(results, DataScope.Type.TABLE, table);
+    Map<String, List<MetricSample>> before = tableResult.beforeMetrics();
+    Map<String, List<MetricSample>> after = tableResult.afterMetrics();
+    assertSingleSample(sortedSamples(before.get("storage")), 8L, 10L);
+    assertSingleSample(sortedSamples(before.get("s3_cost")), 9L, 1000L);
+    assertSingleSample(sortedSamples(after.get("storage")), 11L, 100L);
+    assertSingleSample(sortedSamples(after.get("s3_cost")), 10L, 1003L);
+
+    for (NameIdentifier job : List.of(job1, job2)) {
+      EvaluationResult jobResult = findByScope(results, DataScope.Type.JOB, job);
+      checkJobMetrics(jobResult.beforeMetrics(), jobResult.afterMetrics());
+    }
+  }
+
+  @Test
+  void testPartitionMetrics() {
+    final long actionTime = 10;
+    final long rangeSeconds = 2;
+    NameIdentifier table = NameIdentifier.of("db", "partitionTable");
+    PartitionPath partition =
+        PartitionUtils.decodePartitionPath("[{\"country\":\"US\"},{\"region\":\"CA\"}]");
+
+    updater.updateTableAndPartitionMetrics(
+        Arrays.asList(
+            MetricPoint.forPartition(
+                table, partition, "storage", StatisticValues.longValue(10), 8),
+            MetricPoint.forPartition(
+                table, partition, "storage", StatisticValues.longValue(20), 11),
+            MetricPoint.forPartition(
+                table, partition, "s3_cost", StatisticValues.longValue(5), 10)));
+
+    EvaluationResult partitionResult =
+        monitor
+            .evaluateMetrics(table, actionTime, rangeSeconds, Optional.of(partition))
+            .stream()
+            .filter(result -> result.scope().type() == DataScope.Type.PARTITION)
+            .findFirst()
+            .orElseThrow(IllegalStateException::new);
+    Assertions.assertEquals(partition, partitionResult.scope().partition().orElseThrow());
+
+    assertSingleSample(sortedSamples(partitionResult.beforeMetrics().get("storage")), 8L, 10L);
+    assertSingleSample(sortedSamples(partitionResult.afterMetrics().get("storage")), 11L, 20L);
+    assertSingleSample(sortedSamples(partitionResult.afterMetrics().get("s3_cost")), 10L, 5L);
+  }
+
+  @Test
+  void testMonitorCallbacks() {
+    MonitorCallbackForTest.reset();
+
+    monitor.evaluateMetrics(NameIdentifier.of("db", "table"), 10, 1, Optional.empty());
+
+    Assertions.assertEquals(3, MonitorCallbackForTest.INVOCATIONS.get());
+    boolean sawTableResult =
+        MonitorCallbackForTest.RESULTS.stream()
+            .anyMatch(result -> result.scope().type() == DataScope.Type.TABLE);
+    Assertions.assertTrue(sawTableResult);
+  }
+
+  /** Checks the before/after samples expected from the job series written in testTableMetrics. */
+  private void checkJobMetrics(
+      Map<String, List<MetricSample>> jobBeforeMetrics,
+      Map<String, List<MetricSample>> jobAfterMetrics) {
+    assertSingleSample(sortedSamples(jobBeforeMetrics.get("job_runtime")), 8L, 8L);
+    assertSingleSample(sortedSamples(jobBeforeMetrics.get("job_cost")), 9L, 9L);
+    assertSingleSample(sortedSamples(jobAfterMetrics.get("job_runtime")), 11L, 12L);
+    assertSingleSample(sortedSamples(jobAfterMetrics.get("job_cost")), 10L, 10L);
+  }
+
+  /** Asserts that {@code samples} holds exactly one sample with the given timestamp and value. */
+  private static void assertSingleSample(
+      List<MetricSample> samples, long expectedTimestampSeconds, long expectedValue) {
+    Assertions.assertEquals(1, samples.size());
+    MetricSample only = samples.get(0);
+    Assertions.assertEquals(expectedTimestampSeconds, only.timestampSeconds());
+    Assertions.assertEquals(expectedValue, ((Number) only.value().value()).longValue());
+  }
+
+  /** Returns the first result matching both scope type and identifier, or throws if none. */
+  private static EvaluationResult findByScope(
+      List<EvaluationResult> results, DataScope.Type type, NameIdentifier identifier) {
+    for (EvaluationResult result : results) {
+      if (result.scope().type() == type && result.scope().identifier().equals(identifier)) {
+        return result;
+      }
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns the samples ordered by timestamp; a missing (null) series becomes an empty list. */
+  private static List<MetricSample> sortedSamples(List<MetricSample> samples) {
+    if (samples == null) {
+      return List.of();
+    }
+    Comparator<MetricSample> byTimestamp = Comparator.comparingLong(MetricSample::timestampSeconds);
+    return samples.stream().sorted(byTimestamp).toList();
+  }
+
+  /**
+   * Builds an optimizer environment wired to the test evaluator, job-relation provider and
+   * callback, storing metrics in an in-memory H2 database (MySQL mode) whose name carries a
+   * nanoTime suffix so separate environments do not share data.
+   */
+  private OptimizerEnv createOptimizerEnv() {
+    String jdbcUrl =
+        String.format(
+            "jdbc:h2:mem:gravitino-monitor-it-%d;DB_CLOSE_DELAY=-1;MODE=MYSQL", System.nanoTime());
+    String jdbcKeyPrefix =
+        OptimizerConfig.OPTIMIZER_PREFIX + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX;
+
+    Map<String, String> configs =
+        ImmutableMap.<String, String>builder()
+            .put(OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(), MetricsEvaluatorForTest.NAME)
+            .put(
+                OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+                TableJobRelationProviderForTest.NAME)
+            .put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(), MonitorCallbackForTest.NAME)
+            .put(jdbcKeyPrefix + GenericJdbcMetricsRepository.JDBC_URL, jdbcUrl)
+            .put(jdbcKeyPrefix + GenericJdbcMetricsRepository.JDBC_DRIVER, "org.h2.Driver")
+            .build();
+    return new OptimizerEnv(new OptimizerConfig(configs));
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java
new file mode 100644
index 0000000000..a896eb2295
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecommenderIT.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import
org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import
org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.recommender.Recommender;
+import
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext;
+import
org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionStrategyHandler;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.StrategyUtils;
+import
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * End-to-end recommender tests:
+ *
+ * <ol>
+ *   <li>update table (and partition) statistics,
+ *   <li>create compaction policies and associate them with the schema or with tables,
+ *   <li>run the {@link Recommender} and inspect the job contexts it submits.
+ * </ol>
+ *
+ * <p>Submitted job contexts are collected by {@link RecordingJobSubmitterForIT} under a session id
+ * shared by this class.
+ */
+public class RecommenderIT extends AbstractGravitinoOptimizerEnvIT {
+
+  private static final String STATISTICS_PREFIX = "custom-";
+  private static final String DATAFILE_MSE = STATISTICS_PREFIX + "datafile_size_mse";
+  private static final String DELETE_FILE_NUM = STATISTICS_PREFIX + "position_delete_file_number";
+  private static final String SESSION_ID = "recommender-it-" + UUID.randomUUID();
+
+  private StatisticsUpdater statisticsUpdater;
+
+  @Override
+  protected Map<String, String> getSpecifyConfigs() {
+    String handlerClassKey =
+        OptimizerConfig.OPTIMIZER_PREFIX
+            + "strategyHandler."
+            + CompactionStrategyHandler.NAME
+            + ".className";
+    return Map.of(
+        handlerClassKey,
+        CompactionStrategyHandler.class.getName(),
+        OptimizerConfig.JOB_SUBMITTER_CONFIG.getKey(),
+        RecordingJobSubmitterForIT.NAME,
+        RecordingJobSubmitterForIT.SESSION_ID_KEY,
+        SESSION_ID);
+  }
+
+  @BeforeAll
+  void init() {
+    statisticsUpdater = new GravitinoStatisticsUpdater();
+    statisticsUpdater.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    if (statisticsUpdater == null) {
+      return;
+    }
+    statisticsUpdater.close();
+  }
+
+  @Test
+  void testRecommendNonPartitionTable() throws Exception {
+    final String deletePolicy = "policyForDelete";
+    final String smallFilePolicy = "policyForSmallFile";
+    final String smallFileTable = "tableWithSmallFile";
+    final String deleteFileTable = "tableWithDeleteFile";
+    final String quietTable = "tableWithoutCompaction";
+
+    for (String table : List.of(smallFileTable, deleteFileTable, quietTable)) {
+      createTable(table);
+    }
+
+    // Score is dominated by data-file MSE; any delete file also triggers it.
+    createPolicy(
+        smallFilePolicy,
+        ImmutableMap.of(
+            "min_datafile_mse",
+            1000,
+            StrategyUtils.TRIGGER_EXPR,
+            DATAFILE_MSE + " > min_datafile_mse || " + DELETE_FILE_NUM + " > 0",
+            StrategyUtils.SCORE_EXPR,
+            DATAFILE_MSE + " + " + DELETE_FILE_NUM + " * 10"),
+        CompactionStrategyHandler.NAME);
+
+    // Score is dominated by the delete-file count; needs more than one delete file to trigger.
+    createPolicy(
+        deletePolicy,
+        ImmutableMap.of(
+            "min_datafile_mse",
+            1000,
+            StrategyUtils.TRIGGER_EXPR,
+            DATAFILE_MSE + " > min_datafile_mse || " + DELETE_FILE_NUM + " > 1",
+            StrategyUtils.SCORE_EXPR,
+            DATAFILE_MSE + "/100 + " + DELETE_FILE_NUM + " * 100"),
+        CompactionStrategyHandler.NAME);
+
+    associatePoliciesToSchema(smallFilePolicy, TEST_SCHEMA);
+    associatePoliciesToSchema(deletePolicy, TEST_SCHEMA);
+
+    updateTableCompactionStatistics(smallFileTable, 2, 10000.1);
+    updateTableCompactionStatistics(deleteFileTable, 100, 100.1);
+    updateTableCompactionStatistics(quietTable, 0, 0);
+
+    try (Recommender recommender = new Recommender(optimizerEnv)) {
+      List<JobExecutionContext> jobs =
+          recommendForOneStrategy(
+              recommender,
+              tableIdentifiers(smallFileTable, deleteFileTable, quietTable),
+              smallFilePolicy);
+      assertJobTableOrder(jobs, smallFileTable, deleteFileTable);
+
+      jobs =
+          recommendForOneStrategy(
+              recommender,
+              tableIdentifiers(smallFileTable, deleteFileTable, quietTable),
+              deletePolicy);
+      assertJobTableOrder(jobs, deleteFileTable, smallFileTable);
+    }
+  }
+
+  @Test
+  void testCompactionPartitionTable() throws Exception {
+    final String firstTable = "partitionTable";
+    final String secondTable = "partitionTable2";
+    final String policy = "policyForPartition";
+
+    createPartitionTable(firstTable);
+    createPartitionTable(secondTable);
+
+    // Always triggers; the score weights delete files 100x on top of data-file MSE.
+    createPolicy(
+        policy,
+        ImmutableMap.of(
+            StrategyUtils.TRIGGER_EXPR,
+            "true",
+            StrategyUtils.SCORE_EXPR,
+            DELETE_FILE_NUM + " * 100 + " + DATAFILE_MSE),
+        CompactionStrategyHandler.NAME);
+    associatePoliciesToTable(policy, firstTable);
+    associatePoliciesToTable(policy, secondTable);
+
+    List<PartitionEntry> partitionA =
+        Arrays.asList(new PartitionEntryImpl("col1", "1"), new PartitionEntryImpl("col2", "3"));
+    List<PartitionEntry> partitionB =
+        Arrays.asList(new PartitionEntryImpl("col1", "2"), new PartitionEntryImpl("col2", "4"));
+    List<PartitionEntry> partitionC =
+        Arrays.asList(new PartitionEntryImpl("col1", "10"), new PartitionEntryImpl("col2", "5"));
+
+    updateTableCompactionStatistics(firstTable, 1, 10.0);
+    statisticsUpdater.updatePartitionStatistics(
+        getTableIdentifier(firstTable),
+        Map.of(
+            PartitionPath.of(partitionA),
+            List.of(
+                new StatisticEntryImpl<>(DELETE_FILE_NUM, StatisticValues.longValue(5)),
+                new StatisticEntryImpl<>(DATAFILE_MSE, StatisticValues.doubleValue(1000.0))),
+            PartitionPath.of(partitionB),
+            List.of(
+                new StatisticEntryImpl<>(DELETE_FILE_NUM, StatisticValues.longValue(1)),
+                new StatisticEntryImpl<>(DATAFILE_MSE, StatisticValues.doubleValue(100.0)))));
+
+    updateTableCompactionStatistics(secondTable, 1, 10.0);
+    statisticsUpdater.updatePartitionStatistics(
+        getTableIdentifier(secondTable),
+        Map.of(
+            PartitionPath.of(partitionC),
+            List.of(
+                new StatisticEntryImpl<>(DELETE_FILE_NUM, StatisticValues.longValue(2)),
+                new StatisticEntryImpl<>(DATAFILE_MSE, StatisticValues.doubleValue(500.0)))));
+
+    try (Recommender recommender = new Recommender(optimizerEnv)) {
+      List<JobExecutionContext> jobs =
+          recommendForOneStrategy(recommender, tableIdentifiers(firstTable, secondTable), policy);
+
+      Assertions.assertEquals(2, jobs.size());
+      for (JobExecutionContext job : jobs) {
+        Assertions.assertTrue(job instanceof CompactionJobContext);
+      }
+
+      Map<String, CompactionJobContext> jobsByTable =
+          jobs.stream()
+              .map(CompactionJobContext.class::cast)
+              .collect(
+                  Collectors.toMap(context -> context.nameIdentifier().name(), Function.identity()));
+
+      CompactionJobContext firstJob = jobsByTable.get(firstTable);
+      Assertions.assertNotNull(firstJob);
+      Assertions.assertFalse(firstJob.getPartitions().isEmpty());
+      Assertions.assertEquals(2, firstJob.getPartitions().size());
+      Assertions.assertEquals(
+          Arrays.asList("col1=1", "col2=3"),
+          firstJob.getPartitions().get(0).entries().stream()
+              .map(entry -> entry.partitionName() + "=" + entry.partitionValue())
+              .toList());
+      Assertions.assertEquals(
+          Arrays.asList("col1=2", "col2=4"),
+          firstJob.getPartitions().get(1).entries().stream()
+              .map(entry -> entry.partitionName() + "=" + entry.partitionValue())
+              .toList());
+
+      CompactionJobContext secondJob = jobsByTable.get(secondTable);
+      Assertions.assertNotNull(secondJob);
+      Assertions.assertFalse(secondJob.getPartitions().isEmpty());
+      Assertions.assertEquals(1, secondJob.getPartitions().size());
+      Assertions.assertEquals(
+          Arrays.asList("col1=10", "col2=5"),
+          secondJob.getPartitions().get(0).entries().stream()
+              .map(entry -> entry.partitionName() + "=" + entry.partitionValue())
+              .toList());
+    }
+  }
+
+  /** Writes the delete-file count and data-file MSE table statistics for {@code tableName}. */
+  private void updateTableCompactionStatistics(String tableName, long deleteFiles, double mse) {
+    statisticsUpdater.updateTableStatistics(
+        getTableIdentifier(tableName),
+        Arrays.asList(
+            new StatisticEntryImpl<>(DELETE_FILE_NUM, StatisticValues.longValue(deleteFiles)),
+            new StatisticEntryImpl<>(DATAFILE_MSE, StatisticValues.doubleValue(mse))));
+  }
+
+  /** Resolves each table name to its identifier, preserving argument order. */
+  private List<NameIdentifier> tableIdentifiers(String... tableNames) {
+    NameIdentifier[] identifiers = new NameIdentifier[tableNames.length];
+    for (int i = 0; i < tableNames.length; i++) {
+      identifiers[i] = getTableIdentifier(tableNames[i]);
+    }
+    return Arrays.asList(identifiers);
+  }
+
+  /** Asserts that {@code jobs} targets exactly {@code expectedTables}, in that order. */
+  private static void assertJobTableOrder(List<JobExecutionContext> jobs, String... expectedTables) {
+    Assertions.assertEquals(expectedTables.length, jobs.size());
+    for (int i = 0; i < expectedTables.length; i++) {
+      Assertions.assertEquals(expectedTables[i], jobs.get(i).nameIdentifier().name());
+    }
+  }
+
+  /**
+   * Runs the recommender for a single strategy and returns the job contexts recorded for this
+   * class's session. The session is reset before the run and cleared afterwards.
+   */
+  private List<JobExecutionContext> recommendForOneStrategy(
+      Recommender recommender, List<NameIdentifier> identifiers, String strategyName) {
+    RecordingJobSubmitterForIT.reset(SESSION_ID);
+    try {
+      recommender.submitForStrategyName(identifiers, strategyName, Integer.MAX_VALUE);
+      return RecordingJobSubmitterForIT.submittedContexts(SESSION_ID);
+    } finally {
+      RecordingJobSubmitterForIT.clear(SESSION_ID);
+    }
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecordingJobSubmitterForIT.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecordingJobSubmitterForIT.java
new file mode 100644
index 0000000000..06eb3c02e1
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/RecordingJobSubmitterForIT.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobExecutionContext;
+import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+
+/**
+ * Test-only {@link JobSubmitter} that records job contexts instead of submitting them.
+ *
+ * <p>Recorded contexts live in a static map keyed by the session id configured under
+ * {@link #SESSION_ID_KEY}. A test resets its session before a run, reads the recorded contexts
+ * back with {@link #submittedContexts(String)}, and clears the session afterwards.
+ */
+public class RecordingJobSubmitterForIT implements JobSubmitter {
+  public static final String NAME = "recording-job-submitter-it";
+  public static final String SESSION_ID_KEY =
+      OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + "recording-session-id";
+
+  private static final Map<String, List<JobExecutionContext>> SUBMITTED_CONTEXTS_BY_SESSION =
+      new ConcurrentHashMap<>();
+
+  private String sessionId;
+
+  /** Starts recording for {@code sessionId} with an empty list, discarding earlier entries. */
+  public static void reset(String sessionId) {
+    SUBMITTED_CONTEXTS_BY_SESSION.put(sessionId, new CopyOnWriteArrayList<>());
+  }
+
+  /**
+   * Returns an immutable snapshot of the contexts recorded for {@code sessionId}, or an empty
+   * list when nothing has been recorded.
+   */
+  public static List<JobExecutionContext> submittedContexts(String sessionId) {
+    return List.copyOf(SUBMITTED_CONTEXTS_BY_SESSION.getOrDefault(sessionId, List.of()));
+  }
+
+  /** Discards everything recorded for {@code sessionId}. */
+  public static void clear(String sessionId) {
+    SUBMITTED_CONTEXTS_BY_SESSION.remove(sessionId);
+  }
+
+  /** Records the context under this instance's session and returns a synthetic job id. */
+  @Override
+  public String submitJob(String jobTemplateName, JobExecutionContext jobExecutionContext) {
+    if (sessionId == null) {
+      throw new IllegalStateException(
+          "RecordingJobSubmitterForIT.submitJob called before initialize()");
+    }
+    List<JobExecutionContext> submittedContexts =
+        SUBMITTED_CONTEXTS_BY_SESSION.computeIfAbsent(
+            sessionId, key -> new CopyOnWriteArrayList<>());
+    submittedContexts.add(jobExecutionContext);
+    return "it-job-" + submittedContexts.size();
+  }
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  /**
+   * Reads the recording session id from the optimizer config.
+   *
+   * @throws IllegalArgumentException if {@link #SESSION_ID_KEY} is missing or empty
+   */
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {
+    this.sessionId = optimizerEnv.config().getAllConfig().get(SESSION_ID_KEY);
+    if (sessionId == null || sessionId.isEmpty()) {
+      throw new IllegalArgumentException(
+          "Missing test session id config for RecordingJobSubmitterForIT: " + SESSION_ID_KEY);
+    }
+  }
+
+  /** No-op: recorded state is static and is released through {@link #clear(String)}. */
+  @Override
+  public void close() throws Exception {}
+}
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java
new file mode 100644
index 0000000000..e84937189a
--- /dev/null
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/integration/test/UpdaterIT.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.integration.test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
+import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import org.apache.gravitino.maintenance.optimizer.updater.UpdateType;
+import org.apache.gravitino.maintenance.optimizer.updater.Updater;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for {@link Updater}: writes dummy statistics and metrics through the updater
+ * and reads them back with the Gravitino statistics and metrics providers.
+ */
+public class UpdaterIT extends AbstractGravitinoOptimizerEnvIT {
+
+  // Loose bound (about 2.8 hours); it only rejects grossly wrong metric timestamps.
+  private static final long MAX_TIMESTAMP_AGE_SECONDS = 10000;
+
+  private Updater updater;
+  private GravitinoStatisticsProvider statisticsProvider;
+  private GravitinoMetricsProvider metricsProvider;
+
+  @Override
+  protected Map<String, String> getSpecifyConfigs() {
+    return Map.of();
+  }
+
+  @BeforeAll
+  void init() {
+    this.updater = new Updater(optimizerEnv);
+    this.statisticsProvider = new GravitinoStatisticsProvider();
+    statisticsProvider.initialize(optimizerEnv);
+    this.metricsProvider = new GravitinoMetricsProvider();
+    metricsProvider.initialize(optimizerEnv);
+  }
+
+  @AfterAll
+  void closeResources() throws Exception {
+    // Nested try/finally so that a failure closing one resource does not leak the others.
+    try {
+      if (metricsProvider != null) {
+        metricsProvider.close();
+      }
+    } finally {
+      try {
+        if (statisticsProvider != null) {
+          statisticsProvider.close();
+        }
+      } finally {
+        if (updater != null) {
+          updater.close();
+        }
+      }
+    }
+  }
+
+  @Test
+  void testUpdateTableStatistics() {
+    String tableName = "update-stats";
+    createTable(tableName);
+    NameIdentifier tableIdentifier = getTableIdentifier(tableName);
+    updater.update(
+        DummyTableStatisticsComputer.DUMMY_TABLE_STAT,
+        Arrays.asList(tableIdentifier),
+        UpdateType.STATISTICS);
+
+    // Expect exactly one table-level statistic with value 1.
+    List<StatisticEntry<?>> tableStats = statisticsProvider.tableStatistics(tableIdentifier);
+    Assertions.assertEquals(1, tableStats.size());
+    Assertions.assertEquals(DummyTableStatisticsComputer.TABLE_STAT_NAME, tableStats.get(0).name());
+    Assertions.assertEquals(1L, ((Number) tableStats.get(0).value().value()).longValue());
+
+    // Expect exactly one partition, holding one statistic with value 2.
+    Map<PartitionPath, List<StatisticEntry<?>>> partitionStats =
+        statisticsProvider.partitionStatistics(tableIdentifier);
+    Assertions.assertEquals(1, partitionStats.size());
+    List<StatisticEntry<?>> partitionEntries =
+        partitionStats.values().stream().findFirst().orElseThrow(IllegalStateException::new);
+    Assertions.assertEquals(1, partitionEntries.size());
+    Assertions.assertEquals(
+        DummyTableStatisticsComputer.TABLE_STAT_NAME, partitionEntries.get(0).name());
+    Assertions.assertEquals(2L, ((Number) partitionEntries.get(0).value().value()).longValue());
+    Assertions.assertEquals(
+        PartitionUtils.encodePartitionPath(
+            PartitionPath.of(DummyTableStatisticsComputer.getPartitionEntries())),
+        PartitionUtils.encodePartitionPath(
+            partitionStats.keySet().stream().findFirst().orElseThrow(IllegalStateException::new)));
+  }
+
+  @Test
+  void testUpdateTableMetrics() {
+    String tableName = "update-metrics";
+    createTable(tableName);
+    NameIdentifier tableIdentifier = getTableIdentifier(tableName);
+    updater.update(
+        DummyTableStatisticsComputer.DUMMY_TABLE_STAT,
+        Arrays.asList(tableIdentifier),
+        UpdateType.METRICS);
+
+    List<MetricPoint> tableMetrics =
+        metricsProvider.tableMetrics(tableIdentifier, 0, Long.MAX_VALUE);
+    Assertions.assertEquals(1, tableMetrics.size());
+    MetricPoint tableMetric = tableMetrics.get(0);
+    Assertions.assertEquals(DummyTableStatisticsComputer.TABLE_STAT_NAME, tableMetric.metricName());
+    assertRecentTimestamp(tableMetric.timestampSeconds());
+    Assertions.assertEquals(1L, ((Number) tableMetric.value().value()).longValue());
+
+    PartitionPath expectedPartition =
+        PartitionPath.of(DummyTableStatisticsComputer.getPartitionEntries());
+    List<MetricPoint> partitionMetrics =
+        metricsProvider.partitionMetrics(tableIdentifier, expectedPartition, 0, Long.MAX_VALUE);
+    Assertions.assertEquals(1, partitionMetrics.size());
+    MetricPoint partitionMetric = partitionMetrics.get(0);
+    Assertions.assertEquals(
+        DummyTableStatisticsComputer.TABLE_STAT_NAME, partitionMetric.metricName());
+    assertRecentTimestamp(partitionMetric.timestampSeconds());
+    Assertions.assertEquals(2L, ((Number) partitionMetric.value().value()).longValue());
+    Assertions.assertEquals(
+        PartitionUtils.encodePartitionPath(expectedPartition),
+        PartitionUtils.encodePartitionPath(partitionMetric.partitionPath().orElseThrow()));
+  }
+
+  @Test
+  void testUpdateJobMetrics() {
+    String jobName = "update-job-metrics";
+    NameIdentifier jobIdentifier = NameIdentifier.of(jobName);
+    updater.update(
+        DummyJobMetricsCalculator.DUMMY_JOB_METRICS,
+        Arrays.asList(jobIdentifier),
+        UpdateType.METRICS);
+
+    List<MetricPoint> jobMetrics = metricsProvider.jobMetrics(jobIdentifier, 0, Long.MAX_VALUE);
+    Assertions.assertEquals(1, jobMetrics.size());
+    MetricPoint jobMetric = jobMetrics.get(0);
+    Assertions.assertEquals(DummyJobMetricsCalculator.JOB_STAT_NAME, jobMetric.metricName());
+    assertRecentTimestamp(jobMetric.timestampSeconds());
+    Assertions.assertEquals(1L, ((Number) jobMetric.value().value()).longValue());
+  }
+
+  /** Asserts a metric timestamp (epoch seconds) is not in the future and not implausibly old. */
+  private static void assertRecentTimestamp(long timestampSeconds) {
+    long ageSeconds = System.currentTimeMillis() / 1000 - timestampSeconds;
+    Assertions.assertTrue(
+        ageSeconds >= 0 && ageSeconds <= MAX_TIMESTAMP_AGE_SECONDS,
+        "Unexpected metric timestamp " + timestampSeconds + " (age " + ageSeconds + "s)");
+  }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
index e68bf72573..673892ecd5 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/TestGravitinoMetricsUpdater.java
@@ -19,7 +19,6 @@
package org.apache.gravitino.maintenance.optimizer.updater.metrics;
-import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import org.apache.gravitino.NameIdentifier;
@@ -62,7 +61,7 @@ class TestGravitinoMetricsUpdater {
void testUpdateTableAndPartitionMetricsPassThroughRequests() throws
Exception {
GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
MetricsRepository repository = Mockito.mock(MetricsRepository.class);
- setMetricsRepository(updater, repository);
+ updater.setMetricsRepositoryForTest(repository);
NameIdentifier tableId = NameIdentifier.of("catalog", "db", "table");
List<MetricPoint> inputMetrics =
List.of(
@@ -97,7 +96,7 @@ class TestGravitinoMetricsUpdater {
void testUpdateJobMetricsPassThroughRequests() throws Exception {
GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
MetricsRepository repository = Mockito.mock(MetricsRepository.class);
- setMetricsRepository(updater, repository);
+ updater.setMetricsRepositoryForTest(repository);
NameIdentifier jobId = NameIdentifier.of("catalog", "db", "job");
List<MetricPoint> inputMetrics =
List.of(MetricPoint.forJob(jobId, "duration",
StatisticValues.longValue(20L), 200L));
@@ -120,7 +119,7 @@ class TestGravitinoMetricsUpdater {
void testUpdateTableAndPartitionMetricsRejectsJobScope() throws Exception {
GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
MetricsRepository repository = Mockito.mock(MetricsRepository.class);
- setMetricsRepository(updater, repository);
+ updater.setMetricsRepositoryForTest(repository);
NameIdentifier jobId = NameIdentifier.of("catalog", "db", "job");
Assertions.assertThrows(
@@ -135,7 +134,7 @@ class TestGravitinoMetricsUpdater {
void testUpdateJobMetricsRejectsTableScope() throws Exception {
GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
MetricsRepository repository = Mockito.mock(MetricsRepository.class);
- setMetricsRepository(updater, repository);
+ updater.setMetricsRepositoryForTest(repository);
NameIdentifier tableId = NameIdentifier.of("catalog", "db", "table");
Assertions.assertThrows(
@@ -151,7 +150,7 @@ class TestGravitinoMetricsUpdater {
void testCloseDelegatesToRepository() throws Exception {
GravitinoMetricsUpdater updater = new GravitinoMetricsUpdater();
MetricsRepository repository = Mockito.mock(MetricsRepository.class);
- setMetricsRepository(updater, repository);
+ updater.setMetricsRepositoryForTest(repository);
updater.close();
@@ -177,7 +176,7 @@ class TestGravitinoMetricsUpdater {
"org.h2.Driver",
OptimizerConfig.OPTIMIZER_PREFIX + "jdbcMetrics." +
"testOnBorrow",
"false"))));
- MetricsRepository repository = getMetricsRepository(updater);
+ MetricsRepository repository = updater.metricsRepositoryForTest();
Assertions.assertInstanceOf(GenericJdbcMetricsRepository.class,
repository);
updater.close();
}
@@ -196,25 +195,11 @@ class TestGravitinoMetricsUpdater {
"gravitino.optimizer.jdbcMetrics.jdbcPassword",
""));
updater.initialize(new OptimizerEnv(config));
- MetricsRepository repository = getMetricsRepository(updater);
+ MetricsRepository repository = updater.metricsRepositoryForTest();
Assertions.assertInstanceOf(GenericJdbcMetricsRepository.class,
repository);
updater.close();
}
- private void setMetricsRepository(GravitinoMetricsUpdater updater,
MetricsRepository repository)
- throws ReflectiveOperationException {
- Field field =
GravitinoMetricsUpdater.class.getDeclaredField("metricsStorage");
- field.setAccessible(true);
- field.set(updater, repository);
- }
-
- private MetricsRepository getMetricsRepository(GravitinoMetricsUpdater
updater)
- throws ReflectiveOperationException {
- Field field =
GravitinoMetricsUpdater.class.getDeclaredField("metricsStorage");
- field.setAccessible(true);
- return (MetricsRepository) field.get(updater);
- }
-
private PartitionPath parsePartitionPath(String partition) {
String[] entries = partition.split("/");
List<PartitionEntry> partitionEntries = new
java.util.ArrayList<>(entries.length);
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
index 65e5ea4b6c..f856cc7c03 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/TestGravitinoStatisticsUpdater.java
@@ -19,7 +19,6 @@
package org.apache.gravitino.maintenance.optimizer.updater.statistics;
-import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -57,7 +56,7 @@ class TestGravitinoStatisticsUpdater {
void testUpdateTableStatisticsDuplicateNameLastWins() throws Exception {
GravitinoStatisticsUpdater updater = new GravitinoStatisticsUpdater();
GravitinoClient client = Mockito.mock(GravitinoClient.class,
Mockito.RETURNS_DEEP_STUBS);
- setClient(updater, client);
+ updater.setGravitinoClientForTest(client);
NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "db",
"table");
updater.updateTableStatistics(
@@ -82,7 +81,7 @@ class TestGravitinoStatisticsUpdater {
void testUpdatePartitionStatisticsNullPartitionPathFails() throws Exception {
GravitinoStatisticsUpdater updater = new GravitinoStatisticsUpdater();
GravitinoClient client = Mockito.mock(GravitinoClient.class,
Mockito.RETURNS_DEEP_STUBS);
- setClient(updater, client);
+ updater.setGravitinoClientForTest(client);
Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = new
HashMap<>();
partitionStatistics.put(null, List.of(stat("s1", 1L)));
@@ -101,7 +100,7 @@ class TestGravitinoStatisticsUpdater {
void testUpdatePartitionStatisticsDuplicateNameLastWins() throws Exception {
GravitinoStatisticsUpdater updater = new GravitinoStatisticsUpdater();
GravitinoClient client = Mockito.mock(GravitinoClient.class,
Mockito.RETURNS_DEEP_STUBS);
- setClient(updater, client);
+ updater.setGravitinoClientForTest(client);
NameIdentifier tableIdentifier = NameIdentifier.of("catalog", "db",
"table");
PartitionPath partitionPath = PartitionPath.of(List.of(new
PartitionEntryImpl("p", "1")));
@@ -128,11 +127,4 @@ class TestGravitinoStatisticsUpdater {
private StatisticEntry<?> stat(String name, long value) {
return new StatisticEntryImpl<>(name, StatisticValues.longValue(value));
}
-
- private void setClient(GravitinoStatisticsUpdater updater, GravitinoClient
client)
- throws ReflectiveOperationException {
- Field field =
GravitinoStatisticsUpdater.class.getDeclaredField("gravitinoClient");
- field.setAccessible(true);
- field.set(updater, client);
- }
}
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 3084fcc280..0b1189aba0 100644
---
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -25,3 +25,4 @@
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTe
org.apache.gravitino.maintenance.optimizer.recommender.StrategyProviderForCmdTest
org.apache.gravitino.maintenance.optimizer.recommender.StatisticsProviderForCmdTest
org.apache.gravitino.maintenance.optimizer.recommender.TableMetadataProviderForCmdTest
+org.apache.gravitino.maintenance.optimizer.integration.test.RecordingJobSubmitterForIT
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
index 2e0ab7cd28..2e3a914a1c 100644
---
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsCalculator
@@ -16,3 +16,5 @@
# under the License.
org.apache.gravitino.maintenance.optimizer.updater.StatisticsCalculatorForTest
+org.apache.gravitino.maintenance.optimizer.integration.test.DummyTableStatisticsComputer
+org.apache.gravitino.maintenance.optimizer.integration.test.DummyJobMetricsCalculator