This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 70e758df21e4fa87e5d15e19664b27c4276f1a49
Author: fanng <[email protected]>
AuthorDate: Wed Mar 4 17:56:31 2026 +0800

    test: cover update stats job server modes and stabilize spark IT
---
 .../TestIcebergUpdateStatsJobWithSpark.java        | 200 +++++++++++++++++----
 1 file changed, 169 insertions(+), 31 deletions(-)

diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
index a374e7874c..736e35db03 100644
--- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
+++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -416,36 +416,160 @@ public class TestIcebergUpdateStatsJobWithSpark {
 
   // Requires a running deploy-mode Gravitino server and Spark environment.
   @Test
+  @Tag("gravitino-docker-test")
   @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
-  public void testRunBuiltInUpdateStatsJobViaServer() throws Exception {
-    String tableName = "jobs_it_update_stats_" + UUID.randomUUID().toString().replace("-", "");
-    String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName;
-
-    try (SparkSession restSpark = createRestSparkSession();
+  public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws Exception {
+    String suffix = UUID.randomUUID().toString().replace("-", "");
+    String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix;
+    String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix;
+    String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix;
+    String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName;
+    String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + metricsTableName;
+    String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + allModeTableName;
+
+    SparkSession restSpark = createRestSparkSession();
+    try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
         GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) {
+      mysql.start();
+      initializeMySqlMetricsSchema(mysql);
+
       GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME);
       recreateRestCatalog(metalake);
-      createTableAndInsertData(restSpark, fullTableName);
-
-      submitJob(metalake, buildUpdateStatsJobConfig(tableName));
-
-      try (GravitinoClient client =
-          GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) {
-        Table table =
-            client
-                .loadCatalog(SPARK_CATALOG_NAME)
-                .asTableCatalog()
-                .loadTable(NameIdentifier.of("db", tableName));
-        List<Statistic> statistics = table.supportsStatistics().listStatistics();
-        Map<String, Statistic> statisticMap =
-            statistics.stream().collect(Collectors.toMap(Statistic::name, s -> s));
-        assertTrue(statisticMap.containsKey("custom-file_count"));
-        assertTrue(statisticMap.containsKey("custom-datafile_mse"));
-        assertTrue(statisticMap.containsKey("custom-total_size"));
+      createTableAndInsertData(restSpark, statsFullTableName);
+      createTableAndInsertData(restSpark, metricsFullTableName);
+      createTableAndInsertData(restSpark, allModeFullTableName);
+
+      NameIdentifier statsTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName);
+      NameIdentifier metricsTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName);
+      NameIdentifier allModeTableIdentifier =
+          NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName);
+
+      try (GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository()) {
+        repository.initialize(buildJdbcMetricsConfigs(mysql));
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                statsTableName, "stats", buildUpdaterOptionsForStatsUpdaterOnly()));
+        awaitCustomStatisticsVisible(statsTableName);
+        assertEquals(0, getTableMetricsCount(repository, statsTableIdentifier));
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                metricsTableName, "metrics", buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
+        assertFalse(containsCustomStatistics(metricsTableName));
+        awaitTableMetricsAtLeast(repository, metricsTableIdentifier, 1);
+
+        submitJob(
+            metalake,
+            buildUpdateStatsJobConfig(
+                allModeTableName, "all", buildUpdaterOptionsForAllMode(mysql)));
+        awaitCustomStatisticsVisible(allModeTableName);
+        awaitTableMetricsAtLeast(repository, allModeTableIdentifier, 1);
       }
     }
   }
 
+  private void awaitCustomStatisticsVisible(String tableName) throws Exception {
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(2))
+        .until(() -> containsCustomStatistics(tableName));
+  }
+
+  private boolean containsCustomStatistics(String tableName) throws Exception {
+    try (GravitinoClient client =
+        GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) {
+      Table table =
+          client
+              .loadCatalog(SPARK_CATALOG_NAME)
+              .asTableCatalog()
+              .loadTable(NameIdentifier.of("db", tableName));
+      Set<String> statisticNames =
+          table.supportsStatistics().listStatistics().stream()
+              .map(Statistic::name)
+              .collect(Collectors.toSet());
+      return statisticNames.contains("custom-file_count")
+          && statisticNames.contains("custom-datafile_mse")
+          && statisticNames.contains("custom-total_size");
+    }
+  }
+
+  private static void awaitTableMetricsAtLeast(
+      GenericJdbcMetricsRepository repository,
+      NameIdentifier tableIdentifier,
+      int expectedLowerBound) {
+    Awaitility.await()
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(2))
+        .until(() -> getTableMetricsCount(repository, tableIdentifier) >= expectedLowerBound);
+  }
+
+  private static int getTableMetricsCount(
+      GenericJdbcMetricsRepository repository, NameIdentifier tableIdentifier) {
+    long now = Instant.now().getEpochSecond();
+    return repository
+        .getMetrics(DataScope.forTable(tableIdentifier), now - 1800, now + 1800)
+        .size();
+  }
+
+  private static Map<String, String> buildJdbcMetricsConfigs(MySQLContainer<?> mysql) {
+    Map<String, String> conf = new HashMap<>();
+    conf.put("gravitino.optimizer.jdbcMetrics.jdbcUrl", mysql.getJdbcUrl());
+    conf.put("gravitino.optimizer.jdbcMetrics.jdbcUser", mysql.getUsername());
+    conf.put("gravitino.optimizer.jdbcMetrics.jdbcPassword", mysql.getPassword());
+    conf.put("gravitino.optimizer.jdbcMetrics.jdbcDriver", mysql.getDriverClassName());
+    return conf;
+  }
+
+  private static String buildUpdaterOptionsForStatsUpdaterOnly() {
+    return "{\"gravitino_uri\":\""
+        + SERVER_URI
+        + "\",\"metalake\":\""
+        + METALAKE_NAME
+        + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}";
+  }
+
+  private static String buildUpdaterOptionsForMetricsUpdaterOnly(MySQLContainer<?> mysql) {
+    return "{\"metrics_updater\":\"gravitino-metrics-updater\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcUrl\":\""
+        + mysql.getJdbcUrl()
+        + "\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcUser\":\""
+        + mysql.getUsername()
+        + "\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcPassword\":\""
+        + mysql.getPassword()
+        + "\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcDriver\":\""
+        + mysql.getDriverClassName()
+        + "\"}";
+  }
+
+  private static String buildUpdaterOptionsForAllMode(MySQLContainer<?> mysql) {
+    return "{\"gravitino_uri\":\""
+        + SERVER_URI
+        + "\",\"metalake\":\""
+        + METALAKE_NAME
+        + "\",\"statistics_updater\":\"gravitino-statistics-updater\","
+        + "\"metrics_updater\":\"gravitino-metrics-updater\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcUrl\":\""
+        + mysql.getJdbcUrl()
+        + "\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcUser\":\""
+        + mysql.getUsername()
+        + "\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcPassword\":\""
+        + mysql.getPassword()
+        + "\","
+        + "\"gravitino.optimizer.jdbcMetrics.jdbcDriver\":\""
+        + mysql.getDriverClassName()
+        + "\"}";
+  }
+
   private static SparkSession createRestSparkSession() {
     return SparkSession.builder()
         .master("local[2]")
@@ -474,19 +598,33 @@ public class TestIcebergUpdateStatsJobWithSpark {
     }
   }
 
-  private static Map<String, String> buildUpdateStatsJobConfig(String tableName) {
+  private static Map<String, String> buildUpdateStatsJobConfig(
+      String tableName, String updateMode, String updaterOptions) {
     Map<String, String> jobConf = new HashMap<>();
     jobConf.put("table_identifier", "db." + tableName);
-    jobConf.put("update_mode", "all");
+    jobConf.put("update_mode", updateMode);
     jobConf.put("target_file_size_bytes", "100000");
-    jobConf.put(
-        "updater_options",
-        "{\"gravitino_uri\":\""
-            + SERVER_URI
-            + "\",\"metalake\":\""
-            + METALAKE_NAME
-            + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}");
+    jobConf.put("updater_options", updaterOptions);
     jobConf.put("catalog_name", SPARK_CATALOG_NAME);
+    // Keep compatibility with old built-in template placeholders in deploy package.
+    jobConf.put("gravitino_uri", SERVER_URI);
+    jobConf.put("metalake", METALAKE_NAME);
+    if ("stats".equalsIgnoreCase(updateMode)) {
+      jobConf.put("statistics_updater", "gravitino-statistics-updater");
+    } else if ("metrics".equalsIgnoreCase(updateMode)) {
+      jobConf.put("metrics_updater", "gravitino-metrics-updater");
+    } else {
+      jobConf.put("statistics_updater", "gravitino-statistics-updater");
+      jobConf.put("metrics_updater", "gravitino-metrics-updater");
+    }
+    jobConf.put("spark_master", "local[2]");
+    jobConf.put("spark_executor_instances", "1");
+    jobConf.put("spark_executor_cores", "1");
+    jobConf.put("spark_executor_memory", "1g");
+    jobConf.put("spark_driver_memory", "1g");
+    jobConf.put("catalog_type", "rest");
+    jobConf.put("catalog_uri", ICEBERG_REST_URI);
+    jobConf.put("warehouse_location", "");
     jobConf.put(
         "spark_conf",
         "{\"spark.master\":\"local[2]\","

Reply via email to