This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 70e758df21e4fa87e5d15e19664b27c4276f1a49 Author: fanng <[email protected]> AuthorDate: Wed Mar 4 17:56:31 2026 +0800 test: cover update stats job server modes and stabilize spark IT --- .../TestIcebergUpdateStatsJobWithSpark.java | 200 +++++++++++++++++---- 1 file changed, 169 insertions(+), 31 deletions(-) diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index a374e7874c..736e35db03 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -416,36 +416,160 @@ public class TestIcebergUpdateStatsJobWithSpark { // Requires a running deploy-mode Gravitino server and Spark environment. @Test + @Tag("gravitino-docker-test") @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true") - public void testRunBuiltInUpdateStatsJobViaServer() throws Exception { - String tableName = "jobs_it_update_stats_" + UUID.randomUUID().toString().replace("-", ""); - String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName; - - try (SparkSession restSpark = createRestSparkSession(); + public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws Exception { + String suffix = UUID.randomUUID().toString().replace("-", ""); + String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix; + String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix; + String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix; + String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName; + String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + metricsTableName; + String allModeFullTableName = SPARK_CATALOG_NAME + ".db." 
+ allModeTableName; + + SparkSession restSpark = createRestSparkSession(); + try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33"); GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { + mysql.start(); + initializeMySqlMetricsSchema(mysql); + GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); recreateRestCatalog(metalake); - createTableAndInsertData(restSpark, fullTableName); - - submitJob(metalake, buildUpdateStatsJobConfig(tableName)); - - try (GravitinoClient client = - GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) { - Table table = - client - .loadCatalog(SPARK_CATALOG_NAME) - .asTableCatalog() - .loadTable(NameIdentifier.of("db", tableName)); - List<Statistic> statistics = table.supportsStatistics().listStatistics(); - Map<String, Statistic> statisticMap = - statistics.stream().collect(Collectors.toMap(Statistic::name, s -> s)); - assertTrue(statisticMap.containsKey("custom-file_count")); - assertTrue(statisticMap.containsKey("custom-datafile_mse")); - assertTrue(statisticMap.containsKey("custom-total_size")); + createTableAndInsertData(restSpark, statsFullTableName); + createTableAndInsertData(restSpark, metricsFullTableName); + createTableAndInsertData(restSpark, allModeFullTableName); + + NameIdentifier statsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName); + NameIdentifier metricsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName); + NameIdentifier allModeTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName); + + try (GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository()) { + repository.initialize(buildJdbcMetricsConfigs(mysql)); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + statsTableName, "stats", buildUpdaterOptionsForStatsUpdaterOnly())); + awaitCustomStatisticsVisible(statsTableName); + assertEquals(0, 
getTableMetricsCount(repository, statsTableIdentifier)); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + metricsTableName, "metrics", buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); + assertFalse(containsCustomStatistics(metricsTableName)); + awaitTableMetricsAtLeast(repository, metricsTableIdentifier, 1); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + allModeTableName, "all", buildUpdaterOptionsForAllMode(mysql))); + awaitCustomStatisticsVisible(allModeTableName); + awaitTableMetricsAtLeast(repository, allModeTableIdentifier, 1); } } } + private void awaitCustomStatisticsVisible(String tableName) throws Exception { + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .until(() -> containsCustomStatistics(tableName)); + } + + private boolean containsCustomStatistics(String tableName) throws Exception { + try (GravitinoClient client = + GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) { + Table table = + client + .loadCatalog(SPARK_CATALOG_NAME) + .asTableCatalog() + .loadTable(NameIdentifier.of("db", tableName)); + Set<String> statisticNames = + table.supportsStatistics().listStatistics().stream() + .map(Statistic::name) + .collect(Collectors.toSet()); + return statisticNames.contains("custom-file_count") + && statisticNames.contains("custom-datafile_mse") + && statisticNames.contains("custom-total_size"); + } + } + + private static void awaitTableMetricsAtLeast( + GenericJdbcMetricsRepository repository, + NameIdentifier tableIdentifier, + int expectedLowerBound) { + Awaitility.await() + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(2)) + .until(() -> getTableMetricsCount(repository, tableIdentifier) >= expectedLowerBound); + } + + private static int getTableMetricsCount( + GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) { + long now = Instant.now().getEpochSecond(); + return repository + 
.getMetrics(DataScope.forTable(tableIdentifier), now - 1800, now + 1800) + .size(); + } + + private static Map<String, String> buildJdbcMetricsConfigs(MySQLContainer<?> mysql) { + Map<String, String> conf = new HashMap<>(); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcUrl", mysql.getJdbcUrl()); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcUser", mysql.getUsername()); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcPassword", mysql.getPassword()); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcDriver", mysql.getDriverClassName()); + return conf; + } + + private static String buildUpdaterOptionsForStatsUpdaterOnly() { + return "{\"gravitino_uri\":\"" + + SERVER_URI + + "\",\"metalake\":\"" + + METALAKE_NAME + + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}"; + } + + private static String buildUpdaterOptionsForMetricsUpdaterOnly(MySQLContainer<?> mysql) { + return "{\"metrics_updater\":\"gravitino-metrics-updater\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcUrl\":\"" + + mysql.getJdbcUrl() + + "\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcUser\":\"" + + mysql.getUsername() + + "\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcPassword\":\"" + + mysql.getPassword() + + "\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcDriver\":\"" + + mysql.getDriverClassName() + + "\"}"; + } + + private static String buildUpdaterOptionsForAllMode(MySQLContainer<?> mysql) { + return "{\"gravitino_uri\":\"" + + SERVER_URI + + "\",\"metalake\":\"" + + METALAKE_NAME + + "\",\"statistics_updater\":\"gravitino-statistics-updater\"," + + "\"metrics_updater\":\"gravitino-metrics-updater\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcUrl\":\"" + + mysql.getJdbcUrl() + + "\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcUser\":\"" + + mysql.getUsername() + + "\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcPassword\":\"" + + mysql.getPassword() + + "\"," + + "\"gravitino.optimizer.jdbcMetrics.jdbcDriver\":\"" + + mysql.getDriverClassName() + + "\"}"; + } + private static 
SparkSession createRestSparkSession() { return SparkSession.builder() .master("local[2]") @@ -474,19 +598,33 @@ public class TestIcebergUpdateStatsJobWithSpark { } } - private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { + private static Map<String, String> buildUpdateStatsJobConfig( + String tableName, String updateMode, String updaterOptions) { Map<String, String> jobConf = new HashMap<>(); jobConf.put("table_identifier", "db." + tableName); - jobConf.put("update_mode", "all"); + jobConf.put("update_mode", updateMode); jobConf.put("target_file_size_bytes", "100000"); - jobConf.put( - "updater_options", - "{\"gravitino_uri\":\"" - + SERVER_URI - + "\",\"metalake\":\"" - + METALAKE_NAME - + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}"); + jobConf.put("updater_options", updaterOptions); jobConf.put("catalog_name", SPARK_CATALOG_NAME); + // Keep compatibility with old built-in template placeholders in deploy package. + jobConf.put("gravitino_uri", SERVER_URI); + jobConf.put("metalake", METALAKE_NAME); + if ("stats".equalsIgnoreCase(updateMode)) { + jobConf.put("statistics_updater", "gravitino-statistics-updater"); + } else if ("metrics".equalsIgnoreCase(updateMode)) { + jobConf.put("metrics_updater", "gravitino-metrics-updater"); + } else { + jobConf.put("statistics_updater", "gravitino-statistics-updater"); + jobConf.put("metrics_updater", "gravitino-metrics-updater"); + } + jobConf.put("spark_master", "local[2]"); + jobConf.put("spark_executor_instances", "1"); + jobConf.put("spark_executor_cores", "1"); + jobConf.put("spark_executor_memory", "1g"); + jobConf.put("spark_driver_memory", "1g"); + jobConf.put("catalog_type", "rest"); + jobConf.put("catalog_uri", ICEBERG_REST_URI); + jobConf.put("warehouse_location", ""); jobConf.put( "spark_conf", "{\"spark.master\":\"local[2]\","
