This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch stats_job in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 3191bc438a46b8d08c9fbefc1960b96bd5ccd900 Author: fanng <[email protected]> AuthorDate: Sat Feb 28 20:28:55 2026 +0800 support stats_job --- .../build.gradle.kts} | 48 +- .../updater/metrics/GravitinoMetricsUpdater.java | 0 .../updater/metrics/storage/MetricsRepository.java | 0 .../metrics/storage/MetricsStorageException.java | 0 .../jdbc/DataSourceJdbcConnectionProvider.java | 0 .../storage/jdbc/GenericJdbcMetricsRepository.java | 0 .../metrics/storage/jdbc/H2MetricsDialect.java | 0 .../metrics/storage/jdbc/JdbcConnectionConfig.java | 0 .../metrics/storage/jdbc/JdbcMetricsDialect.java | 0 .../storage/jdbc/JdbcMetricsRepository.java | 21 +- .../metrics/storage/jdbc/MySQLMetricsDialect.java | 0 .../storage/jdbc/PostgreSQLMetricsDialect.java | 0 .../statistics/GravitinoStatisticsUpdater.java | 4 +- ...itino.maintenance.optimizer.api.common.Provider | 9 - maintenance/jobs/build.gradle.kts | 9 + .../jobs/BuiltInJobTemplateProvider.java | 4 +- .../jobs/iceberg/IcebergUpdateStatsJob.java | 542 +++++++++++++++++++++ .../jobs/iceberg/TestIcebergUpdateStatsJob.java | 126 +++++ .../TestIcebergUpdateStatsJobWithSpark.java | 275 +++++++++++ .../build.gradle.kts} | 31 +- .../optimizer/api/common/DataScope.java | 0 .../optimizer/api/common/MetricPoint.java | 0 .../optimizer/api/common/MetricSample.java | 0 .../optimizer/api/common/PartitionEntry.java | 0 .../optimizer/api/common/PartitionPath.java | 0 .../optimizer/api/common/PartitionStrategy.java | 0 .../maintenance/optimizer/api/common/Provider.java | 0 .../optimizer/api/common/StatisticEntry.java | 0 .../maintenance/optimizer/api/common/Strategy.java | 0 .../api/common/TableAndPartitionStatistics.java | 0 .../optimizer/api/monitor/EvaluationResult.java | 0 .../optimizer/api/monitor/MetricsEvaluator.java | 0 .../optimizer/api/monitor/MetricsProvider.java | 0 .../optimizer/api/monitor/MonitorCallback.java | 0 .../api/monitor/TableJobRelationProvider.java | 0 .../api/recommender/JobExecutionContext.java | 0 
.../optimizer/api/recommender/JobSubmitter.java | 0 .../api/recommender/StatisticsProvider.java | 0 .../api/recommender/StrategyEvaluation.java | 0 .../optimizer/api/recommender/StrategyHandler.java | 0 .../api/recommender/StrategyHandlerContext.java | 0 .../api/recommender/StrategyProvider.java | 0 .../api/recommender/SupportTableStatistics.java | 0 .../api/recommender/TableMetadataProvider.java | 0 .../optimizer/api/updater/MetricsUpdater.java | 0 .../api/updater/StatisticsCalculator.java | 0 .../optimizer/api/updater/StatisticsUpdater.java | 0 .../updater/SupportsCalculateBulkJobMetrics.java | 0 .../SupportsCalculateBulkJobStatistics.java | 0 .../updater/SupportsCalculateBulkTableMetrics.java | 0 .../SupportsCalculateBulkTableStatistics.java | 0 .../api/updater/SupportsCalculateJobMetrics.java | 0 .../updater/SupportsCalculateJobStatistics.java | 0 .../api/updater/SupportsCalculateTableMetrics.java | 0 .../updater/SupportsCalculateTableStatistics.java | 0 .../maintenance/optimizer/common/OptimizerEnv.java | 0 .../optimizer/common/PartitionEntryImpl.java | 0 .../optimizer/common/StatisticEntryImpl.java | 0 .../optimizer/common/conf/OptimizerConfig.java | 45 +- .../common/util/GravitinoClientUtils.java | 0 .../optimizer/common/util/IdentifierUtils.java | 0 .../common/util/PartitionPathSerdeUtils.java} | 53 +- .../optimizer/common/util/ProviderUtils.java | 0 .../common/util/StatisticValueSerdeUtils.java | 50 ++ .../common/util/TestPartitionPathSerdeUtils.java | 51 ++ maintenance/optimizer/build.gradle.kts | 2 + .../optimizer/recommender/util/PartitionUtils.java | 59 +-- ...itino.maintenance.optimizer.api.common.Provider | 2 - .../job/TestBuiltinIcebergRewriteDataFiles.java | 54 +- .../job/TestBuiltinIcebergUpdateStatsJob.java | 180 +++++++ settings.gradle.kts | 7 +- 71 files changed, 1404 insertions(+), 168 deletions(-) diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java 
b/maintenance/gravitino-updaters/build.gradle.kts similarity index 51% copy from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java copy to maintenance/gravitino-updaters/build.gradle.kts index 9c2443db50..9da6abb540 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java +++ b/maintenance/gravitino-updaters/build.gradle.kts @@ -16,31 +16,35 @@ * specific language governing permissions and limitations * under the License. */ +description = "Gravitino Built-in Updaters" -package org.apache.gravitino.maintenance.optimizer.api.common; +plugins { + `maven-publish` + id("java") + id("idea") +} -import org.apache.gravitino.annotation.DeveloperApi; -import org.apache.gravitino.stats.StatisticValue; +dependencies { + implementation(project(":api")) + implementation(project(":common")) + implementation(project(":clients:client-java")) + implementation(project(":maintenance:optimizer-api")) + implementation(libs.guava) + implementation(libs.commons.lang3) + implementation(libs.commons.dbcp2) + implementation(libs.h2db) + implementation(libs.slf4j.api) + implementation(libs.jackson.databind) -/** - * Named statistic value produced or consumed by optimizer components. - * - * @param <T> underlying Java type wrapped by the {@link StatisticValue} - */ -@DeveloperApi -public interface StatisticEntry<T> { + annotationProcessor(libs.lombok) + compileOnly(libs.lombok) + testAnnotationProcessor(libs.lombok) + testCompileOnly(libs.lombok) - /** - * Stable metric key used for lookup and reporting. - * - * @return non-null metric name - */ - String name(); + testImplementation(libs.junit.jupiter.api) + testRuntimeOnly(libs.junit.jupiter.engine) +} - /** - * Typed statistic value wrapper. 
- * - * @return statistic value container - */ - StatisticValue<T> value(); +tasks.test { + useJUnitPlatform() } diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsRepository.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/MetricsStorageException.java diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/DataSourceJdbcConnectionProvider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/GenericJdbcMetricsRepository.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/H2MetricsDialect.java diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcConnectionConfig.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsDialect.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java similarity index 96% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java index 406d9b4372..e138e18fd7 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java 
+++ b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java @@ -41,8 +41,8 @@ import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.config.ConfigConstants; import org.apache.gravitino.maintenance.optimizer.api.common.DataScope; import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint; -import org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueUtils; -import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils; +import org.apache.gravitino.maintenance.optimizer.common.util.PartitionPathSerdeUtils; +import org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueSerdeUtils; import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsRepository; import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsStorageException; import org.apache.gravitino.utils.jdbc.JdbcSqlScriptUtils; @@ -247,13 +247,13 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { || metricPoint.scope() == DataScope.Type.PARTITION, "Unsupported scope %s for table/partition metrics", metricPoint.scope()); - String serializedMetricValue = StatisticValueUtils.toString(metricPoint.value()); + String serializedMetricValue = StatisticValueSerdeUtils.toString(metricPoint.value()); validateWriteArguments(metricPoint, serializedMetricValue); String normalizedIdentifier = normalizeIdentifier(metricPoint.identifier()); String normalizedMetricName = normalizeMetricName(metricPoint.metricName()); String normalizedPartition = - metricPoint.partitionPath().map(PartitionUtils::encodePartitionPath).orElse(null); + metricPoint.partitionPath().map(PartitionPathSerdeUtils::encode).orElse(null); tableInsertStmt.setString(1, normalizedIdentifier); tableInsertStmt.setString(2, normalizedMetricName); tableInsertStmt.setString(3, normalizePartition(normalizedPartition).orElse(null)); 
@@ -295,7 +295,7 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { metricPoint.scope() == DataScope.Type.JOB, "Unsupported scope %s for job metrics", metricPoint.scope()); - String serializedMetricValue = StatisticValueUtils.toString(metricPoint.value()); + String serializedMetricValue = StatisticValueSerdeUtils.toString(metricPoint.value()); validateWriteArguments(metricPoint, serializedMetricValue); String normalizedIdentifier = normalizeIdentifier(metricPoint.identifier()); @@ -439,8 +439,7 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { MAX_METRIC_VALUE_LENGTH, serializedMetricValue.length()); if (metricPoint.partitionPath().isPresent()) { - String encodedPartition = - PartitionUtils.encodePartitionPath(metricPoint.partitionPath().get()); + String encodedPartition = PartitionPathSerdeUtils.encode(metricPoint.partitionPath().get()); Preconditions.checkArgument( StringUtils.isNotBlank(encodedPartition), "partition must not be blank"); } @@ -466,7 +465,7 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { MetricPoint.forTable( nameIdentifier, rs.getString("metric_name"), - StatisticValueUtils.fromString(rs.getString("metric_value")), + StatisticValueSerdeUtils.fromString(rs.getString("metric_value")), rs.getLong("metric_ts"))); } } @@ -487,7 +486,7 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { Preconditions.checkArgument( scope.partition().isPresent(), "partition scope must contain partition path"); NameIdentifier nameIdentifier = scope.identifier(); - String partition = PartitionUtils.encodePartitionPath(scope.partition().get()); + String partition = PartitionPathSerdeUtils.encode(scope.partition().get()); List<MetricPoint> result = new ArrayList<>(); String sql = @@ -509,7 +508,7 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { nameIdentifier, scope.partition().get(), rs.getString("metric_name"), - 
StatisticValueUtils.fromString(rs.getString("metric_value")), + StatisticValueSerdeUtils.fromString(rs.getString("metric_value")), rs.getLong("metric_ts"))); } } @@ -547,7 +546,7 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { MetricPoint.forJob( nameIdentifier, rs.getString("metric_name"), - StatisticValueUtils.fromString(rs.getString("metric_value")), + StatisticValueSerdeUtils.fromString(rs.getString("metric_value")), rs.getLong("metric_ts"))); } } diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java similarity index 100% copy from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java copy to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/PostgreSQLMetricsDialect.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java 
b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java similarity index 97% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java rename to maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java index 3b6f4e45c0..a3635cf2c7 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java +++ b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java @@ -33,7 +33,7 @@ import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater; import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv; import org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils; import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils; -import org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils; +import org.apache.gravitino.maintenance.optimizer.common.util.PartitionPathSerdeUtils; import org.apache.gravitino.stats.PartitionStatisticsUpdate; import org.apache.gravitino.stats.StatisticValue; @@ -104,7 +104,7 @@ public class GravitinoStatisticsUpdater implements StatisticsUpdater { return new PartitionStatisticsUpdate() { @Override public String partitionName() { - return PartitionUtils.encodePartitionPath(entry.getKey()); + return PartitionPathSerdeUtils.encode(entry.getKey()); } @Override diff --git a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider b/maintenance/gravitino-updaters/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider similarity index 55% copy from 
maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider copy to maintenance/gravitino-updaters/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider index c059ab3483..ca56d73c1e 100644 --- a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider +++ b/maintenance/gravitino-updaters/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider @@ -16,14 +16,5 @@ # specific language governing permissions and limitations # under the License. # -org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider -org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider -org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider -org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter -org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater -org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider -org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider -org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider -org.apache.gravitino.maintenance.optimizer.monitor.callback.ConsoleMonitorCallback diff --git a/maintenance/jobs/build.gradle.kts b/maintenance/jobs/build.gradle.kts index 1cec173984..e88f3ebbef 100644 --- a/maintenance/jobs/build.gradle.kts +++ b/maintenance/jobs/build.gradle.kts @@ -25,6 +25,11 @@ plugins { alias(libs.plugins.shadow) } +java { + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + 
repositories { mavenCentral() } @@ -36,6 +41,8 @@ val sparkMajorVersion = "3.5" dependencies { implementation(project(":api")) + implementation(project(":maintenance:optimizer-api")) + implementation(project(":maintenance:gravitino-updaters")) compileOnly(libs.slf4j.api) compileOnly(libs.jackson.databind) @@ -51,6 +58,8 @@ dependencies { } testImplementation(project(":api")) + testImplementation(project(":common")) + testImplementation(project(":clients:client-java")) testImplementation(libs.bundles.log4j) testImplementation(libs.hadoop3.common) { exclude("org.slf4j") diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java index 8620c2330c..1e93145446 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java @@ -26,6 +26,7 @@ import java.util.stream.Collectors; import org.apache.gravitino.job.JobTemplate; import org.apache.gravitino.job.JobTemplateProvider; import org.apache.gravitino.maintenance.jobs.iceberg.IcebergRewriteDataFilesJob; +import org.apache.gravitino.maintenance.jobs.iceberg.IcebergUpdateStatsJob; import org.apache.gravitino.maintenance.jobs.spark.SparkPiJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,8 @@ public class BuiltInJobTemplateProvider implements JobTemplateProvider { Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN); private static final List<BuiltInJob> BUILT_IN_JOBS = - ImmutableList.of(new SparkPiJob(), new IcebergRewriteDataFilesJob()); + ImmutableList.of( + new SparkPiJob(), new IcebergRewriteDataFilesJob(), new IcebergUpdateStatsJob()); @Override public List<? 
extends JobTemplate> jobTemplates() { diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java new file mode 100644 index 0000000000..e2689825c3 --- /dev/null +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.gravitino.maintenance.jobs.iceberg; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.job.JobTemplateProvider; +import org.apache.gravitino.job.SparkJobTemplate; +import org.apache.gravitino.maintenance.jobs.BuiltInJob; +import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint; +import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry; +import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; +import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry; +import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater; +import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater; +import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv; +import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; +import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl; +import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; +import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils; +import org.apache.gravitino.stats.StatisticValues; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Built-in job for computing Iceberg table file statistics and persisting them to Gravitino. 
*/ +public class IcebergUpdateStatsJob implements BuiltInJob { + + private static final Logger LOG = LoggerFactory.getLogger(IcebergUpdateStatsJob.class); + + private static final String NAME = + JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-update-stats"; + private static final String VERSION = "v1"; + private static final String DEFAULT_STATISTICS_UPDATER = "gravitino-statistics-updater"; + private static final String DEFAULT_METRICS_UPDATER = "gravitino-metrics-updater"; + private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L; + private static final String CUSTOM_STAT_PREFIX = "custom-"; + + @Override + public SparkJobTemplate jobTemplate() { + return SparkJobTemplate.builder() + .withName(NAME) + .withComment( + "Built-in Iceberg update stats job template for computing datafile MSE and file metrics") + .withExecutable(resolveExecutable(IcebergUpdateStatsJob.class)) + .withClassName(IcebergUpdateStatsJob.class.getName()) + .withArguments(buildArguments()) + .withConfigs(buildSparkConfigs()) + .withCustomFields( + Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY, VERSION)) + .build(); + } + + /** Main entry point. 
*/ + public static void main(String[] args) { + Map<String, String> argMap = parseArguments(args); + String catalogName = argMap.get("catalog"); + String tableIdentifier = argMap.get("table"); + String gravitinoUri = argMap.get("gravitino-uri"); + String metalake = argMap.get("metalake"); + + if (catalogName == null + || tableIdentifier == null + || gravitinoUri == null + || metalake == null) { + System.err.println( + "Error: --catalog, --table, --gravitino-uri and --metalake are required arguments"); + printUsage(); + System.exit(1); + } + + String updaterName = + argMap.getOrDefault("statistics-updater", DEFAULT_STATISTICS_UPDATER).trim(); + boolean enableMetrics = parseEnableMetrics(argMap.get("enable-metrics")); + String metricsUpdaterName = + argMap.getOrDefault("metrics-updater", DEFAULT_METRICS_UPDATER).trim(); + long targetFileSizeBytes = parseTargetFileSize(argMap.get("target-file-size-bytes")); + String sparkConfJson = argMap.get("spark-conf"); + + SparkSession.Builder sparkBuilder = + SparkSession.builder().appName("Gravitino Built-in Iceberg Update Stats"); + + if (sparkConfJson != null && !sparkConfJson.isEmpty()) { + Map<String, String> customConfigs = parseCustomSparkConfigs(sparkConfJson); + for (Map.Entry<String, String> entry : customConfigs.entrySet()) { + sparkBuilder.config(entry.getKey(), entry.getValue()); + } + } + + SparkSession spark = sparkBuilder.getOrCreate(); + StatisticsUpdater statisticsUpdater = null; + MetricsUpdater metricsUpdater = null; + try { + statisticsUpdater = createStatisticsUpdater(updaterName, gravitinoUri, metalake); + if (enableMetrics) { + metricsUpdater = createMetricsUpdater(metricsUpdaterName, gravitinoUri, metalake); + } + updateStatistics( + spark, + statisticsUpdater, + metricsUpdater, + catalogName, + tableIdentifier, + targetFileSizeBytes); + } catch (Exception e) { + LOG.error("Failed to update Iceberg statistics", e); + System.exit(1); + } finally { + if (statisticsUpdater != null) { + try { + 
statisticsUpdater.close(); + } catch (Exception e) { + LOG.warn("Failed to close statistics updater", e); + } + } + if (metricsUpdater != null) { + try { + metricsUpdater.close(); + } catch (Exception e) { + LOG.warn("Failed to close metrics updater", e); + } + } + spark.stop(); + } + } + + static void updateStatistics( + SparkSession spark, + StatisticsUpdater statisticsUpdater, + String catalogName, + String tableIdentifier, + long targetFileSizeBytes) { + updateStatistics( + spark, statisticsUpdater, null, catalogName, tableIdentifier, targetFileSizeBytes); + } + + static void updateStatistics( + SparkSession spark, + StatisticsUpdater statisticsUpdater, + MetricsUpdater metricsUpdater, + String catalogName, + String tableIdentifier, + long targetFileSizeBytes) { + NameIdentifier gravitinoTableIdentifier = + toGravitinoTableIdentifier(catalogName, tableIdentifier); + long metricTimestamp = System.currentTimeMillis() / 1000L; + boolean partitioned = isPartitionedTable(spark, catalogName, tableIdentifier); + if (partitioned) { + String sql = buildPartitionStatsSql(catalogName, tableIdentifier, targetFileSizeBytes); + Row[] rows = (Row[]) spark.sql(sql).collect(); + Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = new LinkedHashMap<>(); + List<MetricPoint> tableAndPartitionMetrics = new ArrayList<>(); + for (Row row : rows) { + PartitionPath partitionPath = toPartitionPath(row.getAs("partition")); + List<StatisticEntry<?>> statistics = toStatistics(row); + partitionStatistics.put(partitionPath, statistics); + if (metricsUpdater != null) { + tableAndPartitionMetrics.addAll( + toPartitionMetricPoints( + gravitinoTableIdentifier, partitionPath, statistics, metricTimestamp)); + } + } + statisticsUpdater.updatePartitionStatistics(gravitinoTableIdentifier, partitionStatistics); + if (metricsUpdater != null && !tableAndPartitionMetrics.isEmpty()) { + metricsUpdater.updateTableAndPartitionMetrics(tableAndPartitionMetrics); + } + LOG.info( + "Updated 
partition statistics for {} partitions on {}", + partitionStatistics.size(), + gravitinoTableIdentifier); + } else { + String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes); + Row[] rows = (Row[]) spark.sql(sql).collect(); + List<StatisticEntry<?>> tableStatistics = + rows.length == 0 ? List.of() : toStatistics(rows[0]); + statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics); + if (metricsUpdater != null && !tableStatistics.isEmpty()) { + metricsUpdater.updateTableAndPartitionMetrics( + toTableMetricPoints(gravitinoTableIdentifier, tableStatistics, metricTimestamp)); + } + LOG.info( + "Updated table statistics with {} metrics on {}", + tableStatistics.size(), + gravitinoTableIdentifier); + } + } + + static String buildTableStatsSql( + String catalogName, String tableIdentifier, long targetFileSizeBytes) { + String filesTable = buildFilesTableIdentifier(catalogName, tableIdentifier); + return "SELECT " + + "COUNT(*) AS file_count, " + + "SUM(CASE WHEN content = 0 THEN 1 ELSE 0 END) AS data_files, " + + "SUM(CASE WHEN content = 1 THEN 1 ELSE 0 END) AS position_delete_files, " + + "SUM(CASE WHEN content = 2 THEN 1 ELSE 0 END) AS equality_delete_files, " + + "SUM(CASE WHEN file_size_in_bytes < " + + targetFileSizeBytes + + " THEN 1 ELSE 0 END) AS small_files, " + + "AVG(POWER(" + + targetFileSizeBytes + + " - LEAST(" + + targetFileSizeBytes + + ", file_size_in_bytes), 2)) AS datafile_mse, " + + "AVG(file_size_in_bytes) AS avg_size, " + + "SUM(file_size_in_bytes) AS total_size " + + "FROM " + + filesTable; + } + + static String buildPartitionStatsSql( + String catalogName, String tableIdentifier, long targetFileSizeBytes) { + String filesTable = buildFilesTableIdentifier(catalogName, tableIdentifier); + return "SELECT " + + "partition, " + + "COUNT(*) AS file_count, " + + "SUM(CASE WHEN content = 0 THEN 1 ELSE 0 END) AS data_files, " + + "SUM(CASE WHEN content = 1 THEN 1 ELSE 0 END) AS position_delete_files, " + 
+ "SUM(CASE WHEN content = 2 THEN 1 ELSE 0 END) AS equality_delete_files, " + + "SUM(CASE WHEN file_size_in_bytes < " + + targetFileSizeBytes + + " THEN 1 ELSE 0 END) AS small_files, " + + "AVG(POWER(" + + targetFileSizeBytes + + " - LEAST(" + + targetFileSizeBytes + + ", file_size_in_bytes), 2)) AS datafile_mse, " + + "AVG(file_size_in_bytes) AS avg_size, " + + "SUM(file_size_in_bytes) AS total_size " + + "FROM " + + filesTable + + " GROUP BY partition"; + } + + /** Reports whether the table is partitioned by inspecting the schema of its .files table: returns true only when that schema has a field named partition whose data type is a StructType with at least one field; a missing field, a non-struct type or an empty struct all yield false. */ static boolean isPartitionedTable( + SparkSession spark, String catalogName, String tableIdentifier) { + StructType filesSchema = + spark.table(buildFilesTableIdentifier(catalogName, tableIdentifier)).schema(); + if (!Arrays.asList(filesSchema.fieldNames()).contains("partition")) { + return false; + } + StructField partitionField = filesSchema.apply("partition"); + if (!(partitionField.dataType() instanceof StructType)) { + return false; + } + return ((StructType) partitionField.dataType()).fields().length > 0; + } + + /** Converts one aggregate result row into eight statistic entries, each named CUSTOM_STAT_PREFIX plus the column alias. The five counts and total_size are stored as long values, datafile_mse and avg_size as double values; a null column is read as zero by toLongValue and toDoubleValue. */ static List<StatisticEntry<?>> toStatistics(Row row) { + List<StatisticEntry<?>> statistics = new ArrayList<>(); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "file_count", + StatisticValues.longValue(toLongValue(row, "file_count")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "data_files", + StatisticValues.longValue(toLongValue(row, "data_files")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "position_delete_files", + StatisticValues.longValue(toLongValue(row, "position_delete_files")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "equality_delete_files", + StatisticValues.longValue(toLongValue(row, "equality_delete_files")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "small_files", + StatisticValues.longValue(toLongValue(row, "small_files")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "datafile_mse", +
StatisticValues.doubleValue(toDoubleValue(row, "datafile_mse")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "avg_size", + StatisticValues.doubleValue(toDoubleValue(row, "avg_size")))); + statistics.add( + new StatisticEntryImpl<>( + CUSTOM_STAT_PREFIX + "total_size", + StatisticValues.longValue(toLongValue(row, "total_size")))); + return statistics; + } + + /** Turns a partition row into a PartitionPath with one entry per field of the row schema, in schema order. Each value is rendered with String.valueOf, so a null partition value becomes the four-character text null. */ static PartitionPath toPartitionPath(Row partitionRow) { + StructType partitionSchema = partitionRow.schema(); + List<PartitionEntry> entries = new ArrayList<>(partitionSchema.fields().length); + for (int i = 0; i < partitionSchema.fields().length; i++) { + String name = partitionSchema.fields()[i].name(); + Object value = partitionRow.get(i); + entries.add(new PartitionEntryImpl(name, String.valueOf(value))); + } + return PartitionPath.of(entries); + } + + /** Parses command-line tokens of the form --key value into a map keyed by the name without the two leading dashes. A key is recorded only when the next token exists, does not itself start with two dashes, and is not blank; a flag followed by another flag or by nothing is omitted, and tokens that are neither a flag nor a consumed value are ignored. NOTE(review): the value != null test is redundant because startsWith has already been called on that element. */ static Map<String, String> parseArguments(String[] args) { + Map<String, String> argMap = new HashMap<>(); + for (int i = 0; i < args.length; i++) { + if (args[i].startsWith("--")) { + String key = args[i].substring(2); + if (i + 1 < args.length && !args[i + 1].startsWith("--")) { + String value = args[i + 1]; + if (value != null && !value.trim().isEmpty()) { + argMap.put(key, value); + } + i++; + } + } + } + return argMap; + } + + /** Parses a JSON object of Spark settings into a String-to-String map. Null or empty input returns an empty mutable map. A JSON null value becomes the empty string and any other value is converted with toString. Any failure while parsing is rethrown as IllegalArgumentException carrying the raw JSON, the original message and the cause. NOTE(review): a new ObjectMapper is created on every call. */ static Map<String, String> parseCustomSparkConfigs(String sparkConfJson) { + if (sparkConfJson == null || sparkConfJson.isEmpty()) { + return new HashMap<>(); + } + try { + ObjectMapper mapper = new ObjectMapper(); + Map<String, Object> parsedMap = + mapper.readValue(sparkConfJson, new TypeReference<Map<String, Object>>() {}); + Map<String, String> configs = new HashMap<>(); + for (Map.Entry<String, Object> entry : parsedMap.entrySet()) { + configs.put(entry.getKey(), entry.getValue() == null ? "" : entry.getValue().toString()); + } + return configs; + } catch (Exception e) { + throw new IllegalArgumentException( + "Failed to parse Spark configurations JSON: " + + sparkConfJson + + ".
Error: " + + e.getMessage(), + e); + } + } + + /** Resolves the target file size option. Null or blank input returns DEFAULT_TARGET_FILE_SIZE_BYTES; otherwise the trimmed text is parsed as a long. Throws IllegalArgumentException when the text is not a number (cause attached) or when the parsed value is zero or negative. */ static long parseTargetFileSize(String value) { + if (value == null || value.trim().isEmpty()) { + return DEFAULT_TARGET_FILE_SIZE_BYTES; + } + try { + long parsed = Long.parseLong(value.trim()); + if (parsed <= 0) { + throw new IllegalArgumentException("target-file-size-bytes must be > 0"); + } + return parsed; + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Invalid target-file-size-bytes: " + value, e); + } + } + + /** Resolves the enable-metrics option. Null or blank input means false; after trimming, only the words true and false are accepted, compared case-insensitively; anything else throws IllegalArgumentException. */ static boolean parseEnableMetrics(String value) { + if (value == null || value.trim().isEmpty()) { + return false; + } + if ("true".equalsIgnoreCase(value.trim())) { + return true; + } + if ("false".equalsIgnoreCase(value.trim())) { + return false; + } + throw new IllegalArgumentException("Invalid enable-metrics value: " + value); + } + + /** Creates the StatisticsUpdater registered under updaterName through ProviderUtils and initializes it with an OptimizerEnv whose config holds only the Gravitino URI and the metalake name. */ private static StatisticsUpdater createStatisticsUpdater( + String updaterName, String gravitinoUri, String metalake) { + StatisticsUpdater statisticsUpdater = + ProviderUtils.createStatisticsUpdaterInstance(updaterName); + Map<String, String> conf = new HashMap<>(); + conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri); + conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake); + statisticsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf))); + return statisticsUpdater; + } + + /** Creates the MetricsUpdater registered under updaterName through ProviderUtils and initializes it the same way as createStatisticsUpdater: with the Gravitino URI and the metalake name only. */ private static MetricsUpdater createMetricsUpdater( + String updaterName, String gravitinoUri, String metalake) { + MetricsUpdater metricsUpdater = ProviderUtils.createMetricsUpdaterInstance(updaterName); + Map<String, String> conf = new HashMap<>(); + conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri); + conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake); + metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf))); + return metricsUpdater; + } + + /** Maps every table statistic to a MetricPoint built with MetricPoint.forTable, reusing the statistic name and value and stamping all points with the same metricTimestamp. */ private static List<MetricPoint> toTableMetricPoints( + NameIdentifier tableIdentifier, + List<StatisticEntry<?>> tableStatistics, + long metricTimestamp) { + List<MetricPoint> requests = new
ArrayList<>(tableStatistics.size()); + for (StatisticEntry<?> statistic : tableStatistics) { + requests.add( + MetricPoint.forTable( + tableIdentifier, statistic.name(), statistic.value(), metricTimestamp)); + } + return requests; + } + + /** Maps every statistic of one partition to a MetricPoint built with MetricPoint.forPartition, attaching the given partition path and stamping all points with the same metricTimestamp. */ private static List<MetricPoint> toPartitionMetricPoints( + NameIdentifier tableIdentifier, + PartitionPath partitionPath, + List<StatisticEntry<?>> statistics, + long metricTimestamp) { + List<MetricPoint> requests = new ArrayList<>(statistics.size()); + for (StatisticEntry<?> statistic : statistics) { + requests.add( + MetricPoint.forPartition( + tableIdentifier, + partitionPath, + statistic.name(), + statistic.value(), + metricTimestamp)); + } + return requests; + } + + /** Builds the three-level NameIdentifier catalog, schema, table from a schema.table string. Throws IllegalArgumentException unless splitting on dots yields exactly two parts. NOTE(review): String.split drops trailing empty strings, so input such as db.tbl. is accepted and input such as .tbl yields an empty schema - confirm whether stricter validation is wanted. */ private static NameIdentifier toGravitinoTableIdentifier( + String catalogName, String tableIdentifier) { + String[] levels = tableIdentifier.split("\\."); + if (levels.length != 2) { + throw new IllegalArgumentException( + "--table must use schema.table format, but got: " + tableIdentifier); + } + return NameIdentifier.of(catalogName, levels[0], levels[1]); + } + + /** Builds the SQL name catalog.schema.table.files used by the statistics queries. Applies the same two-part check as toGravitinoTableIdentifier and passes each part through escapeSqlIdentifier. NOTE(review): the parts are concatenated without surrounding backticks, so names containing characters such as a hyphen or a backtick are emitted unquoted - confirm against the Spark SQL identifier rules. */ private static String buildFilesTableIdentifier(String catalogName, String tableIdentifier) { + String[] levels = tableIdentifier.split("\\."); + if (levels.length != 2) { + throw new IllegalArgumentException( + "--table must use schema.table format, but got: " + tableIdentifier); + } + return escapeSqlIdentifier(catalogName) + + "." + + escapeSqlIdentifier(levels[0]) + + "." + + escapeSqlIdentifier(levels[1]) + + ".files"; + } + + /** Doubles every backtick in the identifier and returns the result. NOTE(review): doubling is the escape used inside a backtick-quoted identifier, but neither this method nor its caller adds the enclosing backticks - presumably an oversight; verify. */ private static String escapeSqlIdentifier(String identifier) { + return identifier.replace("`", "``"); + } + + /** Reads the named column as a Number and returns its long value, or 0 when the column is null. */ private static long toLongValue(Row row, String fieldName) { + Number number = row.getAs(fieldName); + return number == null ? 0L : number.longValue(); + } + + /** Reads the named column as a Number and returns its double value, or 0 when the column is null. */ private static double toDoubleValue(Row row, String fieldName) { + Number number = row.getAs(fieldName); + return number == null ?
0D : number.doubleValue(); + } + + /** Returns the argument list of the job template: nine flag and placeholder pairs (eighteen elements), one pair per command-line option, with every value written as a double-brace template placeholder. */ private static List<String> buildArguments() { + return Arrays.asList( + "--catalog", + "{{catalog_name}}", + "--table", + "{{table_identifier}}", + "--gravitino-uri", + "{{gravitino_uri}}", + "--metalake", + "{{metalake}}", + "--target-file-size-bytes", + "{{target_file_size_bytes}}", + "--statistics-updater", + "{{statistics_updater}}", + "--enable-metrics", + "{{enable_metrics}}", + "--metrics-updater", + "{{metrics_updater}}", + "--spark-conf", + "{{spark_conf}}"); + } + + /** Returns the Spark configuration entries of the job template as an unmodifiable map: master, executor and driver sizing, the catalog registered under the catalog_name placeholder (implementation class, type, uri and warehouse) and the Iceberg session extensions. Keys and values contain double-brace placeholders. */ private static Map<String, String> buildSparkConfigs() { + Map<String, String> configs = new HashMap<>(); + configs.put("spark.master", "{{spark_master}}"); + configs.put("spark.executor.instances", "{{spark_executor_instances}}"); + configs.put("spark.executor.cores", "{{spark_executor_cores}}"); + configs.put("spark.executor.memory", "{{spark_executor_memory}}"); + configs.put("spark.driver.memory", "{{spark_driver_memory}}"); + configs.put("spark.sql.catalog.{{catalog_name}}", "org.apache.iceberg.spark.SparkCatalog"); + configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}"); + configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}"); + configs.put("spark.sql.catalog.{{catalog_name}}.warehouse", "{{warehouse_location}}"); + configs.put( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"); + return Collections.unmodifiableMap(configs); + } + + /** Prints the command-line usage text, listing required and optional options with their defaults, to standard error. */ private static void printUsage() { + System.err.println( + "Usage: IcebergUpdateStatsJob [OPTIONS]\n" + + "\n" + + "Required Options:\n" + + " --catalog <name> Iceberg catalog name registered in Spark\n" + + " --table <identifier> Table name in schema.table format\n" + + " --gravitino-uri <uri> Gravitino server URI\n" + + " --metalake <metalake_name> Gravitino metalake name\n" + + "\n" + + "Optional Options:\n" + + " --target-file-size-bytes <bytes> Small-file threshold and MSE target\n" + + " Default: 100000\n" + + " --statistics-updater
<name> StatisticsUpdater provider name\n" + + " Default: gravitino-statistics-updater\n" + + " --enable-metrics <true|false> Whether to persist metrics via MetricsUpdater\n" + + " Default: false\n" + + " --metrics-updater <name> MetricsUpdater provider name\n" + + " Default: gravitino-metrics-updater\n" + + " --spark-conf <json> JSON map of custom Spark configs\n" + + " Example: '{\"spark.sql.shuffle.partitions\":\"200\"}'"); + } +} diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java new file mode 100644 index 0000000000..b035ed4b74 --- /dev/null +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.gravitino.maintenance.jobs.iceberg; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import org.apache.gravitino.job.JobTemplateProvider; +import org.apache.gravitino.job.SparkJobTemplate; +import org.junit.jupiter.api.Test; + +/** Unit tests for the job template and the static parsing and SQL-building helpers of IcebergUpdateStatsJob; none of them starts a Spark session. */ public class TestIcebergUpdateStatsJob { + + /** The template must be named builtin-iceberg-update-stats, match the built-in name pattern and carry version v1 in its custom fields. */ @Test + public void testJobTemplateHasCorrectNameAndVersion() { + IcebergUpdateStatsJob job = new IcebergUpdateStatsJob(); + SparkJobTemplate template = job.jobTemplate(); + + assertNotNull(template); + assertEquals("builtin-iceberg-update-stats", template.name()); + assertTrue(template.name().matches(JobTemplateProvider.BUILTIN_NAME_PATTERN)); + assertEquals("v1", template.customFields().get(JobTemplateProvider.PROPERTY_VERSION_KEY)); + } + + /** The template must expose eighteen arguments; eight flag and placeholder pairs are asserted individually. NOTE(review): the ninth pair, for the spark-conf option, is covered only by the size check. */ @Test + public void testJobTemplateArguments() { + IcebergUpdateStatsJob job = new IcebergUpdateStatsJob(); + SparkJobTemplate template = job.jobTemplate(); + + assertNotNull(template.arguments()); + assertEquals(18, template.arguments().size()); + assertTrue(template.arguments().contains("--catalog")); + assertTrue(template.arguments().contains("{{catalog_name}}")); + assertTrue(template.arguments().contains("--table")); + assertTrue(template.arguments().contains("{{table_identifier}}")); + assertTrue(template.arguments().contains("--gravitino-uri")); + assertTrue(template.arguments().contains("{{gravitino_uri}}")); + assertTrue(template.arguments().contains("--metalake")); + assertTrue(template.arguments().contains("{{metalake}}")); + assertTrue(template.arguments().contains("--target-file-size-bytes")); + assertTrue(template.arguments().contains("{{target_file_size_bytes}}")); + assertTrue(template.arguments().contains("--statistics-updater")); +
assertTrue(template.arguments().contains("{{statistics_updater}}")); + assertTrue(template.arguments().contains("--enable-metrics")); + assertTrue(template.arguments().contains("{{enable_metrics}}")); + assertTrue(template.arguments().contains("--metrics-updater")); + assertTrue(template.arguments().contains("{{metrics_updater}}")); + } + + /** Seven flag and value pairs must come back keyed by the flag name without its leading dashes, with values unchanged. */ @Test + public void testParseArguments() { + String[] args = { + "--catalog", "cat", + "--table", "db.tbl", + "--gravitino-uri", "http://localhost:8090", + "--metalake", "ml", + "--target-file-size-bytes", "2048", + "--enable-metrics", "true", + "--metrics-updater", "gravitino-metrics-updater" + }; + + Map<String, String> parsed = IcebergUpdateStatsJob.parseArguments(args); + assertEquals("cat", parsed.get("catalog")); + assertEquals("db.tbl", parsed.get("table")); + assertEquals("http://localhost:8090", parsed.get("gravitino-uri")); + assertEquals("ml", parsed.get("metalake")); + assertEquals("2048", parsed.get("target-file-size-bytes")); + assertEquals("true", parsed.get("enable-metrics")); + assertEquals("gravitino-metrics-updater", parsed.get("metrics-updater")); + } + + /** Both queries must read from cat.db.tbl.files (written unquoted for plain names); only the partition query groups by partition and starts with the partition column. */ @Test + public void testBuildStatsSql() { + String tableSql = IcebergUpdateStatsJob.buildTableStatsSql("cat", "db.tbl", 100000L); + String partitionSql = IcebergUpdateStatsJob.buildPartitionStatsSql("cat", "db.tbl", 100000L); + + assertTrue(tableSql.contains("FROM cat.db.tbl.files")); + assertFalse(tableSql.contains("GROUP BY partition")); + assertTrue(tableSql.contains("AS datafile_mse")); + assertTrue(partitionSql.contains("FROM cat.db.tbl.files")); + assertTrue(partitionSql.contains("GROUP BY partition")); + assertTrue(partitionSql.startsWith("SELECT partition")); + } + + /** Null and empty input fall back to the default of 100000; a valid number is returned as is; a negative number and non-numeric text are rejected. */ @Test + public void testParseTargetFileSize() { + assertEquals(100000L, IcebergUpdateStatsJob.parseTargetFileSize(null)); + assertEquals(100000L, IcebergUpdateStatsJob.parseTargetFileSize("")); + assertEquals(2048L, IcebergUpdateStatsJob.parseTargetFileSize("2048")); + assertThrows( +
IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseTargetFileSize("-1")); + assertThrows( + IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseTargetFileSize("abc")); + } + + /** Null, empty and false map to false; true is accepted in any letter case; any other word, here yes, is rejected. */ @Test + public void testParseEnableMetrics() { + assertFalse(IcebergUpdateStatsJob.parseEnableMetrics(null)); + assertFalse(IcebergUpdateStatsJob.parseEnableMetrics("")); + assertFalse(IcebergUpdateStatsJob.parseEnableMetrics("false")); + assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("true")); + assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("TRUE")); + assertThrows( + IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseEnableMetrics("yes")); + } +} diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java new file mode 100644 index 0000000000..8f51256ee0 --- /dev/null +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.gravitino.maintenance.jobs.iceberg; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.maintenance.optimizer.api.common.DataScope; +import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint; +import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; +import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry; +import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater; +import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater; +import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg runtime: one shared local Spark session and a hadoop-type Iceberg catalog whose warehouse lives in a JUnit temporary directory. Statistics and metrics are captured by in-memory recording updaters instead of being sent to Gravitino. */ +public class TestIcebergUpdateStatsJobWithSpark { + + /** Temporary directory that holds the warehouse folder of the test catalog. */ @TempDir static File tempDir; + + /** Session and catalog name shared by all tests; assigned in setUp. */ private static SparkSession spark; + private static String catalogName; + + /** Starts a local[2] Spark session with the Iceberg extensions and a catalog named test_catalog, then creates and fills the three tables the tests read: non_partitioned, partitioned and multi_partitioned. */ @BeforeAll + public static void setUp() { + String warehousePath = new File(tempDir, "warehouse").getAbsolutePath(); + catalogName = "test_catalog"; + + spark = + SparkSession.builder() + .appName("TestIcebergUpdateStatsJob") + .master("local[2]") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog." + catalogName, "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog."
+ catalogName + ".type", "hadoop") + .config("spark.sql.catalog." + catalogName + ".warehouse", warehousePath) + .getOrCreate(); + + spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".db"); + /* Unpartitioned table with three rows. */ spark.sql( + "CREATE TABLE IF NOT EXISTS " + + catalogName + + ".db.non_partitioned (id INT, name STRING) USING iceberg"); + spark.sql( + "INSERT INTO " + catalogName + ".db.non_partitioned VALUES (1, 'A'), (2, 'B'), (3, 'C')"); + + /* Table partitioned by ds; the three rows fall into two distinct ds values. */ spark.sql( + "CREATE TABLE IF NOT EXISTS " + + catalogName + + ".db.partitioned (id INT, ds STRING) USING iceberg PARTITIONED BY (ds)"); + spark.sql( + "INSERT INTO " + + catalogName + + ".db.partitioned VALUES " + + "(1, '2026-01-01'), (2, '2026-01-01'), (3, '2026-01-02')"); + + /* Table partitioned by days(event_ts) and region; the four rows cover two days times two regions. */ spark.sql( + "CREATE TABLE IF NOT EXISTS " + + catalogName + + ".db.multi_partitioned (id INT, event_ts TIMESTAMP, region STRING) " + + "USING iceberg PARTITIONED BY (days(event_ts), region)"); + spark.sql( + "INSERT INTO " + + catalogName + + ".db.multi_partitioned VALUES " + + "(1, TIMESTAMP '2026-01-01 09:00:00', 'ap-south'), " + + "(2, TIMESTAMP '2026-01-01 11:00:00', 'us-east'), " + + "(3, TIMESTAMP '2026-01-02 08:30:00', 'ap-south'), " + + "(4, TIMESTAMP '2026-01-02 12:45:00', 'us-east')"); + } + + /** Drops the three test tables and the db namespace, then stops the session; does nothing when the session was never created. */ @AfterAll + public static void tearDown() { + if (spark != null) { + spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.non_partitioned"); + spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.partitioned"); + spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.multi_partitioned"); + spark.sql("DROP NAMESPACE IF EXISTS " + catalogName + ".db"); + spark.stop(); + } + } + + /** For the unpartitioned table the job must publish eight table-level statistics and no partition statistics; file count and total size must be positive and the MSE non-negative. */ @Test + public void testUpdateNonPartitionedTableStatistics() { + RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); + + IcebergUpdateStatsJob.updateStatistics( + spark, updater, catalogName, "db.non_partitioned", 100_000L); + + assertEquals(NameIdentifier.of(catalogName, "db", "non_partitioned"), updater.tableIdentifier); + assertNotNull(updater.tableStatistics);
+ assertEquals(8, updater.tableStatistics.size()); + assertTrue(updater.partitionStatistics.isEmpty()); + + Map<String, Object> stats = + updater.tableStatistics.stream() + .collect(Collectors.toMap(StatisticEntry::name, stat -> stat.value().value())); + assertTrue((Long) stats.get("custom-file_count") > 0L); + assertTrue((Double) stats.get("custom-datafile_mse") >= 0D); + assertTrue((Long) stats.get("custom-total_size") > 0L); + } + + /** For the table partitioned by ds the job must publish no table-level statistics and exactly two partition entries, each with a single-level path named ds and a positive file count. */ @Test + public void testUpdatePartitionedTableStatistics() { + RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); + + IcebergUpdateStatsJob.updateStatistics(spark, updater, catalogName, "db.partitioned", 100_000L); + + assertEquals(NameIdentifier.of(catalogName, "db", "partitioned"), updater.tableIdentifier); + assertTrue(updater.tableStatistics.isEmpty()); + assertFalse(updater.partitionStatistics.isEmpty()); + assertEquals(2, updater.partitionStatistics.size()); + + updater.partitionStatistics.forEach( + (partitionPath, statistics) -> { + assertEquals(1, partitionPath.entries().size()); + assertEquals("ds", partitionPath.entries().get(0).partitionName()); + Map<String, Object> statMap = + statistics.stream() + .collect(Collectors.toMap(StatisticEntry::name, stat -> stat.value().value())); + assertTrue(statMap.containsKey("custom-datafile_mse")); + assertTrue((Long) statMap.get("custom-file_count") > 0L); + }); + } + + /** With a metrics updater supplied, the partitioned table must yield sixteen metric points (two partitions times eight statistics), all partition-scoped with a partition path and a non-null value, and no job metrics. */ @Test + public void testUpdatePartitionedTableStatisticsWithMetrics() { + RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater(); + RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); + + IcebergUpdateStatsJob.updateStatistics( + spark, statisticsUpdater, metricsUpdater, catalogName, "db.partitioned", 100_000L); + + assertEquals( + NameIdentifier.of(catalogName, "db", "partitioned"), statisticsUpdater.tableIdentifier); + assertFalse(statisticsUpdater.partitionStatistics.isEmpty()); + assertEquals(2, statisticsUpdater.partitionStatistics.size()); + +
assertEquals(16, metricsUpdater.tableMetrics.size()); + assertTrue( + metricsUpdater.tableMetrics.stream() + .allMatch(metric -> metric.scope() == DataScope.Type.PARTITION)); + assertTrue( + metricsUpdater.tableMetrics.stream() + .allMatch(metric -> metric.partitionPath().isPresent())); + assertTrue( + metricsUpdater.tableMetrics.stream() + .allMatch(metric -> metric.value() != null && metric.value().value() != null)); + assertTrue(metricsUpdater.jobMetrics.isEmpty()); + } + + /** For the table partitioned by day of event_ts and by region the job must publish four partition entries whose two-level paths render as event_ts_day=DATE,region=NAME for each of the two days and two regions. */ @Test + public void testUpdateMultiLevelPartitionedTableStatistics() { + RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); + + IcebergUpdateStatsJob.updateStatistics( + spark, updater, catalogName, "db.multi_partitioned", 100_000L); + + assertEquals( + NameIdentifier.of(catalogName, "db", "multi_partitioned"), updater.tableIdentifier); + assertTrue(updater.tableStatistics.isEmpty()); + assertFalse(updater.partitionStatistics.isEmpty()); + assertEquals(4, updater.partitionStatistics.size()); + + Set<String> parsedPartitions = + updater.partitionStatistics.keySet().stream() + .map( + partitionPath -> + partitionPath.entries().get(0).partitionName() + + "=" + + partitionPath.entries().get(0).partitionValue() + + "," + + partitionPath.entries().get(1).partitionName() + + "=" + + partitionPath.entries().get(1).partitionValue()) + .collect(Collectors.toSet()); + + assertEquals( + Set.of( + "event_ts_day=2026-01-01,region=ap-south", + "event_ts_day=2026-01-01,region=us-east", + "event_ts_day=2026-01-02,region=ap-south", + "event_ts_day=2026-01-02,region=us-east"), + parsedPartitions); + } + + /** Test double for StatisticsUpdater: remembers the table identifier and the statistics of the most recent update call; the statistics fields start as empty immutable collections. */ private static final class RecordingStatisticsUpdater implements StatisticsUpdater { + private NameIdentifier tableIdentifier; + private List<StatisticEntry<?>> tableStatistics = List.of(); + private Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = Map.of(); + + @Override + public String name() { + return "recording-updater"; + } + + @Override + public void
initialize(OptimizerEnv optimizerEnv) {} + + /** Records the identifier and replaces the stored table statistics with the given list. */ @Override + public void updateTableStatistics( + NameIdentifier tableIdentifier, List<StatisticEntry<?>> tableStatistics) { + this.tableIdentifier = tableIdentifier; + this.tableStatistics = tableStatistics; + } + + /** Records the identifier and replaces the stored partition statistics with the given map. */ @Override + public void updatePartitionStatistics( + NameIdentifier tableIdentifier, + Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) { + this.tableIdentifier = tableIdentifier; + this.partitionStatistics = partitionStatistics; + } + + @Override + public void close() {} + } + + /** Test double for MetricsUpdater: keeps only the list passed to the most recent call of each update method, so earlier calls are overwritten rather than accumulated; both lists start empty. */ private static final class RecordingMetricsUpdater implements MetricsUpdater { + private List<MetricPoint> tableMetrics = List.of(); + private List<MetricPoint> jobMetrics = List.of(); + + @Override + public String name() { + return "recording-metrics-updater"; + } + + @Override + public void initialize(OptimizerEnv optimizerEnv) {} + + @Override + public void updateTableAndPartitionMetrics(List<MetricPoint> metrics) { + this.tableMetrics = metrics; + } + + @Override + public void updateJobMetrics(List<MetricPoint> metrics) { + this.jobMetrics = metrics; + } + + @Override + public void close() {} + } +} diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java b/maintenance/optimizer-api/build.gradle.kts similarity index 57% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java rename to maintenance/optimizer-api/build.gradle.kts index 7bdb502bc3..436fa18ab3 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/MySQLMetricsDialect.java +++ b/maintenance/optimizer-api/build.gradle.kts @@ -16,14 +16,31 @@ * specific language governing permissions and limitations * under the License.
*/ +description = "Gravitino Optimizer API" -package org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc; +plugins { + `maven-publish` + id("java") + id("idea") +} + +dependencies { + implementation(project(":api")) + implementation(project(":common")) + implementation(project(":clients:client-java")) + implementation(libs.guava) + implementation(libs.commons.lang3) + implementation(libs.jackson.databind) -/** MySQL dialect marker for JDBC metrics storage. */ -public class MySQLMetricsDialect implements JdbcMetricsDialect { + annotationProcessor(libs.lombok) + compileOnly(libs.lombok) + testAnnotationProcessor(libs.lombok) + testCompileOnly(libs.lombok) + + testImplementation(libs.junit.jupiter.api) + testRuntimeOnly(libs.junit.jupiter.engine) +} - @Override - public String name() { - return "mysql"; - } +tasks.test { + useJUnitPlatform() } diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/DataScope.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricPoint.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/MetricSample.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionEntry.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionStrategy.java diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Provider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/StatisticEntry.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/Strategy.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsProvider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java similarity index 100% rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MonitorCallback.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobExecutionContext.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/JobSubmitter.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StatisticsProvider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyEvaluation.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyProvider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/SupportTableStatistics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/TableMetadataProvider.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/MetricsUpdater.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsCalculator.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/StatisticsUpdater.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java rename to 
maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobMetrics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkJobStatistics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableMetrics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateBulkTableStatistics.java diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobMetrics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateJobStatistics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableMetrics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java similarity index 100% 
rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/updater/SupportsCalculateTableStatistics.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerEnv.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/PartitionEntryImpl.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/StatisticEntryImpl.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java 
b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java similarity index 84% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java index 5d64378ff2..f915f2cb07 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java @@ -26,15 +26,6 @@ import org.apache.gravitino.Config; import org.apache.gravitino.config.ConfigBuilder; import org.apache.gravitino.config.ConfigConstants; import org.apache.gravitino.config.ConfigEntry; -import org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator; -import org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider; -import org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider; -import org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter; -import org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider; -import org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider; -import org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider; -import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater; -import org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater; /** * Central configuration holder for the optimizer runtime. 
Keys are grouped under the {@code @@ -67,64 +58,74 @@ public class OptimizerConfig extends Config { MONITOR_PREFIX + "tableJobRelationProvider"; private static final String METRICS_EVALUATOR = MONITOR_PREFIX + "metricsEvaluator"; private static final String MONITOR_CALLBACKS = MONITOR_PREFIX + "callbacks"; + private static final String DEFAULT_STATISTICS_PROVIDER = "gravitino-statistics-provider"; + private static final String DEFAULT_STRATEGY_PROVIDER = "gravitino-strategy-provider"; + private static final String DEFAULT_TABLE_META_PROVIDER = "gravitino-table-metadata-provider"; + private static final String DEFAULT_JOB_SUBMITTER = "noop-job-submitter"; + private static final String DEFAULT_STATISTICS_UPDATER = "gravitino-statistics-updater"; + private static final String DEFAULT_METRICS_UPDATER = "gravitino-metrics-updater"; + private static final String DEFAULT_METRICS_PROVIDER = "gravitino-metrics-provider"; + private static final String DEFAULT_TABLE_JOB_RELATION_PROVIDER = + "dummy-table-job-relation-provider"; + private static final String DEFAULT_METRICS_EVALUATOR = "gravitino-metrics-evaluator"; public static final ConfigEntry<String> STATISTICS_PROVIDER_CONFIG = new ConfigBuilder(STATISTICS_PROVIDER) .doc( "Statistics provider implementation name (matches Provider.name()) discoverable via " + "ServiceLoader. Example: '" - + GravitinoStatisticsProvider.NAME + + DEFAULT_STATISTICS_PROVIDER + "'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoStatisticsProvider.NAME); + .createWithDefault(DEFAULT_STATISTICS_PROVIDER); public static final ConfigEntry<String> STRATEGY_PROVIDER_CONFIG = new ConfigBuilder(STRATEGY_PROVIDER) .doc( "Strategy provider implementation name (matches Provider.name()) discoverable via " + "ServiceLoader. 
Example: '" - + GravitinoStrategyProvider.NAME + + DEFAULT_STRATEGY_PROVIDER + "'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoStrategyProvider.NAME); + .createWithDefault(DEFAULT_STRATEGY_PROVIDER); public static final ConfigEntry<String> TABLE_META_PROVIDER_CONFIG = new ConfigBuilder(TABLE_META_PROVIDER) .doc( "Table metadata provider implementation name (matches Provider.name()) discoverable " + "via ServiceLoader. Example: '" - + GravitinoTableMetadataProvider.NAME + + DEFAULT_TABLE_META_PROVIDER + "'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoTableMetadataProvider.NAME); + .createWithDefault(DEFAULT_TABLE_META_PROVIDER); public static final ConfigEntry<String> JOB_SUBMITTER_CONFIG = new ConfigBuilder(JOB_SUBMITTER) .doc( "Job submitter implementation name (matches Provider.name()) discoverable via " + "ServiceLoader. Example: '" - + NoopJobSubmitter.NAME + + DEFAULT_JOB_SUBMITTER + "'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(NoopJobSubmitter.NAME); + .createWithDefault(DEFAULT_JOB_SUBMITTER); public static final ConfigEntry<String> STATISTICS_UPDATER_CONFIG = new ConfigBuilder(STATISTICS_UPDATER) .doc("The statistics updater implementation name (matches Provider.name()).") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoStatisticsUpdater.NAME); + .createWithDefault(DEFAULT_STATISTICS_UPDATER); public static final ConfigEntry<String> METRICS_UPDATER_CONFIG = new ConfigBuilder(METRICS_UPDATER) .doc("The metrics updater implementation name (matches Provider.name()).") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoMetricsUpdater.NAME); + .createWithDefault(DEFAULT_METRICS_UPDATER); public static final ConfigEntry<String> METRICS_PROVIDER_CONFIG = new ConfigBuilder(METRICS_PROVIDER) @@ -133,7 +134,7 @@ public class OptimizerConfig extends Config { + "discoverable via 
ServiceLoader. Example: 'metrics-provider'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoMetricsProvider.NAME); + .createWithDefault(DEFAULT_METRICS_PROVIDER); public static final ConfigEntry<String> TABLE_JOB_RELATION_PROVIDER_CONFIG = new ConfigBuilder(TABLE_JOB_RELATION_PROVIDER) @@ -142,7 +143,7 @@ public class OptimizerConfig extends Config { + "discoverable via ServiceLoader. Example: 'table-job-relation-provider'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(DummyTableJobRelationProvider.NAME); + .createWithDefault(DEFAULT_TABLE_JOB_RELATION_PROVIDER); public static final ConfigEntry<String> METRICS_EVALUATOR_CONFIG = new ConfigBuilder(METRICS_EVALUATOR) @@ -152,7 +153,7 @@ public class OptimizerConfig extends Config { + "'metrics-evaluator'.") .version(ConfigConstants.VERSION_1_2_0) .stringConf() - .createWithDefault(GravitinoMetricsEvaluator.NAME); + .createWithDefault(DEFAULT_METRICS_EVALUATOR); public static final ConfigEntry<List<String>> MONITOR_CALLBACKS_CONFIG = new ConfigBuilder(MONITOR_CALLBACKS) diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/GravitinoClientUtils.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java similarity index 100% rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/PartitionPathSerdeUtils.java similarity index 64% copy from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java copy to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/PartitionPathSerdeUtils.java index 392e7b1f5a..31d9f1c1ca 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/PartitionPathSerdeUtils.java @@ -17,9 +17,9 @@ * under the License. */ -package org.apache.gravitino.maintenance.optimizer.recommender.util; +package org.apache.gravitino.maintenance.optimizer.common.util; -import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.LinkedHashMap; @@ -31,23 +31,12 @@ import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry; import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; -/** Helpers for converting between Gravitino partition names and {@link PartitionPath}. 
*/ -public class PartitionUtils { - private static final TypeReference<List<Map<String, String>>> PARTITION_PATH_TYPE = - new TypeReference<List<Map<String, String>>>() {}; +/** Shared serde helpers for converting between partition path and JSON string. */ +public final class PartitionPathSerdeUtils { + private PartitionPathSerdeUtils() {} - private PartitionUtils() {} - - /** - * Encodes a {@link PartitionPath} into a JSON string. - * - * <p>For example, a path with entries {@code p1=v1, p2=v2} is encoded as {@code [{"p1":"v1"}, - * {"p2":"v2"}]}. - * - * @param partitionPath partition path - * @return encoded JSON string - */ - public static String encodePartitionPath(PartitionPath partitionPath) { + /** Encodes partition path into JSON string, such as [{"p1":"v1"},{"p2":"v2"}]. */ + public static String encode(PartitionPath partitionPath) { Preconditions.checkArgument(partitionPath != null, "partitionPath must not be null"); List<PartitionEntry> entries = partitionPath.entries(); Preconditions.checkArgument(entries != null && !entries.isEmpty(), "partitionPath is empty"); @@ -70,32 +59,28 @@ public class PartitionUtils { } } - /** - * Decodes a JSON-encoded partition path into a {@link PartitionPath}. - * - * <p>Example format: {@code [{"p1":"v1"},{"p2":"v2"}]}. - * - * @param encodedPartitionPath JSON string representing the partition path - * @return parsed partition path - */ - public static PartitionPath decodePartitionPath(String encodedPartitionPath) { + /** Decodes partition path from JSON string, such as [{"p1":"v1"},{"p2":"v2"}]. 
*/ + public static PartitionPath decode(String encodedPartitionPath) { Preconditions.checkArgument( StringUtils.isNotBlank(encodedPartitionPath), "encodedPartitionPath must not be blank"); - List<Map<String, String>> decoded; + JsonNode decoded; try { - decoded = JsonUtils.objectMapper().readValue(encodedPartitionPath, PARTITION_PATH_TYPE); + decoded = JsonUtils.objectMapper().readTree(encodedPartitionPath); } catch (Exception e) { throw new IllegalArgumentException("Failed to decode partition path", e); } - Preconditions.checkArgument(decoded != null && !decoded.isEmpty(), "partitionPath is empty"); + Preconditions.checkArgument( + decoded != null && decoded.isArray() && !decoded.isEmpty(), "partitionPath is empty"); List<PartitionEntry> entries = new ArrayList<>(decoded.size()); - for (Map<String, String> item : decoded) { + for (JsonNode item : decoded) { Preconditions.checkArgument( - item != null && item.size() == 1, "partition entry must contain one key/value pair"); - Map.Entry<String, String> kv = item.entrySet().iterator().next(); + item != null && item.isObject() && item.size() == 1, + "partition entry must contain one key/value pair"); + Map.Entry<String, JsonNode> kv = item.fields().next(); String name = kv.getKey(); - String value = kv.getValue(); + JsonNode valueNode = kv.getValue(); + String value = valueNode == null || valueNode.isNull() ? 
null : valueNode.asText(); Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName cannot be blank"); Preconditions.checkArgument(StringUtils.isNotBlank(value), "partitionValue cannot be blank"); entries.add(new PartitionEntryImpl(name, value)); diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java similarity index 100% rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueSerdeUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueSerdeUtils.java new file mode 100644 index 0000000000..3ce0cdda7c --- /dev/null +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/StatisticValueSerdeUtils.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.maintenance.optimizer.common.util; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Preconditions; +import org.apache.gravitino.json.JsonUtils; +import org.apache.gravitino.stats.StatisticValue; + +/** Shared serde helpers for {@link StatisticValue}. */ +public final class StatisticValueSerdeUtils { + + private StatisticValueSerdeUtils() {} + + public static String toString(StatisticValue<?> value) { + Preconditions.checkArgument(value != null, "StatisticValue cannot be null"); + try { + return JsonUtils.anyFieldMapper().writeValueAsString(value); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException("Failed to serialize StatisticValue: " + value, e); + } + } + + public static StatisticValue<?> fromString(String valueStr) { + Preconditions.checkArgument(valueStr != null, "StatisticValue string cannot be null"); + try { + return JsonUtils.anyFieldMapper().readValue(valueStr, StatisticValue.class); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException( + "Failed to deserialize StatisticValue from: " + valueStr, e); + } + } +} diff --git a/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestPartitionPathSerdeUtils.java b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestPartitionPathSerdeUtils.java new file mode 100644 index 0000000000..da7749b8c8 --- /dev/null +++ b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestPartitionPathSerdeUtils.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.maintenance.optimizer.common.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; +import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; +import org.junit.jupiter.api.Test; + +public class TestPartitionPathSerdeUtils { + + @Test + public void testEncodeAndDecodePartitionPath() { + PartitionPath path = + PartitionPath.of( + List.of( + new PartitionEntryImpl("dt", "2026-01-01"), new PartitionEntryImpl("hh", "08"))); + + String encoded = PartitionPathSerdeUtils.encode(path); + PartitionPath decoded = PartitionPathSerdeUtils.decode(encoded); + + assertEquals(path, decoded); + } + + @Test + public void testDecodeInvalidPath() { + assertThrows(IllegalArgumentException.class, () -> PartitionPathSerdeUtils.decode("")); + assertThrows( + IllegalArgumentException.class, () -> PartitionPathSerdeUtils.decode("{\"dt\":\"2026\"}")); + } +} diff --git a/maintenance/optimizer/build.gradle.kts b/maintenance/optimizer/build.gradle.kts index df1c4473cd..87478a2a71 100644 --- a/maintenance/optimizer/build.gradle.kts +++ b/maintenance/optimizer/build.gradle.kts @@ -32,6 +32,8 @@ val icebergVersion: String = libs.versions.iceberg4connector.get() dependencies { 
implementation(project(":api")) + implementation(project(":maintenance:optimizer-api")) + implementation(project(":maintenance:gravitino-updaters")) implementation(project(":catalogs:catalog-common")) implementation(project(":clients:client-java")) implementation(project(":core")) { diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java index 392e7b1f5a..39d5cce1d8 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/recommender/util/PartitionUtils.java @@ -19,23 +19,11 @@ package org.apache.gravitino.maintenance.optimizer.recommender.util; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Preconditions; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import org.apache.commons.lang3.StringUtils; -import org.apache.gravitino.json.JsonUtils; -import org.apache.gravitino.maintenance.optimizer.api.common.PartitionEntry; import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; -import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; +import org.apache.gravitino.maintenance.optimizer.common.util.PartitionPathSerdeUtils; /** Helpers for converting between Gravitino partition names and {@link PartitionPath}. 
*/ public class PartitionUtils { - private static final TypeReference<List<Map<String, String>>> PARTITION_PATH_TYPE = - new TypeReference<List<Map<String, String>>>() {}; - private PartitionUtils() {} /** @@ -48,26 +36,7 @@ public class PartitionUtils { * @return encoded JSON string */ public static String encodePartitionPath(PartitionPath partitionPath) { - Preconditions.checkArgument(partitionPath != null, "partitionPath must not be null"); - List<PartitionEntry> entries = partitionPath.entries(); - Preconditions.checkArgument(entries != null && !entries.isEmpty(), "partitionPath is empty"); - - List<Map<String, String>> encoded = new ArrayList<>(entries.size()); - for (PartitionEntry entry : entries) { - String name = entry.partitionName(); - String value = entry.partitionValue(); - Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName cannot be blank"); - Preconditions.checkArgument(StringUtils.isNotBlank(value), "partitionValue cannot be blank"); - Map<String, String> item = new LinkedHashMap<>(1); - item.put(name, value); - encoded.add(item); - } - - try { - return JsonUtils.objectMapper().writeValueAsString(encoded); - } catch (Exception e) { - throw new IllegalArgumentException("Failed to encode partition path", e); - } + return PartitionPathSerdeUtils.encode(partitionPath); } /** @@ -79,28 +48,6 @@ public class PartitionUtils { * @return parsed partition path */ public static PartitionPath decodePartitionPath(String encodedPartitionPath) { - Preconditions.checkArgument( - StringUtils.isNotBlank(encodedPartitionPath), "encodedPartitionPath must not be blank"); - List<Map<String, String>> decoded; - try { - decoded = JsonUtils.objectMapper().readValue(encodedPartitionPath, PARTITION_PATH_TYPE); - } catch (Exception e) { - throw new IllegalArgumentException("Failed to decode partition path", e); - } - Preconditions.checkArgument(decoded != null && !decoded.isEmpty(), "partitionPath is empty"); - - List<PartitionEntry> entries = new 
ArrayList<>(decoded.size()); - for (Map<String, String> item : decoded) { - Preconditions.checkArgument( - item != null && item.size() == 1, "partition entry must contain one key/value pair"); - Map.Entry<String, String> kv = item.entrySet().iterator().next(); - String name = kv.getKey(); - String value = kv.getValue(); - Preconditions.checkArgument(StringUtils.isNotBlank(name), "partitionName cannot be blank"); - Preconditions.checkArgument(StringUtils.isNotBlank(value), "partitionValue cannot be blank"); - entries.add(new PartitionEntryImpl(name, value)); - } - - return PartitionPath.of(entries); + return PartitionPathSerdeUtils.decode(encodedPartitionPath); } } diff --git a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider index c059ab3483..bd0237a754 100644 --- a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider +++ b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider @@ -21,8 +21,6 @@ org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStati org.apache.gravitino.maintenance.optimizer.recommender.table.GravitinoTableMetadataProvider org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter -org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater -org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider 
org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java index 83062a773b..bc6285c41b 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.client.GravitinoClient; import org.apache.gravitino.client.GravitinoMetalake; import org.apache.gravitino.dto.rel.ColumnDTO; import org.apache.gravitino.exceptions.NoSuchMetalakeException; @@ -37,9 +38,11 @@ import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; import org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext; import org.apache.gravitino.rel.Column; +import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.expressions.transforms.Transforms; import org.apache.gravitino.rel.types.Types; +import org.apache.gravitino.stats.Statistic; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.awaitility.Awaitility; @@ -55,6 +58,7 @@ public class TestBuiltinIcebergRewriteDataFiles { private static final String METALAKE_NAME = "test"; private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg"; private 
static final String JOB_TEMPLATE_NAME = "builtin-iceberg-rewrite-data-files"; + private static final String UPDATE_STATS_JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats"; private static final String SPARK_CATALOG_NAME = "rest_catalog"; private static final String WAREHOUSE_LOCATION = ""; @@ -210,6 +214,38 @@ public class TestBuiltinIcebergRewriteDataFiles { }); } + /** Creates and fills the table via createTableAndInsertData, runs the job template named by UPDATE_STATS_JOB_TEMPLATE_NAME, then loads the table through a GravitinoClient and asserts that its statistics are non-empty and include custom-file_count, custom-datafile_mse and custom-total_size. */ + @Test + void testSubmitBuiltinIcebergUpdateStatsJobAndPersistStatistics() throws Exception { + String tableName = "update_stats_table"; + String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName; + runWithSparkAndMetalake( + (spark, metalake) -> { + createTableAndInsertData(spark, fullTableName); + + Map<String, String> jobConf = buildUpdateStatsJobConfig(tableName); + submitJob(metalake, UPDATE_STATS_JOB_TEMPLATE_NAME, jobConf); + + try (GravitinoClient client = + GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) { + Table table = + client + .loadCatalog(SPARK_CATALOG_NAME) + .asTableCatalog() + .loadTable(NameIdentifier.of("db", tableName)); + List<Statistic> stats = table.supportsStatistics().listStatistics(); + Assertions.assertFalse(stats.isEmpty(), "Expected table statistics to be updated"); + /* Index the statistics by name so the expected keys can be checked. */ + Map<String, Statistic> statsMap = new HashMap<>(); + for (Statistic stat : stats) { + statsMap.put(stat.name(), stat); + } + Assertions.assertTrue(statsMap.containsKey("custom-file_count")); + Assertions.assertTrue(statsMap.containsKey("custom-datafile_mse")); + Assertions.assertTrue(statsMap.containsKey("custom-total_size")); + } + }); + } + private static GravitinoMetalake loadOrCreateMetalake( GravitinoAdminClient client, String metalakeName) { try { @@ -235,7 +271,12 @@ public class TestBuiltinIcebergRewriteDataFiles { } private static void submitCompactionJob(GravitinoMetalake metalake, Map<String, String> jobConf) { - JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf); + submitJob(metalake, JOB_TEMPLATE_NAME, jobConf); + } + + private static void
submitJob( + GravitinoMetalake metalake, String jobTemplateName, Map<String, String> jobConf) { + JobHandle jobHandle = metalake.runJob(jobTemplateName, jobConf); System.out.println("Submitted job id: " + jobHandle.jobId()); Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank"); @@ -265,6 +306,17 @@ public class TestBuiltinIcebergRewriteDataFiles { return jobConf; } + /** Builds the update-stats job configuration for table db.tableName: Gravitino URI and metalake, target_file_size_bytes of 100000, the gravitino-statistics-updater, plus every entry from createOptimizerConfig().jobSubmitterConfigs(). */ private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { + Map<String, String> jobConf = new HashMap<>(); + jobConf.put("table_identifier", "db." + tableName); + jobConf.put("gravitino_uri", SERVER_URI); + jobConf.put("metalake", METALAKE_NAME); + jobConf.put("target_file_size_bytes", "100000"); + jobConf.put("statistics_updater", "gravitino-statistics-updater"); + jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs()); + return jobConf; + } + private static Map<String, String> buildCompactionJobConfig( OptimizerConfig optimizerConfig, String tableName, diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java new file mode 100644 index 0000000000..003bb16a0f --- /dev/null +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.maintenance.optimizer.recommender.job; + +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.client.GravitinoClient; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.exceptions.NoSuchMetalakeException; +import org.apache.gravitino.job.JobHandle; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.stats.Statistic; +import org.apache.spark.sql.SparkSession; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; + +/** Environment IT for triggering built-in Iceberg update stats job through the client API. 
*/ +@EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true") +public class TestBuiltinIcebergUpdateStatsJob { + + private static final String SERVER_URI = "http://localhost:8090"; + private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg"; + private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats"; + private static final String SPARK_CATALOG_NAME = "rest_catalog"; + private static final String WAREHOUSE_LOCATION = ""; + private static final String METALAKE_NAME = "test"; + /** Recreates the REST catalog, creates a uniquely named table and inserts rows with Spark, runs the update-stats job, then asserts that the statistics listed through a GravitinoClient contain custom-file_count, custom-datafile_mse and custom-total_size. The Spark session is stopped in the finally block. */ + @Test + void testRunBuiltinUpdateStatsJobAndPersistStatistics() throws Exception { + String tableName = "update_stats_table_" + UUID.randomUUID().toString().replace("-", ""); + String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName; + + SparkSession spark = createSparkSession(); + try (GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { + GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); + recreateRestCatalog(metalake); + createTableAndInsertData(spark, fullTableName); + submitJob(metalake, buildUpdateStatsJobConfig(tableName)); + /* Read the statistics back through a client bound to the test metalake. */ + try (GravitinoClient client = + GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) { + Table table = + client + .loadCatalog(SPARK_CATALOG_NAME) + .asTableCatalog() + .loadTable(NameIdentifier.of("db", tableName)); + List<Statistic> statistics = table.supportsStatistics().listStatistics(); + Map<String, Statistic> statisticMap = + statistics.stream().collect(Collectors.toMap(Statistic::name, s -> s)); + Assertions.assertTrue(statisticMap.containsKey("custom-file_count")); + Assertions.assertTrue(statisticMap.containsKey("custom-datafile_mse")); + Assertions.assertTrue(statisticMap.containsKey("custom-total_size")); + } + } finally { + spark.stop(); + } + } + /** Returns a local[2] Spark session configured with the Iceberg session extensions and a SparkCatalog named SPARK_CATALOG_NAME of type rest, with cache-enabled set to false, pointing at ICEBERG_REST_URI and WAREHOUSE_LOCATION. */ + private static SparkSession createSparkSession() { + return SparkSession.builder() + .master("local[2]") + .appName("builtin-iceberg-update-stats-it") +
.config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".type", "rest") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".cache-enabled", "false") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".uri", ICEBERG_REST_URI) + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".warehouse", WAREHOUSE_LOCATION) + .getOrCreate(); + } + /** Ensures the db namespace exists, drops and recreates the table as (id INT, data STRING), sets write.target-file-size-bytes to 1024000, and issues ten single-row INSERT statements. NOTE(review): presumably one INSERT per row is meant to produce several small data files; confirm. */ + private static void createTableAndInsertData(SparkSession spark, String fullTableName) { + spark.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db"); + spark.sql("DROP TABLE IF EXISTS " + fullTableName); + spark.sql("CREATE TABLE " + fullTableName + " (id INT, data STRING) USING iceberg"); + spark.sql( + "ALTER TABLE " + + fullTableName + + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')"); + for (int i = 0; i < 10; i++) { + spark.sql("INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + "')"); + } + } + /** Builds the job configuration map: table identifier db.tableName, Gravitino URI and metalake, target file size, statistics updater name, REST catalog settings, and the local Spark master, executor and driver settings. */ + private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { + Map<String, String> jobConf = new HashMap<>(); + jobConf.put("table_identifier", "db."
+ tableName); + jobConf.put("gravitino_uri", SERVER_URI); + jobConf.put("metalake", METALAKE_NAME); + jobConf.put("target_file_size_bytes", "100000"); + jobConf.put("statistics_updater", "gravitino-statistics-updater"); + jobConf.put("catalog_name", SPARK_CATALOG_NAME); + jobConf.put("catalog_type", "rest"); + jobConf.put("catalog_uri", ICEBERG_REST_URI); + jobConf.put("warehouse_location", WAREHOUSE_LOCATION); + jobConf.put("spark_master", "local[2]"); + jobConf.put("spark_executor_instances", "1"); + jobConf.put("spark_executor_cores", "1"); + jobConf.put("spark_executor_memory", "1g"); + jobConf.put("spark_driver_memory", "1g"); + jobConf.put("spark_conf", "{\"spark.hadoop.fs.defaultFS\":\"file:///\"}"); + return jobConf; + } + /** Runs the JOB_TEMPLATE_NAME job with {@code jobConf}, asserts the returned job id is not blank, waits for the job to finish, and asserts that the final status is SUCCEEDED. */ + private static void submitJob(GravitinoMetalake metalake, Map<String, String> jobConf) { + JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf); + Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank"); + /* Poll every 2 seconds, for at most 5 minutes, until the job reports SUCCEEDED, FAILED or CANCELLED. */ + Awaitility.await() + .atMost(Duration.ofMinutes(5)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> { + JobHandle.Status status = metalake.getJob(jobHandle.jobId()).jobStatus(); + return status == JobHandle.Status.SUCCEEDED + || status == JobHandle.Status.FAILED + || status == JobHandle.Status.CANCELLED; + }); + /* Fetch the status again after the wait and require SUCCEEDED. */ + JobHandle.Status finalStatus = metalake.getJob(jobHandle.jobId()).jobStatus(); + Assertions.assertEquals(JobHandle.Status.SUCCEEDED, finalStatus, "Job should succeed"); + } + /** Loads the metalake named {@code metalakeName}; when loading throws NoSuchMetalakeException, creates it with the comment "IT metalake" and an empty property map. */ + private static GravitinoMetalake loadOrCreateMetalake( + GravitinoAdminClient client, String metalakeName) { + try { + return client.loadMetalake(metalakeName); + } catch (NoSuchMetalakeException ignored) { + return client.createMetalake(metalakeName, "IT metalake", Map.of()); + } + } + /** Drops the catalog named SPARK_CATALOG_NAME (passing true as the second argument and ignoring any exception), then creates a RELATIONAL lakehouse-iceberg catalog whose properties set catalog-backend to REST and uri to ICEBERG_REST_URI. */ + private static void recreateRestCatalog(GravitinoMetalake metalake) { + try { + metalake.dropCatalog(SPARK_CATALOG_NAME, true); + } catch (Exception ignored) { + // Ignore when the catalog does not exist,
or when force-drop is not needed. + } + + Map<String, String> properties = new HashMap<>(); + properties.put("catalog-backend", "REST"); + properties.put("uri", ICEBERG_REST_URI); + + metalake.createCatalog( + SPARK_CATALOG_NAME, + Catalog.Type.RELATIONAL, + "lakehouse-iceberg", + "IT Iceberg REST catalog", + properties); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 1529343c61..35f24ed2a7 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -103,4 +103,9 @@ include(":bundles:azure", ":bundles:azure-bundle", ":bundles:iceberg-azure-bundl include(":catalogs:hadoop-common") include(":lineage") include(":mcp-server") -include(":maintenance:optimizer", ":maintenance:jobs") +include( + ":maintenance:optimizer-api", + ":maintenance:gravitino-updaters", + ":maintenance:optimizer", + ":maintenance:jobs" +)
