This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 5323fba459605c7e51915adea6d9980a8ce2ee75
Author: fanng <[email protected]>
AuthorDate: Thu Mar 5 11:54:58 2026 +0800

    Address PR #10106 review comments
    
    - Stop the REST SparkSession in the update-stats Spark test to avoid context leaks
    
    - Use full optimizer config prefix in updater-options validation error 
messages
    
    - Remove System.out.println noise from rewrite-data-files test
---
 .../TestIcebergUpdateStatsJobWithSpark.java        | 157 +++++++++++----------
 .../command/SubmitUpdateStatsJobCommand.java       |   8 +-
 .../job/TestBuiltinIcebergRewriteDataFiles.java    |   1 -
 3 files changed, 88 insertions(+), 78 deletions(-)

diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
index 291ab11b70..44ca5aece2 100644
--- 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -174,81 +174,88 @@ public class TestIcebergUpdateStatsJobWithSpark {
     String partitionMetricsFullTableName = SPARK_CATALOG_NAME + ".db." + 
partitionMetricsTableName;
     String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + 
allModeTableName;
 
-    SparkSession restSpark = createRestSparkSession();
-    try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
-        GravitinoAdminClient adminClient = 
GravitinoAdminClient.builder(SERVER_URI).build()) {
-      mysql.start();
-      initializeMySqlMetricsSchema(mysql);
-
-      GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, 
METALAKE_NAME);
-      recreateRestCatalog(metalake);
-      createTableAndInsertData(restSpark, statsFullTableName);
-      createTableAndInsertData(restSpark, metricsFullTableName);
-      createPartitionedTableAndInsertData(restSpark, 
partitionMetricsFullTableName);
-      createTableAndInsertData(restSpark, allModeFullTableName);
-
-      NameIdentifier statsTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName);
-      NameIdentifier metricsTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName);
-      NameIdentifier partitionMetricsTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", 
partitionMetricsTableName);
-      NameIdentifier allModeTableIdentifier =
-          NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName);
-
-      PartitionPath dsPartition1 =
-          PartitionPath.of(List.of(new PartitionEntryImpl("ds", 
"2026-03-01")));
-      PartitionPath dsPartition2 =
-          PartitionPath.of(List.of(new PartitionEntryImpl("ds", 
"2026-03-02")));
-
-      try (GenericJdbcMetricsRepository repository = new 
GenericJdbcMetricsRepository()) {
-        repository.initialize(buildJdbcMetricsConfigs(mysql));
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                statsTableName, "stats", 
buildUpdaterOptionsForStatsUpdaterOnly()));
-        awaitCustomStatisticsVisible(statsTableName);
-        assertEquals(0, getTableMetricsCount(repository, 
statsTableIdentifier));
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                metricsTableName, "metrics", 
buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
-        assertFalse(containsCustomStatistics(metricsTableName));
-        awaitTableMetricsExactly(
-            repository, metricsTableIdentifier, 
EXPECTED_METRIC_COUNT_PER_SCOPE);
-        assertTableMetricsMatch(repository, metricsTableIdentifier);
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                partitionMetricsTableName,
-                "metrics",
-                buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
-        assertFalse(containsCustomStatistics(partitionMetricsTableName));
-        assertEquals(0, getTableMetricsCount(repository, 
partitionMetricsTableIdentifier));
-        awaitPartitionMetricsExactly(
-            repository,
-            partitionMetricsTableIdentifier,
-            dsPartition1,
-            EXPECTED_METRIC_COUNT_PER_SCOPE);
-        awaitPartitionMetricsExactly(
-            repository,
-            partitionMetricsTableIdentifier,
-            dsPartition2,
-            EXPECTED_METRIC_COUNT_PER_SCOPE);
-        assertPartitionMetricsMatch(repository, 
partitionMetricsTableIdentifier, dsPartition1);
-        assertPartitionMetricsMatch(repository, 
partitionMetricsTableIdentifier, dsPartition2);
-
-        submitJob(
-            metalake,
-            buildUpdateStatsJobConfig(
-                allModeTableName, "all", 
buildUpdaterOptionsForAllMode(mysql)));
-        awaitCustomStatisticsVisible(allModeTableName);
-        awaitTableMetricsExactly(
-            repository, allModeTableIdentifier, 
EXPECTED_METRIC_COUNT_PER_SCOPE);
-        assertTableMetricsMatch(repository, allModeTableIdentifier);
+    SparkSession restSpark = null;
+    try {
+      restSpark = createRestSparkSession();
+      try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
+          GravitinoAdminClient adminClient = 
GravitinoAdminClient.builder(SERVER_URI).build()) {
+        mysql.start();
+        initializeMySqlMetricsSchema(mysql);
+
+        GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, 
METALAKE_NAME);
+        recreateRestCatalog(metalake);
+        createTableAndInsertData(restSpark, statsFullTableName);
+        createTableAndInsertData(restSpark, metricsFullTableName);
+        createPartitionedTableAndInsertData(restSpark, 
partitionMetricsFullTableName);
+        createTableAndInsertData(restSpark, allModeFullTableName);
+
+        NameIdentifier statsTableIdentifier =
+            NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName);
+        NameIdentifier metricsTableIdentifier =
+            NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName);
+        NameIdentifier partitionMetricsTableIdentifier =
+            NameIdentifier.of(SPARK_CATALOG_NAME, "db", 
partitionMetricsTableName);
+        NameIdentifier allModeTableIdentifier =
+            NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName);
+
+        PartitionPath dsPartition1 =
+            PartitionPath.of(List.of(new PartitionEntryImpl("ds", 
"2026-03-01")));
+        PartitionPath dsPartition2 =
+            PartitionPath.of(List.of(new PartitionEntryImpl("ds", 
"2026-03-02")));
+
+        try (GenericJdbcMetricsRepository repository = new 
GenericJdbcMetricsRepository()) {
+          repository.initialize(buildJdbcMetricsConfigs(mysql));
+
+          submitJob(
+              metalake,
+              buildUpdateStatsJobConfig(
+                  statsTableName, "stats", 
buildUpdaterOptionsForStatsUpdaterOnly()));
+          awaitCustomStatisticsVisible(statsTableName);
+          assertEquals(0, getTableMetricsCount(repository, 
statsTableIdentifier));
+
+          submitJob(
+              metalake,
+              buildUpdateStatsJobConfig(
+                  metricsTableName, "metrics", 
buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
+          assertFalse(containsCustomStatistics(metricsTableName));
+          awaitTableMetricsExactly(
+              repository, metricsTableIdentifier, 
EXPECTED_METRIC_COUNT_PER_SCOPE);
+          assertTableMetricsMatch(repository, metricsTableIdentifier);
+
+          submitJob(
+              metalake,
+              buildUpdateStatsJobConfig(
+                  partitionMetricsTableName,
+                  "metrics",
+                  buildUpdaterOptionsForMetricsUpdaterOnly(mysql)));
+          assertFalse(containsCustomStatistics(partitionMetricsTableName));
+          assertEquals(0, getTableMetricsCount(repository, 
partitionMetricsTableIdentifier));
+          awaitPartitionMetricsExactly(
+              repository,
+              partitionMetricsTableIdentifier,
+              dsPartition1,
+              EXPECTED_METRIC_COUNT_PER_SCOPE);
+          awaitPartitionMetricsExactly(
+              repository,
+              partitionMetricsTableIdentifier,
+              dsPartition2,
+              EXPECTED_METRIC_COUNT_PER_SCOPE);
+          assertPartitionMetricsMatch(repository, 
partitionMetricsTableIdentifier, dsPartition1);
+          assertPartitionMetricsMatch(repository, 
partitionMetricsTableIdentifier, dsPartition2);
+
+          submitJob(
+              metalake,
+              buildUpdateStatsJobConfig(
+                  allModeTableName, "all", 
buildUpdaterOptionsForAllMode(mysql)));
+          awaitCustomStatisticsVisible(allModeTableName);
+          awaitTableMetricsExactly(
+              repository, allModeTableIdentifier, 
EXPECTED_METRIC_COUNT_PER_SCOPE);
+          assertTableMetricsMatch(repository, allModeTableIdentifier);
+        }
+      }
+    } finally {
+      if (restSpark != null) {
+        restSpark.stop();
       }
     }
   }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
index 00e62d615a..0f7d6e5cca 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
@@ -216,11 +216,15 @@ public class SubmitUpdateStatsJobCommand implements 
OptimizerCommandExecutor {
     String metalake = StringUtils.trimToNull(updaterOptions.get("metalake"));
     Preconditions.checkArgument(
         StringUtils.isNotBlank(gravitinoUri),
-        "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "
+        "Option --updater-options (or config key "
+            + OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX
+            + "updater_options) "
             + "must contain 'gravitino_uri' when update_mode is stats or all");
     Preconditions.checkArgument(
         StringUtils.isNotBlank(metalake),
-        "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "
+        "Option --updater-options (or config key "
+            + OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX
+            + "updater_options) "
             + "must contain 'metalake' when update_mode is stats or all");
   }
 
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
index 444a11d55f..1e49f27214 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java
@@ -241,7 +241,6 @@ public class TestBuiltinIcebergRewriteDataFiles {
   private static void submitJob(
       GravitinoMetalake metalake, String jobTemplateName, Map<String, String> 
jobConf) {
     JobHandle jobHandle = metalake.runJob(jobTemplateName, jobConf);
-    System.out.println("Submitted job id: " + jobHandle.jobId());
     Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id 
should not be blank");
 
     Awaitility.await()

Reply via email to