This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 22bd79b49af72b0e7e58563c0b0059e74e1d5bef
Author: fanng <[email protected]>
AuthorDate: Wed Mar 4 15:05:43 2026 +0800

    split update metrics
---
 .../jobs/iceberg/IcebergUpdateStatsJob.java        |  10 +-
 .../jobs/iceberg/TestIcebergUpdateStatsJob.java    |   6 ++
 .../TestIcebergUpdateStatsJobWithSpark.java        | 103 +++++++++++++++------
 3 files changed, 89 insertions(+), 30 deletions(-)

diff --git 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
index 40b1279bab..6cb08ba04a 100644
--- 
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
+++ 
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java
@@ -407,7 +407,15 @@ public class IcebergUpdateStatsJob implements BuiltInJob {
           mapper.readValue(json, new TypeReference<Map<String, Object>>() {});
       Map<String, String> configs = new HashMap<>();
       for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
-        configs.put(entry.getKey(), entry.getValue() == null ? "" : 
entry.getValue().toString());
+        Object value = entry.getValue();
+        if (value instanceof Map || value instanceof List) {
+          throw new IllegalArgumentException(
+              String.format(
+                  Locale.ROOT,
+                  "JSON options must be a flat key-value map, but key '%s' has 
non-scalar value",
+                  entry.getKey()));
+        }
+        configs.put(entry.getKey(), value == null ? "" : value.toString());
       }
       return configs;
     } catch (Exception e) {
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
index 636b93143c..497aa34cf9 100644
--- 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java
@@ -132,6 +132,12 @@ public class TestIcebergUpdateStatsJob {
     assertEquals("", parsed.get("nil"));
     assertThrows(
         IllegalArgumentException.class, () -> 
IcebergUpdateStatsJob.parseJsonOptions("{not_json}"));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> 
IcebergUpdateStatsJob.parseJsonOptions("{\"nested\":{\"a\":1}}"));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> IcebergUpdateStatsJob.parseJsonOptions("{\"array\":[1,2,3]}"));
   }
 
   @Test
diff --git 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
index f7e9b9e097..a374e7874c 100644
--- 
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
+++ 
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java
@@ -24,9 +24,11 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.sql.Statement;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.HashMap;
@@ -41,6 +43,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.client.GravitinoAdminClient;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.job.JobHandle;
 import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
@@ -56,6 +59,7 @@ import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetri
 import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.stats.Statistic;
+import org.apache.gravitino.utils.jdbc.JdbcSqlScriptUtils;
 import org.apache.spark.sql.SparkSession;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
@@ -249,6 +253,56 @@ public class TestIcebergUpdateStatsJobWithSpark {
     assertTrue(metricsUpdater.jobMetrics.isEmpty());
   }
 
+  @Test
+  public void testUpdateNonPartitionedTableMetricsOnly() {
+    RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater();
+
+    IcebergUpdateStatsJob.updateStatistics(
+        spark,
+        null,
+        metricsUpdater,
+        IcebergUpdateStatsJob.UpdateMode.METRICS,
+        catalogName,
+        "db.non_partitioned",
+        100_000L);
+
+    assertEquals(8, metricsUpdater.tableMetrics.size());
+    assertTrue(
+        metricsUpdater.tableMetrics.stream()
+            .allMatch(metric -> metric.scope() == DataScope.Type.TABLE));
+    assertTrue(
+        metricsUpdater.tableMetrics.stream().allMatch(metric -> 
metric.partitionPath().isEmpty()));
+    assertTrue(metricsUpdater.jobMetrics.isEmpty());
+  }
+
+  @Test
+  public void testUpdateNonPartitionedTableStatisticsAndMetrics() {
+    RecordingStatisticsUpdater statisticsUpdater = new 
RecordingStatisticsUpdater();
+    RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater();
+
+    IcebergUpdateStatsJob.updateStatistics(
+        spark,
+        statisticsUpdater,
+        metricsUpdater,
+        IcebergUpdateStatsJob.UpdateMode.ALL,
+        catalogName,
+        "db.non_partitioned",
+        100_000L);
+
+    assertEquals(
+        NameIdentifier.of(catalogName, "db", "non_partitioned"), 
statisticsUpdater.tableIdentifier);
+    assertNotNull(statisticsUpdater.tableStatistics);
+    assertEquals(8, statisticsUpdater.tableStatistics.size());
+
+    assertEquals(8, metricsUpdater.tableMetrics.size());
+    assertTrue(
+        metricsUpdater.tableMetrics.stream()
+            .allMatch(metric -> metric.scope() == DataScope.Type.TABLE));
+    assertTrue(
+        metricsUpdater.tableMetrics.stream().allMatch(metric -> 
metric.partitionPath().isEmpty()));
+    assertTrue(metricsUpdater.jobMetrics.isEmpty());
+  }
+
   @Test
   @Tag("gravitino-docker-test")
   public void testUpdatePartitionedTableMetricsStoredInMySql() throws 
Exception {
@@ -296,36 +350,27 @@ public class TestIcebergUpdateStatsJobWithSpark {
   }
 
   private static void initializeMySqlMetricsSchema(MySQLContainer<?> mysql) 
throws Exception {
-    String createTableMetrics =
-        "CREATE TABLE IF NOT EXISTS `table_metrics` ("
-            + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,"
-            + "`table_identifier` VARCHAR(1024) NOT NULL,"
-            + "`metric_name` VARCHAR(1024) NOT NULL,"
-            + "`table_partition` VARCHAR(1024) DEFAULT NULL,"
-            + "`metric_ts` BIGINT(20) NOT NULL,"
-            + "`metric_value` VARCHAR(1024) NOT NULL,"
-            + "PRIMARY KEY (`id`),"
-            + "KEY `idx_table_metrics_metric_ts` (`metric_ts`),"
-            + "KEY `idx_table_metrics_composite` (`table_identifier`(255), 
`table_partition`(255), `metric_ts`)"
-            + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin";
-    String createJobMetrics =
-        "CREATE TABLE IF NOT EXISTS `job_metrics` ("
-            + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT,"
-            + "`job_identifier` VARCHAR(1024) NOT NULL,"
-            + "`metric_name` VARCHAR(1024) NOT NULL,"
-            + "`metric_ts` BIGINT(20) NOT NULL,"
-            + "`metric_value` VARCHAR(1024) NOT NULL,"
-            + "PRIMARY KEY (`id`),"
-            + "KEY `idx_job_metrics_metric_ts` (`metric_ts`),"
-            + "KEY `idx_job_metrics_identifier_metric_ts` 
(`job_identifier`(255), `metric_ts`)"
-            + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin";
+    Path schemaPath =
+        findRepoRoot()
+            .resolve("scripts")
+            .resolve("mysql")
+            .resolve("schema-" + ConfigConstants.CURRENT_SCRIPT_VERSION + 
"-mysql.sql");
+    String schemaSql = Files.readString(schemaPath, StandardCharsets.UTF_8);
     try (Connection connection =
-            DriverManager.getConnection(
-                mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword());
-        Statement statement = connection.createStatement()) {
-      statement.execute(createTableMetrics);
-      statement.execute(createJobMetrics);
+        DriverManager.getConnection(mysql.getJdbcUrl(), mysql.getUsername(), 
mysql.getPassword())) {
+      JdbcSqlScriptUtils.executeSqlScript(connection, schemaSql);
+    }
+  }
+
+  private static Path findRepoRoot() {
+    Path current = Path.of("").toAbsolutePath();
+    while (current != null) {
+      if (Files.exists(current.resolve("gradlew"))) {
+        return current;
+      }
+      current = current.getParent();
     }
+    throw new IllegalStateException("Failed to locate repository root 
containing gradlew");
   }
 
   @Test

Reply via email to