This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch stats_job in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit ec5e29c11ebdeb14a353dc58f640dbafc4d9aea7 Author: fanng <[email protected]> AuthorDate: Thu Mar 5 09:55:50 2026 +0800 Refine update-stats submission flow and keep Java 8 compatibility - Simplify submit-update-stats-job CLI by removing limit and file-based JSON options, and centralize job options in command context - Drop legacy updater option aliases for stats/all validation and tighten spark config requirements - Restore Java 8 bytecode compatibility for maintenance jobs and related optimizer modules by relying on root compatibility rules - Replace Java 9+ collection factories in optimizer-api/updaters code paths to preserve Java 8 compatibility - Strengthen Spark update-stats integration test to assert exact table/partition metrics coverage and reorder the main env test for readability --- build.gradle.kts | 3 + .../updater/metrics/GravitinoMetricsUpdater.java | 6 +- .../storage/jdbc/JdbcMetricsRepository.java | 14 +- .../statistics/GravitinoStatisticsUpdater.java | 5 +- maintenance/jobs/build.gradle.kts | 5 - .../jobs/iceberg/IcebergUpdateStatsJob.java | 2 +- .../TestIcebergUpdateStatsJobWithSpark.java | 253 +++++++++++++++------ .../optimizer/api/common/PartitionPath.java | 4 +- .../api/common/TableAndPartitionStatistics.java | 12 +- .../optimizer/api/monitor/EvaluationResult.java | 8 +- .../optimizer/api/recommender/StrategyHandler.java | 3 +- .../api/recommender/StrategyHandlerContext.java | 11 +- .../optimizer/common/conf/OptimizerConfig.java | 3 +- .../maintenance/optimizer/OptimizerCmd.java | 38 +--- .../optimizer/command/OptimizerCommandContext.java | 75 +++--- .../command/SubmitUpdateStatsJobCommand.java | 84 +------ .../maintenance/optimizer/TestOptimizerCmd.java | 49 +--- .../job/TestBuiltinIcebergRewriteDataFiles.java | 52 ----- 18 files changed, 309 insertions(+), 318 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 4bd2e1e8e0..d1b53e093e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -335,6 +335,9 @@ subprojects { val name = project.name.lowercase() val path = project.path.lowercase() if (path.startsWith(":maintenance:jobs") || + path.startsWith(":maintenance:optimizer-api") || + path.startsWith(":maintenance:gravitino-updaters") || + path.startsWith(":clients:client-java") || name == "api" || name == "common" || name == "catalog-common" || diff --git a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java index 7834efa6e2..3a28de2ed0 100644 --- a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java +++ b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/GravitinoMetricsUpdater.java @@ -21,6 +21,8 @@ package org.apache.gravitino.maintenance.optimizer.updater.metrics; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.gravitino.maintenance.optimizer.api.common.DataScope; import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint; @@ -51,14 +53,14 @@ public class GravitinoMetricsUpdater implements MetricsUpdater { public void updateTableAndPartitionMetrics(List<MetricPoint> metrics) { ensureInitialized(); validateScopes( - metrics, List.of(DataScope.Type.TABLE, DataScope.Type.PARTITION), "table/partition"); + metrics, Arrays.asList(DataScope.Type.TABLE, DataScope.Type.PARTITION), "table/partition"); metricsStorage.storeTableAndPartitionMetrics(metrics); } @Override public void updateJobMetrics(List<MetricPoint> metrics) { ensureInitialized(); - validateScopes(metrics, List.of(DataScope.Type.JOB), "job"); + validateScopes(metrics, Collections.singletonList(DataScope.Type.JOB), "job"); metricsStorage.storeJobMetrics(metrics); } diff --git a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java index e138e18fd7..e47c8bdb5c 100644 --- a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java +++ b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/metrics/storage/jdbc/JdbcMetricsRepository.java @@ -31,6 +31,8 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -70,9 +72,10 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { private static final String TABLE_METRICS_TABLE = "table_metrics"; private static final String JOB_METRICS_TABLE = "job_metrics"; private static final Set<String> REQUIRED_TABLE_METRICS_COLUMNS = - Set.of("table_identifier", "metric_name", "table_partition", "metric_ts", "metric_value"); + buildRequiredColumnsSet( + "table_identifier", "metric_name", "table_partition", "metric_ts", "metric_value"); private static final Set<String> REQUIRED_JOB_METRICS_COLUMNS = - Set.of("job_identifier", "metric_name", "metric_ts", "metric_value"); + buildRequiredColumnsSet("job_identifier", "metric_name", "metric_ts", "metric_value"); protected static final String DEFAULT_USER = "sa"; protected static final String DEFAULT_PASSWORD = ""; @@ -81,6 +84,10 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { private JdbcMetricsDialect dialect; private volatile boolean initialized = false; + private static Set<String> buildRequiredColumnsSet(String... columns) { + return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(columns))); + } + protected final void initializeStorage(DataSourceJdbcConnectionProvider connectionProvider) { Preconditions.checkState(!initialized, "JdbcMetricsRepository has already been initialized."); Preconditions.checkArgument(connectionProvider != null, "connectionProvider must not be null"); @@ -220,7 +227,8 @@ public abstract class JdbcMetricsRepository implements MetricsRepository { Files.exists(scriptPath), "H2 metrics schema script not found at %s. Please ensure distribution scripts are present.", scriptPath); - executeSchemaSql(connection, Files.readString(scriptPath, StandardCharsets.UTF_8)); + executeSchemaSql( + connection, new String(Files.readAllBytes(scriptPath), StandardCharsets.UTF_8)); } private void executeSchemaSql(Connection connection, String sqlContent) throws SQLException { diff --git a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java index a3635cf2c7..2f10d95792 100644 --- a/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java +++ b/maintenance/gravitino-updaters/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/statistics/GravitinoStatisticsUpdater.java @@ -21,6 +21,7 @@ package org.apache.gravitino.maintenance.optimizer.updater.statistics; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -86,7 +87,7 @@ public class GravitinoStatisticsUpdater implements StatisticsUpdater { private Map<String, StatisticValue<?>> getTableStatisticsMap(List<StatisticEntry<?>> statistics) { if (statistics == null || statistics.isEmpty()) { - return Map.of(); + return Collections.emptyMap(); } return toStatisticValueMap(statistics, "table statistics"); } @@ -94,7 +95,7 @@ public class GravitinoStatisticsUpdater implements StatisticsUpdater { private List<PartitionStatisticsUpdate> getPartitionStatisticsUpdates( Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) { if (partitionStatistics == null || partitionStatistics.isEmpty()) { - return List.of(); + return Collections.emptyList(); } return partitionStatistics.entrySet().stream() .map( diff --git a/maintenance/jobs/build.gradle.kts b/maintenance/jobs/build.gradle.kts index 5b42074c32..8c82e9c040 100644 --- a/maintenance/jobs/build.gradle.kts +++ b/maintenance/jobs/build.gradle.kts @@ -25,11 +25,6 @@ plugins { alias(libs.plugins.shadow) } -java { - sourceCompatibility = JavaVersion.VERSION_17 - targetCompatibility = JavaVersion.VERSION_17 -} - repositories { mavenCentral() } diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java index 6cb08ba04a..9f03fc68f2 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java @@ -249,7 +249,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob { String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes); Row[] rows = (Row[]) spark.sql(sql).collect(); List<StatisticEntry<?>> tableStatistics = - rows.length == 0 ? List.of() : toStatistics(rows[0]); + rows.length == 0 ? Collections.emptyList() : toStatistics(rows[0]); if (updateMode.updateStats) { statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics); diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index 736e35db03..291ab11b70 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -31,7 +31,9 @@ import java.sql.Connection; import java.sql.DriverManager; import java.time.Duration; import java.time.Instant; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -78,6 +80,18 @@ public class TestIcebergUpdateStatsJobWithSpark { private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats"; private static final String SPARK_CATALOG_NAME = "rest_catalog"; private static final String METALAKE_NAME = "test"; + private static final int EXPECTED_METRIC_COUNT_PER_SCOPE = 8; + private static final Set<String> EXPECTED_METRIC_NAMES = + new HashSet<>( + Arrays.asList( + "custom-file_count", + "custom-data_files", + "custom-position_delete_files", + "custom-equality_delete_files", + "custom-small_files", + "custom-datafile_mse", + "custom-avg_size", + "custom-total_size")); @TempDir static File tempDir; @@ -145,6 +159,100 @@ public class TestIcebergUpdateStatsJobWithSpark { } } + // Requires a running deploy-mode Gravitino server and Spark environment. + @Test + @Tag("gravitino-docker-test") + @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true") + public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws Exception { + String suffix = UUID.randomUUID().toString().replace("-", ""); + String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix; + String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix; + String partitionMetricsTableName = "jobs_it_update_stats_mode_partition_metrics_" + suffix; + String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix; + String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName; + String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + metricsTableName; + String partitionMetricsFullTableName = SPARK_CATALOG_NAME + ".db." + partitionMetricsTableName; + String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + allModeTableName; + + SparkSession restSpark = createRestSparkSession(); + try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33"); + GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { + mysql.start(); + initializeMySqlMetricsSchema(mysql); + + GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); + recreateRestCatalog(metalake); + createTableAndInsertData(restSpark, statsFullTableName); + createTableAndInsertData(restSpark, metricsFullTableName); + createPartitionedTableAndInsertData(restSpark, partitionMetricsFullTableName); + createTableAndInsertData(restSpark, allModeFullTableName); + + NameIdentifier statsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName); + NameIdentifier metricsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName); + NameIdentifier partitionMetricsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", partitionMetricsTableName); + NameIdentifier allModeTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName); + + PartitionPath dsPartition1 = + PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-03-01"))); + PartitionPath dsPartition2 = + PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-03-02"))); + + try (GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository()) { + repository.initialize(buildJdbcMetricsConfigs(mysql)); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + statsTableName, "stats", buildUpdaterOptionsForStatsUpdaterOnly())); + awaitCustomStatisticsVisible(statsTableName); + assertEquals(0, getTableMetricsCount(repository, statsTableIdentifier)); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + metricsTableName, "metrics", buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); + assertFalse(containsCustomStatistics(metricsTableName)); + awaitTableMetricsExactly( + repository, metricsTableIdentifier, EXPECTED_METRIC_COUNT_PER_SCOPE); + assertTableMetricsMatch(repository, metricsTableIdentifier); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + partitionMetricsTableName, + "metrics", + buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); + assertFalse(containsCustomStatistics(partitionMetricsTableName)); + assertEquals(0, getTableMetricsCount(repository, partitionMetricsTableIdentifier)); + awaitPartitionMetricsExactly( + repository, + partitionMetricsTableIdentifier, + dsPartition1, + EXPECTED_METRIC_COUNT_PER_SCOPE); + awaitPartitionMetricsExactly( + repository, + partitionMetricsTableIdentifier, + dsPartition2, + EXPECTED_METRIC_COUNT_PER_SCOPE); + assertPartitionMetricsMatch(repository, partitionMetricsTableIdentifier, dsPartition1); + assertPartitionMetricsMatch(repository, partitionMetricsTableIdentifier, dsPartition2); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + allModeTableName, "all", buildUpdaterOptionsForAllMode(mysql))); + awaitCustomStatisticsVisible(allModeTableName); + awaitTableMetricsExactly( + repository, allModeTableIdentifier, EXPECTED_METRIC_COUNT_PER_SCOPE); + assertTableMetricsMatch(repository, allModeTableIdentifier); + } + } + } + @Test public void testUpdateNonPartitionedTableStatistics() { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); @@ -414,65 +522,6 @@ public class TestIcebergUpdateStatsJobWithSpark { parsedPartitions); } - // Requires a running deploy-mode Gravitino server and Spark environment. - @Test - @Tag("gravitino-docker-test") - @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true") - public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws Exception { - String suffix = UUID.randomUUID().toString().replace("-", ""); - String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix; - String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix; - String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix; - String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName; - String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + metricsTableName; - String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + allModeTableName; - - SparkSession restSpark = createRestSparkSession(); - try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33"); - GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { - mysql.start(); - initializeMySqlMetricsSchema(mysql); - - GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); - recreateRestCatalog(metalake); - createTableAndInsertData(restSpark, statsFullTableName); - createTableAndInsertData(restSpark, metricsFullTableName); - createTableAndInsertData(restSpark, allModeFullTableName); - - NameIdentifier statsTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName); - NameIdentifier metricsTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName); - NameIdentifier allModeTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName); - - try (GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository()) { - repository.initialize(buildJdbcMetricsConfigs(mysql)); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - statsTableName, "stats", buildUpdaterOptionsForStatsUpdaterOnly())); - awaitCustomStatisticsVisible(statsTableName); - assertEquals(0, getTableMetricsCount(repository, statsTableIdentifier)); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - metricsTableName, "metrics", buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); - assertFalse(containsCustomStatistics(metricsTableName)); - awaitTableMetricsAtLeast(repository, metricsTableIdentifier, 1); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - allModeTableName, "all", buildUpdaterOptionsForAllMode(mysql))); - awaitCustomStatisticsVisible(allModeTableName); - awaitTableMetricsAtLeast(repository, allModeTableIdentifier, 1); - } - } - } - private void awaitCustomStatisticsVisible(String tableName) throws Exception { Awaitility.await() .atMost(Duration.ofSeconds(30)) @@ -498,22 +547,77 @@ public class TestIcebergUpdateStatsJobWithSpark { } } - private static void awaitTableMetricsAtLeast( + private static void awaitTableMetricsExactly( + GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier, int expectedCount) { + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .until(() -> getTableMetricsCount(repository, tableIdentifier) == expectedCount); + } + + private static void awaitPartitionMetricsExactly( GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier, - int expectedLowerBound) { + PartitionPath partitionPath, + int expectedCount) { Awaitility.await() .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(2)) - .until(() -> getTableMetricsCount(repository, tableIdentifier) >= expectedLowerBound); + .until( + () -> + getPartitionMetricsCount(repository, tableIdentifier, partitionPath) + == expectedCount); + } + + private static void assertTableMetricsMatch( + GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) { + List<MetricPoint> metrics = getTableMetrics(repository, tableIdentifier); + assertEquals(EXPECTED_METRIC_COUNT_PER_SCOPE, metrics.size()); + assertTrue(metrics.stream().allMatch(metric -> metric.scope() == DataScope.Type.TABLE)); + assertTrue(metrics.stream().allMatch(metric -> metric.partitionPath().isEmpty())); + assertEquals( + EXPECTED_METRIC_NAMES, + metrics.stream().map(MetricPoint::metricName).collect(Collectors.toSet())); + } + + private static void assertPartitionMetricsMatch( + GenericJdbcMetricsRepository repository, + NameIdentifier tableIdentifier, + PartitionPath partitionPath) { + List<MetricPoint> metrics = getPartitionMetrics(repository, tableIdentifier, partitionPath); + assertEquals(EXPECTED_METRIC_COUNT_PER_SCOPE, metrics.size()); + assertTrue(metrics.stream().allMatch(metric -> metric.scope() == DataScope.Type.PARTITION)); + assertTrue(metrics.stream().allMatch(metric -> metric.partitionPath().isPresent())); + assertEquals( + EXPECTED_METRIC_NAMES, + metrics.stream().map(MetricPoint::metricName).collect(Collectors.toSet())); } private static int getTableMetricsCount( GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) { + return getTableMetrics(repository, tableIdentifier).size(); + } + + private static List<MetricPoint> getTableMetrics( + GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) { long now = Instant.now().getEpochSecond(); - return repository - .getMetrics(DataScope.forTable(tableIdentifier), now - 1800, now + 1800) - .size(); + return repository.getMetrics(DataScope.forTable(tableIdentifier), now - 1800, now + 1800); + } + + private static int getPartitionMetricsCount( + GenericJdbcMetricsRepository repository, + NameIdentifier tableIdentifier, + PartitionPath partitionPath) { + return getPartitionMetrics(repository, tableIdentifier, partitionPath).size(); + } + + private static List<MetricPoint> getPartitionMetrics( + GenericJdbcMetricsRepository repository, + NameIdentifier tableIdentifier, + PartitionPath partitionPath) { + long now = Instant.now().getEpochSecond(); + return repository.getMetrics( + DataScope.forPartition(tableIdentifier, partitionPath), now - 1800, now + 1800); } private static Map<String, String> buildJdbcMetricsConfigs(MySQLContainer<?> mysql) { @@ -598,6 +702,25 @@ public class TestIcebergUpdateStatsJobWithSpark { } } + private static void createPartitionedTableAndInsertData( + SparkSession sparkSession, String fullTableName) { + sparkSession.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db"); + sparkSession.sql("DROP TABLE IF EXISTS " + fullTableName); + sparkSession.sql( + "CREATE TABLE " + + fullTableName + + " (id INT, data STRING, ds STRING) USING iceberg PARTITIONED BY (ds)"); + sparkSession.sql( + "ALTER TABLE " + + fullTableName + + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')"); + for (int i = 0; i < 10; i++) { + String ds = i < 5 ? "2026-03-01" : "2026-03-02"; + sparkSession.sql( + "INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + "', '" + ds + "')"); + } + } + private static Map<String, String> buildUpdateStatsJobConfig( String tableName, String updateMode, String updaterOptions) { Map<String, String> jobConf = new HashMap<>(); diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java index 11f4e80ad6..cbc6c4cdb6 100644 --- a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/PartitionPath.java @@ -20,6 +20,8 @@ package org.apache.gravitino.maintenance.optimizer.api.common; import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** @@ -42,7 +44,7 @@ public final class PartitionPath { public static PartitionPath of(List<PartitionEntry> entries) { Preconditions.checkArgument( entries != null && !entries.isEmpty(), "partition entries must not be empty"); - return new PartitionPath(List.copyOf(entries)); + return new PartitionPath(Collections.unmodifiableList(new ArrayList<>(entries))); } /** diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java index 2e7642c00e..6ae96ce9b9 100644 --- a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/common/TableAndPartitionStatistics.java @@ -19,6 +19,9 @@ package org.apache.gravitino.maintenance.optimizer.api.common; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.apache.gravitino.annotation.DeveloperApi; @@ -32,9 +35,14 @@ public class TableAndPartitionStatistics { public TableAndPartitionStatistics( List<StatisticEntry<?>> tableStatistics, Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) { - this.tableStatistics = tableStatistics != null ? List.copyOf(tableStatistics) : List.of(); + this.tableStatistics = + tableStatistics != null + ? Collections.unmodifiableList(new ArrayList<>(tableStatistics)) + : Collections.emptyList(); this.partitionStatistics = - partitionStatistics != null ? Map.copyOf(partitionStatistics) : Map.of(); + partitionStatistics != null + ? Collections.unmodifiableMap(new LinkedHashMap<>(partitionStatistics)) + : Collections.emptyMap(); } public List<StatisticEntry<?>> tableStatistics() { diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java index 30456064dc..b843f89d33 100644 --- a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/EvaluationResult.java @@ -20,6 +20,7 @@ package org.apache.gravitino.maintenance.optimizer.api.monitor; import com.google.common.base.Preconditions; +import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -124,7 +125,7 @@ public class EvaluationResult { private static Map<String, List<MetricSample>> immutableCopy( Map<String, List<MetricSample>> metrics) { if (metrics.isEmpty()) { - return Map.of(); + return Collections.emptyMap(); } Map<String, List<MetricSample>> copied = new LinkedHashMap<>(); @@ -132,7 +133,10 @@ public class EvaluationResult { Preconditions.checkArgument(entry.getKey() != null, "metric name must not be null"); List<MetricSample> values = entry.getValue(); Preconditions.checkArgument(values != null, "metric values must not be null"); - copied.put(entry.getKey(), Collections.unmodifiableList(List.copyOf(values))); + for (MetricSample value : values) { + Preconditions.checkArgument(value != null, "metric sample must not be null"); + } + copied.put(entry.getKey(), Collections.unmodifiableList(new ArrayList<>(values))); } return Collections.unmodifiableMap(copied); } diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java index 879b2ecb73..79da7e01cd 100644 --- a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandler.java @@ -19,6 +19,7 @@ package org.apache.gravitino.maintenance.optimizer.api.recommender; +import java.util.Collections; import java.util.Set; import org.apache.gravitino.annotation.DeveloperApi; @@ -46,7 +47,7 @@ public interface StrategyHandler { * @return set of requested data items; empty means no additional data needed */ default Set<DataRequirement> dataRequirements() { - return Set.of(); + return Collections.emptySet(); } /** diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java index 82f6c9307b..5b3fd7fcd7 100644 --- a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/api/recommender/StrategyHandlerContext.java @@ -19,7 +19,9 @@ package org.apache.gravitino.maintenance.optimizer.api.recommender; +import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -139,7 +141,10 @@ public final class StrategyHandlerContext { * @return builder for chaining */ public Builder withTableStatistics(List<StatisticEntry<?>> statistics) { - this.tableStatistics = statistics == null ? Collections.emptyList() : List.copyOf(statistics); + this.tableStatistics = + statistics == null + ? Collections.emptyList() + : Collections.unmodifiableList(new ArrayList<>(statistics)); return this; } @@ -152,7 +157,9 @@ public final class StrategyHandlerContext { public Builder withPartitionStatistics( Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics) { this.partitionStatistics = - partitionStatistics == null ? Collections.emptyMap() : Map.copyOf(partitionStatistics); + partitionStatistics == null + ? Collections.emptyMap() + : Collections.unmodifiableMap(new LinkedHashMap<>(partitionStatistics)); return this; } diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java index f915f2cb07..0cb063f304 100644 --- a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java @@ -19,6 +19,7 @@ package org.apache.gravitino.maintenance.optimizer.common.conf; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.commons.lang3.StringUtils; @@ -164,7 +165,7 @@ public class OptimizerConfig extends Config { .version(ConfigConstants.VERSION_1_2_0) .stringConf() .toSequence() - .createWithDefault(List.of()); + .createWithDefault(Collections.emptyList()); public static final ConfigEntry<String> GRAVITINO_URI_CONFIG = new ConfigBuilder(GRAVITINO_URI) diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java index 6dc6fa01fb..6800e5203e 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java @@ -121,13 +121,10 @@ public class OptimizerCmd { EnumSet.of(CliOption.IDENTIFIERS), EnumSet.of( CliOption.DRY_RUN, - CliOption.LIMIT, CliOption.UPDATE_MODE, CliOption.TARGET_FILE_SIZE_BYTES, CliOption.UPDATER_OPTIONS, - CliOption.UPDATER_OPTIONS_FILE, - CliOption.SPARK_CONF, - CliOption.SPARK_CONF_FILE), + CliOption.SPARK_CONF), "Submit built-in Iceberg update stats jobs for given table identifiers.", "./bin/gravitino-optimizer.sh --type submit-update-stats-job --identifiers " + "rest.ab.t1,rest.ab.t2 --update-mode stats --dry-run", @@ -185,9 +182,10 @@ public class OptimizerCmd { String updateMode = cmd.getOptionValue(CliOption.UPDATE_MODE.longOpt()); String targetFileSizeBytes = cmd.getOptionValue(CliOption.TARGET_FILE_SIZE_BYTES.longOpt()); String updaterOptions = cmd.getOptionValue(CliOption.UPDATER_OPTIONS.longOpt()); - String updaterOptionsFile = cmd.getOptionValue(CliOption.UPDATER_OPTIONS_FILE.longOpt()); String sparkConf = cmd.getOptionValue(CliOption.SPARK_CONF.longOpt()); - String sparkConfFile = cmd.getOptionValue(CliOption.SPARK_CONF_FILE.longOpt()); + OptimizerCommandContext.UpdateStatsJobOptions updateStatsJobOptions = + new OptimizerCommandContext.UpdateStatsJobOptions( + updateMode, targetFileSizeBytes, updaterOptions, sparkConf); String statisticsPayload = cmd.getOptionValue(CliOption.STATISTICS_PAYLOAD.longOpt()); String filePath = cmd.getOptionValue(CliOption.FILE_PATH.longOpt()); Optional<StatisticsInputContent> statisticsInputContent = @@ -203,12 +201,7 @@ public class OptimizerCmd { actionTime, rangeSeconds, partitionPathRaw, - updateMode, - targetFileSizeBytes, - updaterOptions, - updaterOptionsFile, - sparkConf, - sparkConfFile, + updateStatsJobOptions, statisticsInputContent, out); executeCommand(optimizerType, context); @@ -340,14 +333,7 @@ public class OptimizerCmd { } private static CommandRules.ValidationPlan updateStatsJobInputRules() { - return CommandRules.newBuilder() - .addMutuallyExclusive( - List.of(CliOption.UPDATER_OPTIONS.longOpt(), CliOption.UPDATER_OPTIONS_FILE.longOpt()), - "--updater-options and --updater-options-file cannot be used together") - .addMutuallyExclusive( - List.of(CliOption.SPARK_CONF.longOpt(), CliOption.SPARK_CONF_FILE.longOpt()), - "--spark-conf and --spark-conf-file cannot be used together") - .build(); + return CommandRules.emptyPlan(); } private static void printGlobalHelp(Options options, PrintStream out) { @@ -518,21 +504,11 @@ public class OptimizerCmd { CliOptionArgType.SINGLE, null, "JSON map for updater options in submit-update-stats-job"), - UPDATER_OPTIONS_FILE( - "updater-options-file", - CliOptionArgType.SINGLE, - null, - "Path to JSON file for updater options in submit-update-stats-job"), SPARK_CONF( "spark-conf", CliOptionArgType.SINGLE, null, - "JSON map for Spark configs in submit-update-stats-job"), - SPARK_CONF_FILE( - "spark-conf-file", - CliOptionArgType.SINGLE, - null, - "Path to JSON file for Spark configs in submit-update-stats-job"); + "JSON map for Spark configs in submit-update-stats-job"); private final String longOpt; private final CliOptionArgType argType; diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java index b29ba0d09d..88d11046fa 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java @@ -21,6 +21,7 @@ package org.apache.gravitino.maintenance.optimizer.command; import java.io.PrintStream; import java.util.List; +import java.util.Objects; import java.util.Optional; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv; @@ -37,12 +38,7 @@ public final class OptimizerCommandContext { private final String actionTime; private final String rangeSeconds; private final String partitionPathRaw; - private final String updateMode; - private final String targetFileSizeBytes; - private final String updaterOptions; - private final String updaterOptionsFile; - private final String sparkConf; - private final String sparkConfFile; + private final UpdateStatsJobOptions updateStatsJobOptions; private final Optional<StatisticsInputContent> statisticsInputContent; private final PrintStream output; @@ -56,12 +52,7 @@ public final class OptimizerCommandContext { String actionTime, String rangeSeconds, String partitionPathRaw, - String updateMode, - String targetFileSizeBytes, - String updaterOptions, - String updaterOptionsFile, - String sparkConf, - String sparkConfFile, + UpdateStatsJobOptions updateStatsJobOptions, Optional<StatisticsInputContent> statisticsInputContent, PrintStream output) { this.optimizerEnv = optimizerEnv; @@ -73,12 +64,8 @@ public final class OptimizerCommandContext { this.actionTime = actionTime; this.rangeSeconds = rangeSeconds; this.partitionPathRaw = partitionPathRaw; - this.updateMode = updateMode; - this.targetFileSizeBytes = targetFileSizeBytes; - this.updaterOptions = updaterOptions; - this.updaterOptionsFile = updaterOptionsFile; - this.sparkConf = sparkConf; - this.sparkConfFile = sparkConfFile; + this.updateStatsJobOptions = + Objects.requireNonNull(updateStatsJobOptions, "updateStatsJobOptions must not be null"); this.statisticsInputContent = statisticsInputContent; this.output = output; } @@ -127,28 +114,24 @@ public final class OptimizerCommandContext { return partitionPathRaw; } + public UpdateStatsJobOptions updateStatsJobOptions() { + return updateStatsJobOptions; + } + public String updateMode() { - return updateMode; + return updateStatsJobOptions.updateMode(); } public String targetFileSizeBytes() { - return targetFileSizeBytes; + return updateStatsJobOptions.targetFileSizeBytes(); } public String updaterOptions() { - return updaterOptions; - } - - public String updaterOptionsFile() { - return updaterOptionsFile; + return updateStatsJobOptions.updaterOptions(); } public String sparkConf() { - return sparkConf; - } - - public String sparkConfFile() { - return sparkConfFile; + return updateStatsJobOptions.sparkConf(); } public Optional<StatisticsInputContent> statisticsInputContent() { @@ -158,4 +141,36 @@ public final class OptimizerCommandContext { public PrintStream output() { return output; } + + /** Submit-update-stats-job specific command options. */ + public static final class UpdateStatsJobOptions { + private final String updateMode; + private final String targetFileSizeBytes; + private final String updaterOptions; + private final String sparkConf; + + public UpdateStatsJobOptions( + String updateMode, String targetFileSizeBytes, String updaterOptions, String sparkConf) { + this.updateMode = updateMode; + this.targetFileSizeBytes = targetFileSizeBytes; + this.updaterOptions = updaterOptions; + this.sparkConf = sparkConf; + } + + public String updateMode() { + return updateMode; + } + + public String targetFileSizeBytes() { + return targetFileSizeBytes; + } + + public String updaterOptions() { + return updaterOptions; + } + + public String sparkConf() { + return sparkConf; + } + } } diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java index c06b024b47..00e62d615a 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java @@ -22,9 +22,6 @@ package org.apache.gravitino.maintenance.optimizer.command; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -47,22 +44,18 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { private static final String DEFAULT_UPDATE_MODE = "stats"; private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L; private static final String OPTION_UPDATER_OPTIONS = "updater-options"; - private static final String OPTION_UPDATER_OPTIONS_FILE = "updater-options-file"; private static final String OPTION_SPARK_CONF = "spark-conf"; - private static final String OPTION_SPARK_CONF_FILE = "spark-conf-file"; private static final ObjectMapper MAPPER = new ObjectMapper(); @Override public void execute(OptimizerCommandContext context) throws Exception { Map<String, String> submitterConfigs = context.optimizerEnv().config().jobSubmitterConfigs(); - int limit = parseLimit(context.limit()); List<TableTarget> tableTargets = parseTableTargets( context.identifiers(), - context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG), - limit); + context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG)); String updateMode = parseUpdateMode( @@ -73,19 +66,9 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { context.targetFileSizeBytes(), submitterConfigs.get("target_file_size_bytes"))); String updaterOptionsJson = - resolveJsonOption( - OPTION_UPDATER_OPTIONS, - context.updaterOptions(), - OPTION_UPDATER_OPTIONS_FILE, - context.updaterOptionsFile(), - submitterConfigs.get("updater_options")); + resolveJsonOption(context.updaterOptions(), submitterConfigs.get("updater_options")); String sparkConfJson = - resolveJsonOption( - OPTION_SPARK_CONF, - context.sparkConf(), - OPTION_SPARK_CONF_FILE, - context.sparkConfFile(), - submitterConfigs.get("spark_conf")); + resolveJsonOption(context.sparkConf(), submitterConfigs.get("spark_conf")); Map<String, String> updaterOptions = parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS); @@ -156,32 +139,10 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { return StringUtils.isBlank(confValue) ? null : confValue.trim(); } - private static String resolveJsonOption( - String cliOptionName, - String cliValue, - String fileOptionName, - String filePath, - String confValue) { + private static String resolveJsonOption(String cliValue, String confValue) { if (StringUtils.isNotBlank(cliValue)) { return cliValue.trim(); } - if (StringUtils.isNotBlank(filePath)) { - try { - Path path = Path.of(filePath.trim()); - Preconditions.checkArgument( - Files.exists(path), "Option %s file does not exist: %s", fileOptionName, filePath); - return Files.readString(path, StandardCharsets.UTF_8).trim(); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Failed to read option %s from %s: %s", - cliOptionName, - filePath, - e.getMessage()), - e); - } - } return StringUtils.isBlank(confValue) ? null : confValue.trim(); } @@ -202,27 +163,13 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", value.trim(), false); } - private static int parseLimit(String limit) { - if (StringUtils.isBlank(limit)) { - return Integer.MAX_VALUE; - } - long parsed = OptimizerCommandUtils.parseLongOption("limit", limit.trim(), false); - Preconditions.checkArgument( - parsed <= Integer.MAX_VALUE, "Option limit must be <= %s", Integer.MAX_VALUE); - return (int) parsed; - } - - private static List<TableTarget> parseTableTargets( - String[] identifiers, String defaultCatalog, int limit) { + private static List<TableTarget> parseTableTargets(String[] identifiers, String defaultCatalog) { Preconditions.checkArgument( identifiers != null && identifiers.length > 0, "Missing required option --identifiers for command 'submit-update-stats-job'"); List<TableTarget> tableTargets = new ArrayList<>(); for (String rawIdentifier : identifiers) { - if (tableTargets.size() >= limit) { - break; - } Preconditions.checkArgument( StringUtils.isNotBlank(rawIdentifier), "--identifiers contains blank identifier"); String[] levels = rawIdentifier.trim().split("\\."); @@ -265,14 +212,8 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { if (!"stats".equals(updateMode) && !"all".equals(updateMode)) { return; } - String gravitinoUri = - firstNonBlank( - updaterOptions.get("gravitino_uri"), - updaterOptions.get("gravitino-uri"), - updaterOptions.get(OptimizerConfig.GRAVITINO_URI)); - String metalake = - firstNonBlank( - updaterOptions.get("metalake"), updaterOptions.get(OptimizerConfig.GRAVITINO_METALAKE)); + String gravitinoUri = StringUtils.trimToNull(updaterOptions.get("gravitino_uri")); + String metalake = StringUtils.trimToNull(updaterOptions.get("metalake")); Preconditions.checkArgument( StringUtils.isNotBlank(gravitinoUri), "Option --updater-options (or config key jobSubmitterConfig.updater_options) " @@ -287,7 +228,7 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { List<TableTarget> tableTargets, Map<String, String> sparkConfigs) { Preconditions.checkArgument( !sparkConfigs.isEmpty(), - "Missing spark config. Set --spark-conf/--spark-conf-file or " + "Missing spark config. Set --spark-conf or " + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config file"); for (TableTarget tableTarget : tableTargets) { String requiredKey = "spark.sql.catalog." + tableTarget.catalogName; @@ -333,15 +274,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { } } - private static String firstNonBlank(String... values) { - for (String value : values) { - if (StringUtils.isNotBlank(value)) { - return value.trim(); - } - } - return null; - } - private static final class TableTarget { private final String fullIdentifier; private final String catalogName; diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java index 2be5785a87..b18d73be1f 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java @@ -421,22 +421,8 @@ class TestOptimizerCmd { } @Test - void testSubmitUpdateStatsJobSupportsJsonFileOverrides() throws Exception { + void testSubmitUpdateStatsJobSupportsJsonOverrides() throws Exception { Path confPath = createOptimizerConfForSubmitUpdateStatsJob(); - Path updaterOptionsFile = Files.createTempFile("optimizer-updater-options-", ".json"); - Path sparkConfFile = Files.createTempFile("optimizer-spark-conf-", ".json"); - Files.writeString( - updaterOptionsFile, - "{\"metrics_updater\":\"custom-metrics-updater\"}", - StandardCharsets.UTF_8); - Files.writeString( - sparkConfFile, - "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"," - + "\"spark.sql.catalog.rest.type\":\"rest\"," - + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}", - StandardCharsets.UTF_8); - updaterOptionsFile.toFile().deleteOnExit(); - sparkConfFile.toFile().deleteOnExit(); String[] output = runCommand( @@ -446,10 +432,12 @@ class TestOptimizerCmd { "rest.ab.t1", "--update-mode", "metrics", - "--updater-options-file", - updaterOptionsFile.toString(), - "--spark-conf-file", - sparkConfFile.toString(), + "--updater-options", + "{\"metrics_updater\":\"custom-metrics-updater\"}", + "--spark-conf", + "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"," + + "\"spark.sql.catalog.rest.type\":\"rest\"," + + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}", "--dry-run", "--conf-path", confPath.toString()); @@ -458,29 +446,6 @@ class TestOptimizerCmd { Assertions.assertTrue(output[0].contains("spark.sql.catalog.rest")); } - @Test - void testSubmitUpdateStatsJobRejectsMutuallyExclusiveUpdaterInputs() throws Exception { - Path confPath = createOptimizerConfForSubmitUpdateStatsJob(); - Path updaterOptionsFile = Files.createTempFile("optimizer-updater-options-", ".json"); - Files.writeString(updaterOptionsFile, "{\"metrics_updater\":\"m\"}", StandardCharsets.UTF_8); - updaterOptionsFile.toFile().deleteOnExit(); - String[] output = - runCommand( - "--type", - "submit-update-stats-job", - "--identifiers", - "rest.ab.t1", - "--updater-options", - "{\"metrics_updater\":\"m1\"}", - "--updater-options-file", - updaterOptionsFile.toString(), - "--dry-run", - "--conf-path", - confPath.toString()); - Assertions.assertTrue( - output[1].contains("--updater-options and --updater-options-file cannot be used together")); - } - private Path createOptimizerConfForMetricsProvider() throws Exception { Path confPath = Files.createTempFile("optimizer-test-", ".conf"); // Route command reads to deterministic in-memory fixtures from MetricsProviderForTest. diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java index 6514ac755e..444a11d55f 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java @@ -28,7 +28,6 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.client.GravitinoAdminClient; -import org.apache.gravitino.client.GravitinoClient; import org.apache.gravitino.client.GravitinoMetalake; import org.apache.gravitino.dto.rel.ColumnDTO; import org.apache.gravitino.exceptions.NoSuchMetalakeException; @@ -38,11 +37,9 @@ import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; import org.apache.gravitino.maintenance.optimizer.recommender.handler.compaction.CompactionJobContext; import org.apache.gravitino.rel.Column; -import org.apache.gravitino.rel.Table; import org.apache.gravitino.rel.expressions.transforms.Transform; import org.apache.gravitino.rel.expressions.transforms.Transforms; import org.apache.gravitino.rel.types.Types; -import org.apache.gravitino.stats.Statistic; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.awaitility.Awaitility; @@ -58,7 +55,6 @@ public class TestBuiltinIcebergRewriteDataFiles { private static final String METALAKE_NAME = "test"; private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg"; private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-rewrite-data-files"; - private static final String UPDATE_STATS_JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats"; private static final String SPARK_CATALOG_NAME = "rest_catalog"; private static final String WAREHOUSE_LOCATION = ""; @@ -214,38 +210,6 @@ public class TestBuiltinIcebergRewriteDataFiles { }); } - @Test - void testSubmitBuiltinIcebergUpdateStatsJobAndPersistStatistics() throws Exception { - String tableName = "update_stats_table"; - String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName; - runWithSparkAndMetalake( - (spark, metalake) -> { - createTableAndInsertData(spark, fullTableName); - - Map<String, String> jobConf = buildUpdateStatsJobConfig(tableName); - submitJob(metalake, UPDATE_STATS_JOB_TEMPLATE_NAME, jobConf); - - try (GravitinoClient client = - GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) { - Table table = - client - .loadCatalog(SPARK_CATALOG_NAME) - .asTableCatalog() - .loadTable(NameIdentifier.of("db", tableName)); - List<Statistic> stats = table.supportsStatistics().listStatistics(); - Assertions.assertFalse(stats.isEmpty(), "Expected table statistics to be updated"); - - Map<String, Statistic> statsMap = new HashMap<>(); - for (Statistic stat : stats) { - statsMap.put(stat.name(), stat); - } - Assertions.assertTrue(statsMap.containsKey("custom-file_count")); - Assertions.assertTrue(statsMap.containsKey("custom-datafile_mse")); - Assertions.assertTrue(statsMap.containsKey("custom-total_size")); - } - }); - } - private static GravitinoMetalake loadOrCreateMetalake( GravitinoAdminClient client, String metalakeName) { try { @@ -306,22 +270,6 @@ public class TestBuiltinIcebergRewriteDataFiles { return jobConf; } - private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { - Map<String, String> jobConf = new HashMap<>(); - jobConf.put("table_identifier", "db." + tableName); - jobConf.put("update_mode", "all"); - jobConf.put("target_file_size_bytes", "100000"); - jobConf.put( - "updater_options", - "{\"gravitino_uri\":\"" - + SERVER_URI - + "\",\"metalake\":\"" - + METALAKE_NAME - + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}"); - jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs()); - return jobConf; - } - private static Map<String, String> buildCompactionJobConfig( OptimizerConfig optimizerConfig, String tableName,
