This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch stats_job in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 22bd79b49af72b0e7e58563c0b0059e74e1d5bef Author: fanng <[email protected]> AuthorDate: Wed Mar 4 15:05:43 2026 +0800 split update metrics --- .../jobs/iceberg/IcebergUpdateStatsJob.java | 10 +- .../jobs/iceberg/TestIcebergUpdateStatsJob.java | 6 ++ .../TestIcebergUpdateStatsJobWithSpark.java | 103 +++++++++++++++------ 3 files changed, 89 insertions(+), 30 deletions(-) diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java index 40b1279bab..6cb08ba04a 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java @@ -407,7 +407,15 @@ public class IcebergUpdateStatsJob implements BuiltInJob { mapper.readValue(json, new TypeReference<Map<String, Object>>() {}); Map<String, String> configs = new HashMap<>(); for (Map.Entry<String, Object> entry : parsedMap.entrySet()) { - configs.put(entry.getKey(), entry.getValue() == null ? "" : entry.getValue().toString()); + Object value = entry.getValue(); + if (value instanceof Map || value instanceof List) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, + "JSON options must be a flat key-value map, but key '%s' has non-scalar value", + entry.getKey())); + } + configs.put(entry.getKey(), value == null ? "" : value.toString()); } return configs; } catch (Exception e) { diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java index 636b93143c..497aa34cf9 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java @@ -132,6 +132,12 @@ public class TestIcebergUpdateStatsJob { assertEquals("", parsed.get("nil")); assertThrows( IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseJsonOptions("{not_json}")); + assertThrows( + IllegalArgumentException.class, + () -> IcebergUpdateStatsJob.parseJsonOptions("{\"nested\":{\"a\":1}}")); + assertThrows( + IllegalArgumentException.class, + () -> IcebergUpdateStatsJob.parseJsonOptions("{\"array\":[1,2,3]}")); } @Test diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index f7e9b9e097..a374e7874c 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -24,9 +24,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.Statement; import java.time.Duration; import java.time.Instant; import java.util.HashMap; @@ -41,6 +43,7 @@ import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.client.GravitinoAdminClient; import org.apache.gravitino.client.GravitinoClient; import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.config.ConfigConstants; import org.apache.gravitino.exceptions.NoSuchMetalakeException; import org.apache.gravitino.job.JobHandle; import org.apache.gravitino.maintenance.optimizer.api.common.DataScope; @@ -56,6 +59,7 @@ import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetri import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository; import org.apache.gravitino.rel.Table; import org.apache.gravitino.stats.Statistic; +import org.apache.gravitino.utils.jdbc.JdbcSqlScriptUtils; import org.apache.spark.sql.SparkSession; import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; @@ -249,6 +253,56 @@ public class TestIcebergUpdateStatsJobWithSpark { assertTrue(metricsUpdater.jobMetrics.isEmpty()); } + @Test + public void testUpdateNonPartitionedTableMetricsOnly() { + RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); + + IcebergUpdateStatsJob.updateStatistics( + spark, + null, + metricsUpdater, + IcebergUpdateStatsJob.UpdateMode.METRICS, + catalogName, + "db.non_partitioned", + 100_000L); + + assertEquals(8, metricsUpdater.tableMetrics.size()); + assertTrue( + metricsUpdater.tableMetrics.stream() + .allMatch(metric -> metric.scope() == DataScope.Type.TABLE)); + assertTrue( + metricsUpdater.tableMetrics.stream().allMatch(metric -> metric.partitionPath().isEmpty())); + assertTrue(metricsUpdater.jobMetrics.isEmpty()); + } + + @Test + public void testUpdateNonPartitionedTableStatisticsAndMetrics() { + RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater(); + RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); + + IcebergUpdateStatsJob.updateStatistics( + spark, + statisticsUpdater, + metricsUpdater, + IcebergUpdateStatsJob.UpdateMode.ALL, + catalogName, + "db.non_partitioned", + 100_000L); + + assertEquals( + NameIdentifier.of(catalogName, "db", "non_partitioned"), statisticsUpdater.tableIdentifier); + assertNotNull(statisticsUpdater.tableStatistics); + assertEquals(8, statisticsUpdater.tableStatistics.size()); + + assertEquals(8, metricsUpdater.tableMetrics.size()); + assertTrue( + metricsUpdater.tableMetrics.stream() + .allMatch(metric -> metric.scope() == DataScope.Type.TABLE)); + assertTrue( + metricsUpdater.tableMetrics.stream().allMatch(metric -> metric.partitionPath().isEmpty())); + assertTrue(metricsUpdater.jobMetrics.isEmpty()); + } + @Test @Tag("gravitino-docker-test") public void testUpdatePartitionedTableMetricsStoredInMySql() throws Exception { @@ -296,36 +350,27 @@ public class TestIcebergUpdateStatsJobWithSpark { } private static void initializeMySqlMetricsSchema(MySQLContainer<?> mysql) throws Exception { - String createTableMetrics = - "CREATE TABLE IF NOT EXISTS `table_metrics` (" - + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT," - + "`table_identifier` VARCHAR(1024) NOT NULL," - + "`metric_name` VARCHAR(1024) NOT NULL," - + "`table_partition` VARCHAR(1024) DEFAULT NULL," - + "`metric_ts` BIGINT(20) NOT NULL," - + "`metric_value` VARCHAR(1024) NOT NULL," - + "PRIMARY KEY (`id`)," - + "KEY `idx_table_metrics_metric_ts` (`metric_ts`)," - + "KEY `idx_table_metrics_composite` (`table_identifier`(255), `table_partition`(255), `metric_ts`)" - + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"; - String createJobMetrics = - "CREATE TABLE IF NOT EXISTS `job_metrics` (" - + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT," - + "`job_identifier` VARCHAR(1024) NOT NULL," - + "`metric_name` VARCHAR(1024) NOT NULL," - + "`metric_ts` BIGINT(20) NOT NULL," - + "`metric_value` VARCHAR(1024) NOT NULL," - + "PRIMARY KEY (`id`)," - + "KEY `idx_job_metrics_metric_ts` (`metric_ts`)," - + "KEY `idx_job_metrics_identifier_metric_ts` (`job_identifier`(255), `metric_ts`)" - + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"; + Path schemaPath = + findRepoRoot() + .resolve("scripts") + .resolve("mysql") + .resolve("schema-" + ConfigConstants.CURRENT_SCRIPT_VERSION + "-mysql.sql"); + String schemaSql = Files.readString(schemaPath, StandardCharsets.UTF_8); try (Connection connection = - DriverManager.getConnection( - mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword()); - Statement statement = connection.createStatement()) { - statement.execute(createTableMetrics); - statement.execute(createJobMetrics); + DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword())) { + JdbcSqlScriptUtils.executeSqlScript(connection, schemaSql); + } + } + + private static Path findRepoRoot() { + Path current = Path.of("").toAbsolutePath(); + while (current != null) { + if (Files.exists(current.resolve("gradlew"))) { + return current; + } + current = current.getParent(); } + throw new IllegalStateException("Failed to locate repository root containing gradlew"); } @Test
