This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch stats_job in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 6c16b72faca0210e29a649b743bfe77360271c5a Author: fanng <[email protected]> AuthorDate: Thu Mar 5 15:48:33 2026 +0800 Rename built-in Iceberg update job class to include metrics - Rename IcebergUpdateStatsJob to IcebergUpdateStatsAndMetricsJob - Update built-in job provider registration to the new class - Synchronize jobs module tests and usage text with the renamed class --- .../jobs/BuiltInJobTemplateProvider.java | 6 ++- ...b.java => IcebergUpdateStatsAndMetricsJob.java} | 10 ++-- .../jobs/iceberg/TestIcebergUpdateStatsJob.java | 60 +++++++++++++--------- .../TestIcebergUpdateStatsJobWithSpark.java | 36 ++++++------- 4 files changed, 64 insertions(+), 48 deletions(-) diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java index 1e93145446..09ca2199f7 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java @@ -26,7 +26,7 @@ import java.util.stream.Collectors; import org.apache.gravitino.job.JobTemplate; import org.apache.gravitino.job.JobTemplateProvider; import org.apache.gravitino.maintenance.jobs.iceberg.IcebergRewriteDataFilesJob; -import org.apache.gravitino.maintenance.jobs.iceberg.IcebergUpdateStatsJob; +import org.apache.gravitino.maintenance.jobs.iceberg.IcebergUpdateStatsAndMetricsJob; import org.apache.gravitino.maintenance.jobs.spark.SparkPiJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +43,9 @@ public class BuiltInJobTemplateProvider implements JobTemplateProvider { private static final List<BuiltInJob> BUILT_IN_JOBS = ImmutableList.of( - new SparkPiJob(), new IcebergRewriteDataFilesJob(), new IcebergUpdateStatsJob()); + new SparkPiJob(), + new IcebergRewriteDataFilesJob(), + new IcebergUpdateStatsAndMetricsJob()); @Override public List<? extends JobTemplate> jobTemplates() { diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java similarity index 98% rename from maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java rename to maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java index 9f03fc68f2..7f47273363 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java @@ -54,9 +54,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** Built-in job for computing Iceberg table file statistics and persisting them to Gravitino. */ -public class IcebergUpdateStatsJob implements BuiltInJob { +public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { - private static final Logger LOG = LoggerFactory.getLogger(IcebergUpdateStatsJob.class); + private static final Logger LOG = LoggerFactory.getLogger(IcebergUpdateStatsAndMetricsJob.class); private static final String NAME = JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-update-stats"; @@ -73,8 +73,8 @@ public class IcebergUpdateStatsJob implements BuiltInJob { .withName(NAME) .withComment( "Built-in Iceberg update stats job template for computing datafile MSE and file metrics") - .withExecutable(resolveExecutable(IcebergUpdateStatsJob.class)) - .withClassName(IcebergUpdateStatsJob.class.getName()) + .withExecutable(resolveExecutable(IcebergUpdateStatsAndMetricsJob.class)) + .withClassName(IcebergUpdateStatsAndMetricsJob.class.getName()) .withArguments(buildArguments()) .withConfigs(buildSparkConfigs()) .withCustomFields( @@ -595,7 +595,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob { private static void printUsage() { System.err.println( - "Usage: IcebergUpdateStatsJob [OPTIONS]\\n" + "Usage: IcebergUpdateStatsAndMetricsJob [OPTIONS]\\n" + "\\n" + "Required Options:\\n" + " --catalog <name> Iceberg catalog name registered in Spark\\n" diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java index 497aa34cf9..8b2bbb79e8 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java @@ -33,7 +33,7 @@ public class TestIcebergUpdateStatsJob { @Test public void testJobTemplateHasCorrectNameAndVersion() { - IcebergUpdateStatsJob job = new IcebergUpdateStatsJob(); + IcebergUpdateStatsAndMetricsJob job = new IcebergUpdateStatsAndMetricsJob(); SparkJobTemplate template = job.jobTemplate(); assertNotNull(template); @@ -44,7 +44,7 @@ public class TestIcebergUpdateStatsJob { @Test public void testJobTemplateArguments() { - IcebergUpdateStatsJob job = new IcebergUpdateStatsJob(); + IcebergUpdateStatsAndMetricsJob job = new IcebergUpdateStatsAndMetricsJob(); SparkJobTemplate template = job.jobTemplate(); assertNotNull(template.arguments()); @@ -74,7 +74,7 @@ public class TestIcebergUpdateStatsJob { "--spark-conf", "{\"spark.master\":\"local[2]\"}" }; - Map<String, String> parsed = IcebergUpdateStatsJob.parseArguments(args); + Map<String, String> parsed = IcebergUpdateStatsAndMetricsJob.parseArguments(args); assertEquals("cat", parsed.get("catalog")); assertEquals("db.tbl", parsed.get("table")); assertEquals("metrics", parsed.get("update-mode")); @@ -87,8 +87,9 @@ public class TestIcebergUpdateStatsJob { @Test public void testBuildStatsSql() { - String tableSql = IcebergUpdateStatsJob.buildTableStatsSql("cat", "db.tbl", 100000L); - String partitionSql = IcebergUpdateStatsJob.buildPartitionStatsSql("cat", "db.tbl", 100000L); + String tableSql = IcebergUpdateStatsAndMetricsJob.buildTableStatsSql("cat", "db.tbl", 100000L); + String partitionSql = + IcebergUpdateStatsAndMetricsJob.buildPartitionStatsSql("cat", "db.tbl", 100000L); assertTrue(tableSql.contains("FROM cat.db.tbl.files")); assertTrue(tableSql.contains("AS datafile_mse")); @@ -99,45 +100,57 @@ public class TestIcebergUpdateStatsJob { @Test public void testParseTargetFileSize() { - assertEquals(100000L, IcebergUpdateStatsJob.parseTargetFileSize(null)); - assertEquals(100000L, IcebergUpdateStatsJob.parseTargetFileSize("")); - assertEquals(2048L, IcebergUpdateStatsJob.parseTargetFileSize("2048")); + assertEquals(100000L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize(null)); + assertEquals(100000L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("")); + assertEquals(2048L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("2048")); assertThrows( - IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseTargetFileSize("-1")); + IllegalArgumentException.class, + () -> IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("-1")); assertThrows( - IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseTargetFileSize("abc")); + IllegalArgumentException.class, + () -> IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("abc")); } @Test public void testParseUpdateMode() { - assertEquals(IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode(null)); - assertEquals(IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode("")); assertEquals( - IcebergUpdateStatsJob.UpdateMode.STATS, IcebergUpdateStatsJob.parseUpdateMode("stats")); + IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, + IcebergUpdateStatsAndMetricsJob.parseUpdateMode(null)); + assertEquals( + IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, + IcebergUpdateStatsAndMetricsJob.parseUpdateMode("")); assertEquals( - IcebergUpdateStatsJob.UpdateMode.METRICS, IcebergUpdateStatsJob.parseUpdateMode("metrics")); + IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, + IcebergUpdateStatsAndMetricsJob.parseUpdateMode("stats")); assertEquals( - IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode("all")); + IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, + IcebergUpdateStatsAndMetricsJob.parseUpdateMode("metrics")); + assertEquals( + IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, + IcebergUpdateStatsAndMetricsJob.parseUpdateMode("all")); assertThrows( - IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseUpdateMode("invalid")); + IllegalArgumentException.class, + () -> IcebergUpdateStatsAndMetricsJob.parseUpdateMode("invalid")); } @Test public void testParseJsonOptions() { Map<String, String> parsed = - IcebergUpdateStatsJob.parseJsonOptions("{\"a\":\"b\",\"x\":1,\"flag\":true,\"nil\":null}"); + IcebergUpdateStatsAndMetricsJob.parseJsonOptions( + "{\"a\":\"b\",\"x\":1,\"flag\":true,\"nil\":null}"); assertEquals("b", parsed.get("a")); assertEquals("1", parsed.get("x")); assertEquals("true", parsed.get("flag")); assertEquals("", parsed.get("nil")); assertThrows( - IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseJsonOptions("{not_json}")); + IllegalArgumentException.class, + () -> IcebergUpdateStatsAndMetricsJob.parseJsonOptions("{not_json}")); assertThrows( IllegalArgumentException.class, - () -> IcebergUpdateStatsJob.parseJsonOptions("{\"nested\":{\"a\":1}}")); + () -> IcebergUpdateStatsAndMetricsJob.parseJsonOptions("{\"nested\":{\"a\":1}}")); assertThrows( IllegalArgumentException.class, - () -> IcebergUpdateStatsJob.parseJsonOptions("{\"array\":[1,2,3]}")); + () -> IcebergUpdateStatsAndMetricsJob.parseJsonOptions("{\"array\":[1,2,3]}")); } @Test @@ -148,7 +161,7 @@ public class TestIcebergUpdateStatsJob { "metalake", "ml", "gravitino.optimizer.jdbcMetrics.jdbcUrl", "jdbc:mysql://localhost:3306/metrics"); Map<String, String> optimizerProperties = - IcebergUpdateStatsJob.buildOptimizerProperties(options); + IcebergUpdateStatsAndMetricsJob.buildOptimizerProperties(options); assertEquals("http://localhost:8090", optimizerProperties.get(OptimizerConfig.GRAVITINO_URI)); assertEquals("ml", optimizerProperties.get(OptimizerConfig.GRAVITINO_METALAKE)); @@ -164,10 +177,11 @@ public class TestIcebergUpdateStatsJob { OptimizerConfig.GRAVITINO_URI, "http://localhost:8090", OptimizerConfig.GRAVITINO_METALAKE, "ml"); assertEquals( - optimizerProperties, IcebergUpdateStatsJob.requireGravitinoConfig(optimizerProperties)); + optimizerProperties, + IcebergUpdateStatsAndMetricsJob.requireGravitinoConfig(optimizerProperties)); assertThrows( IllegalArgumentException.class, - () -> IcebergUpdateStatsJob.requireGravitinoConfig(Map.of())); + () -> IcebergUpdateStatsAndMetricsJob.requireGravitinoConfig(Map.of())); } } diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index 44ca5aece2..187c4c1b37 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -72,7 +72,7 @@ import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.junit.jupiter.api.io.TempDir; import org.testcontainers.containers.MySQLContainer; -/** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg runtime. */ +/** Integration tests for IcebergUpdateStatsAndMetricsJob with a real Spark+Iceberg runtime. */ public class TestIcebergUpdateStatsJobWithSpark { private static final String SERVER_URI = "http://localhost:8090"; @@ -105,7 +105,7 @@ public class TestIcebergUpdateStatsJobWithSpark { spark = SparkSession.builder() - .appName("TestIcebergUpdateStatsJob") + .appName("TestIcebergUpdateStatsAndMetricsJob") .master("local[2]") .config( "spark.sql.extensions", @@ -264,11 +264,11 @@ public class TestIcebergUpdateStatsJobWithSpark { public void testUpdateNonPartitionedTableStatistics() { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, updater, null, - IcebergUpdateStatsJob.UpdateMode.STATS, + IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, catalogName, "db.non_partitioned", 100_000L); @@ -290,11 +290,11 @@ public class TestIcebergUpdateStatsJobWithSpark { public void testUpdatePartitionedTableStatistics() { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, updater, null, - IcebergUpdateStatsJob.UpdateMode.STATS, + IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, catalogName, "db.partitioned", 100_000L); @@ -321,11 +321,11 @@ public class TestIcebergUpdateStatsJobWithSpark { RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater(); RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, statisticsUpdater, metricsUpdater, - IcebergUpdateStatsJob.UpdateMode.ALL, + IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, catalogName, "db.partitioned", 100_000L); @@ -352,11 +352,11 @@ public class TestIcebergUpdateStatsJobWithSpark { public void testUpdatePartitionedTableMetricsOnly() { RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, null, metricsUpdater, - IcebergUpdateStatsJob.UpdateMode.METRICS, + IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, catalogName, "db.partitioned", 100_000L); @@ -372,11 +372,11 @@ public class TestIcebergUpdateStatsJobWithSpark { public void testUpdateNonPartitionedTableMetricsOnly() { RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, null, metricsUpdater, - IcebergUpdateStatsJob.UpdateMode.METRICS, + IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, catalogName, "db.non_partitioned", 100_000L); @@ -395,11 +395,11 @@ public class TestIcebergUpdateStatsJobWithSpark { RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater(); RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, statisticsUpdater, metricsUpdater, - IcebergUpdateStatsJob.UpdateMode.ALL, + IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, catalogName, "db.non_partitioned", 100_000L); @@ -434,11 +434,11 @@ public class TestIcebergUpdateStatsJobWithSpark { GravitinoMetricsUpdater metricsUpdater = new GravitinoMetricsUpdater(); metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf))); try { - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, null, metricsUpdater, - IcebergUpdateStatsJob.UpdateMode.METRICS, + IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, catalogName, "db.partitioned", 100_000L); @@ -492,11 +492,11 @@ public class TestIcebergUpdateStatsJobWithSpark { public void testUpdateMultiLevelPartitionedTableStatistics() { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); - IcebergUpdateStatsJob.updateStatistics( + IcebergUpdateStatsAndMetricsJob.updateStatistics( spark, updater, null, - IcebergUpdateStatsJob.UpdateMode.STATS, + IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, catalogName, "db.multi_partitioned", 100_000L);
