This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch stats_job in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 5323fba459605c7e51915adea6d9980a8ce2ee75 Author: fanng <[email protected]> AuthorDate: Thu Mar 5 11:54:58 2026 +0800 Address PR #10106 review comments - Stop rest SparkSession in update-stats Spark test to avoid context leaks - Use full optimizer config prefix in updater-options validation error messages - Remove System.out.println noise from rewrite-data-files test --- .../TestIcebergUpdateStatsJobWithSpark.java | 157 +++++++++++---------- .../command/SubmitUpdateStatsJobCommand.java | 8 +- .../job/TestBuiltinIcebergRewriteDataFiles.java | 1 - 3 files changed, 88 insertions(+), 78 deletions(-) diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index 291ab11b70..44ca5aece2 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -174,81 +174,88 @@ public class TestIcebergUpdateStatsJobWithSpark { String partitionMetricsFullTableName = SPARK_CATALOG_NAME + ".db." + partitionMetricsTableName; String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + allModeTableName; - SparkSession restSpark = createRestSparkSession(); - try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33"); - GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { - mysql.start(); - initializeMySqlMetricsSchema(mysql); - - GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); - recreateRestCatalog(metalake); - createTableAndInsertData(restSpark, statsFullTableName); - createTableAndInsertData(restSpark, metricsFullTableName); - createPartitionedTableAndInsertData(restSpark, partitionMetricsFullTableName); - createTableAndInsertData(restSpark, allModeFullTableName); - - NameIdentifier statsTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName); - NameIdentifier metricsTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName); - NameIdentifier partitionMetricsTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", partitionMetricsTableName); - NameIdentifier allModeTableIdentifier = - NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName); - - PartitionPath dsPartition1 = - PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-03-01"))); - PartitionPath dsPartition2 = - PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-03-02"))); - - try (GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository()) { - repository.initialize(buildJdbcMetricsConfigs(mysql)); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - statsTableName, "stats", buildUpdaterOptionsForStatsUpdaterOnly())); - awaitCustomStatisticsVisible(statsTableName); - assertEquals(0, getTableMetricsCount(repository, statsTableIdentifier)); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - metricsTableName, "metrics", buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); - assertFalse(containsCustomStatistics(metricsTableName)); - awaitTableMetricsExactly( - repository, metricsTableIdentifier, EXPECTED_METRIC_COUNT_PER_SCOPE); - assertTableMetricsMatch(repository, metricsTableIdentifier); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - partitionMetricsTableName, - "metrics", - buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); - assertFalse(containsCustomStatistics(partitionMetricsTableName)); - assertEquals(0, getTableMetricsCount(repository, partitionMetricsTableIdentifier)); - awaitPartitionMetricsExactly( - repository, - partitionMetricsTableIdentifier, - dsPartition1, - EXPECTED_METRIC_COUNT_PER_SCOPE); - awaitPartitionMetricsExactly( - repository, - partitionMetricsTableIdentifier, - dsPartition2, - EXPECTED_METRIC_COUNT_PER_SCOPE); - assertPartitionMetricsMatch(repository, partitionMetricsTableIdentifier, dsPartition1); - assertPartitionMetricsMatch(repository, partitionMetricsTableIdentifier, dsPartition2); - - submitJob( - metalake, - buildUpdateStatsJobConfig( - allModeTableName, "all", buildUpdaterOptionsForAllMode(mysql))); - awaitCustomStatisticsVisible(allModeTableName); - awaitTableMetricsExactly( - repository, allModeTableIdentifier, EXPECTED_METRIC_COUNT_PER_SCOPE); - assertTableMetricsMatch(repository, allModeTableIdentifier); + SparkSession restSpark = null; + try { + restSpark = createRestSparkSession(); + try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33"); + GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { + mysql.start(); + initializeMySqlMetricsSchema(mysql); + + GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); + recreateRestCatalog(metalake); + createTableAndInsertData(restSpark, statsFullTableName); + createTableAndInsertData(restSpark, metricsFullTableName); + createPartitionedTableAndInsertData(restSpark, partitionMetricsFullTableName); + createTableAndInsertData(restSpark, allModeFullTableName); + + NameIdentifier statsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", statsTableName); + NameIdentifier metricsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", metricsTableName); + NameIdentifier partitionMetricsTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", partitionMetricsTableName); + NameIdentifier allModeTableIdentifier = + NameIdentifier.of(SPARK_CATALOG_NAME, "db", allModeTableName); + + PartitionPath dsPartition1 = + PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-03-01"))); + PartitionPath dsPartition2 = + PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-03-02"))); + + try (GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository()) { + repository.initialize(buildJdbcMetricsConfigs(mysql)); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + statsTableName, "stats", buildUpdaterOptionsForStatsUpdaterOnly())); + awaitCustomStatisticsVisible(statsTableName); + assertEquals(0, getTableMetricsCount(repository, statsTableIdentifier)); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + metricsTableName, "metrics", buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); + assertFalse(containsCustomStatistics(metricsTableName)); + awaitTableMetricsExactly( + repository, metricsTableIdentifier, EXPECTED_METRIC_COUNT_PER_SCOPE); + assertTableMetricsMatch(repository, metricsTableIdentifier); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + partitionMetricsTableName, + "metrics", + buildUpdaterOptionsForMetricsUpdaterOnly(mysql))); + assertFalse(containsCustomStatistics(partitionMetricsTableName)); + assertEquals(0, getTableMetricsCount(repository, partitionMetricsTableIdentifier)); + awaitPartitionMetricsExactly( + repository, + partitionMetricsTableIdentifier, + dsPartition1, + EXPECTED_METRIC_COUNT_PER_SCOPE); + awaitPartitionMetricsExactly( + repository, + partitionMetricsTableIdentifier, + dsPartition2, + EXPECTED_METRIC_COUNT_PER_SCOPE); + assertPartitionMetricsMatch(repository, partitionMetricsTableIdentifier, dsPartition1); + assertPartitionMetricsMatch(repository, partitionMetricsTableIdentifier, dsPartition2); + + submitJob( + metalake, + buildUpdateStatsJobConfig( + allModeTableName, "all", buildUpdaterOptionsForAllMode(mysql))); + awaitCustomStatisticsVisible(allModeTableName); + awaitTableMetricsExactly( + repository, allModeTableIdentifier, EXPECTED_METRIC_COUNT_PER_SCOPE); + assertTableMetricsMatch(repository, allModeTableIdentifier); + } + } + } finally { + if (restSpark != null) { + restSpark.stop(); } } } diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java index 00e62d615a..0f7d6e5cca 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java @@ -216,11 +216,15 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { String metalake = StringUtils.trimToNull(updaterOptions.get("metalake")); Preconditions.checkArgument( StringUtils.isNotBlank(gravitinoUri), - "Option --updater-options (or config key jobSubmitterConfig.updater_options) " + "Option --updater-options (or config key " + + OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + + "updater_options) " + "must contain 'gravitino_uri' when update_mode is stats or all"); Preconditions.checkArgument( StringUtils.isNotBlank(metalake), - "Option --updater-options (or config key jobSubmitterConfig.updater_options) " + "Option --updater-options (or config key " + + OptimizerConfig.JOB_SUBMITTER_CONFIG_PREFIX + + "updater_options) " + "must contain 'metalake' when update_mode is stats or all"); } diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java index 444a11d55f..1e49f27214 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java @@ -241,7 +241,6 @@ public class TestBuiltinIcebergRewriteDataFiles { private static void submitJob( GravitinoMetalake metalake, String jobTemplateName, Map<String, String> jobConf) { JobHandle jobHandle = metalake.runJob(jobTemplateName, jobConf); - System.out.println("Submitted job id: " + jobHandle.jobId()); Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank"); Awaitility.await()
