This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit f2775b4305fd8738706fa8666d843ff3d125eee9
Author: fanng <[email protected]>
AuthorDate: Thu Mar 5 17:12:16 2026 +0800

    Refactor update-stats Spark config handling and simplify job options

    - Introduce IcebergSparkConfigUtils in optimizer-api to share Spark template
      building, flat JSON parsing, and catalog config validation
    - Reuse the shared Spark template utility in both rewrite-data-files and
      update-stats built-in jobs
    - Simplify submit-update-stats-job CLI/context by removing target-file-size
      option and relying on job-side fixed target size
    - Keep datafile_mse target fixed at 128MiB (small-file threshold 32MiB) and
      remove target_file_size_bytes from job template arguments/config
    - Update optimizer/jobs tests to match new CLI/job signatures and add unit
      tests for the shared Spark config utility
---
 .../jobs/iceberg/IcebergRewriteDataFilesJob.java   |  23 +--
 .../iceberg/IcebergUpdateStatsAndMetricsJob.java   |  75 +++-------
 .../jobs/iceberg/TestIcebergUpdateStatsJob.java    |  46 +++---
 .../TestIcebergUpdateStatsJobWithSpark.java        |  25 ++--
 .../common/util/IcebergSparkConfigUtils.java       | 156 +++++++++++++++++++++
 .../common/util/TestIcebergSparkConfigUtils.java   | 125 +++++++++++++++++
 .../maintenance/optimizer/OptimizerCmd.java        |  10 +-
 .../optimizer/command/OptimizerCommandContext.java |  13 +-
 .../command/SubmitUpdateStatsJobCommand.java       |  62 ++------
 .../maintenance/optimizer/TestOptimizerCmd.java    |  87 +++++++++++-
 .../job/TestBuiltinIcebergUpdateStatsJob.java      |   1 -
 11 files changed, 427 insertions(+), 196 deletions(-)

diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
index 5324204434..c2788281b0 100644
--- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
+++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import org.apache.gravitino.job.JobTemplateProvider;
 import org.apache.gravitino.job.SparkJobTemplate;
 import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;

@@ -497,26 +498,6 @@ public class IcebergRewriteDataFilesJob implements BuiltInJob {
    * @return map of Spark configuration keys to template values
    */
   private static Map<String, String> buildSparkConfigs() {
-    Map<String, String> configs = new HashMap<>();
-
-    // Spark runtime configs
-    configs.put("spark.master", "{{spark_master}}");
-    configs.put("spark.executor.instances", "{{spark_executor_instances}}");
-    configs.put("spark.executor.cores", "{{spark_executor_cores}}");
-    configs.put("spark.executor.memory", "{{spark_executor_memory}}");
-    configs.put("spark.driver.memory", "{{spark_driver_memory}}");
-
-    // Iceberg catalog configuration
-    configs.put("spark.sql.catalog.{{catalog_name}}", "org.apache.iceberg.spark.SparkCatalog");
-    configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}");
-    configs.put("spark.sql.catalog.{{catalog_name}}.warehouse", "{{warehouse_location}}");
-
-    // Iceberg extensions
-    configs.put(
-        "spark.sql.extensions",
-        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
-
-    return
Collections.unmodifiableMap(configs); + return IcebergSparkConfigUtils.buildTemplateSparkConfigs(); } } diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java index c6172acce5..b2826c1d55 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsAndMetricsJob.java @@ -44,6 +44,7 @@ import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv; import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl; import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; +import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils; import org.apache.gravitino.maintenance.optimizer.common.util.ProviderUtils; import org.apache.gravitino.stats.StatisticValues; import org.apache.spark.sql.Row; @@ -96,7 +97,6 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { System.exit(1); } - long targetFileSizeBytes = parseTargetFileSize(argMap.get("target-file-size-bytes")); Map<String, String> updaterOptions = parseJsonOptions(argMap.get("updater-options")); String sparkConfJson = argMap.get("spark-conf"); @@ -129,13 +129,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { } updateStatistics( - spark, - statisticsUpdater, - metricsUpdater, - updateMode, - catalogName, - tableIdentifier, - targetFileSizeBytes); + spark, statisticsUpdater, metricsUpdater, updateMode, catalogName, tableIdentifier); } catch (Exception e) { LOG.error("Failed to update Iceberg statistics/metrics", e); System.exit(1); @@ -162,16 +156,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { SparkSession spark, StatisticsUpdater statisticsUpdater, String catalogName, - String tableIdentifier, - long targetFileSizeBytes) { + String tableIdentifier) { updateStatistics( - spark, - statisticsUpdater, - null, - UpdateMode.STATS, - catalogName, - tableIdentifier, - targetFileSizeBytes); + spark, statisticsUpdater, null, UpdateMode.STATS, catalogName, tableIdentifier); } static void updateStatistics( @@ -179,16 +166,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { StatisticsUpdater statisticsUpdater, MetricsUpdater metricsUpdater, String catalogName, - String tableIdentifier, - long targetFileSizeBytes) { + String tableIdentifier) { updateStatistics( - spark, - statisticsUpdater, - metricsUpdater, - UpdateMode.ALL, - catalogName, - tableIdentifier, - targetFileSizeBytes); + spark, statisticsUpdater, metricsUpdater, UpdateMode.ALL, catalogName, tableIdentifier); } static void updateStatistics( @@ -197,8 +177,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { MetricsUpdater metricsUpdater, UpdateMode updateMode, String catalogName, - String tableIdentifier, - long targetFileSizeBytes) { + String tableIdentifier) { Objects.requireNonNull(updateMode, "updateMode must not be null"); if (updateMode.updateStats && statisticsUpdater == null) { @@ -216,7 +195,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { long metricTimestamp = System.currentTimeMillis() / 1000L; boolean partitioned = isPartitionedTable(spark, catalogName, 
tableIdentifier); if (partitioned) { - String sql = buildPartitionStatsSql(catalogName, tableIdentifier, targetFileSizeBytes); + String sql = buildPartitionStatsSql(catalogName, tableIdentifier); Row[] rows = (Row[]) spark.sql(sql).collect(); Map<PartitionPath, List<StatisticEntry<?>>> partitionStatistics = new LinkedHashMap<>(); List<MetricPoint> tableAndPartitionMetrics = new ArrayList<>(); @@ -247,7 +226,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { rows.length, gravitinoTableIdentifier); } else { - String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes); + String sql = buildTableStatsSql(catalogName, tableIdentifier); Row[] rows = (Row[]) spark.sql(sql).collect(); List<StatisticEntry<?>> tableStatistics = rows.length == 0 ? Collections.emptyList() : toStatistics(rows[0]); @@ -269,8 +248,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { } } - static String buildTableStatsSql( - String catalogName, String tableIdentifier, long targetFileSizeBytes) { + static String buildTableStatsSql(String catalogName, String tableIdentifier) { String filesTable = buildFilesTableIdentifier(catalogName, tableIdentifier); return "SELECT " + "COUNT(*) AS file_count, " @@ -281,9 +259,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { + SMALL_FILE_THRESHOLD_BYTES + " THEN 1 ELSE 0 END) AS small_files, " + "AVG(POWER(" - + targetFileSizeBytes + + DEFAULT_TARGET_FILE_SIZE_BYTES + " - LEAST(" - + targetFileSizeBytes + + DEFAULT_TARGET_FILE_SIZE_BYTES + ", file_size_in_bytes), 2)) AS datafile_mse, " + "AVG(file_size_in_bytes) AS avg_size, " + "SUM(file_size_in_bytes) AS total_size " @@ -291,8 +269,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { + filesTable; } - static String buildPartitionStatsSql( - String catalogName, String tableIdentifier, long targetFileSizeBytes) { + static String buildPartitionStatsSql(String catalogName, String tableIdentifier) { String filesTable = buildFilesTableIdentifier(catalogName, tableIdentifier); return "SELECT " + "partition, " @@ -304,9 +281,9 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { + SMALL_FILE_THRESHOLD_BYTES + " THEN 1 ELSE 0 END) AS small_files, " + "AVG(POWER(" - + targetFileSizeBytes + + DEFAULT_TARGET_FILE_SIZE_BYTES + " - LEAST(" - + targetFileSizeBytes + + DEFAULT_TARGET_FILE_SIZE_BYTES + ", file_size_in_bytes), 2)) AS datafile_mse, " + "AVG(file_size_in_bytes) AS avg_size, " + "SUM(file_size_in_bytes) AS total_size " @@ -425,21 +402,6 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { } } - static long parseTargetFileSize(String value) { - if (value == null || value.trim().isEmpty()) { - return DEFAULT_TARGET_FILE_SIZE_BYTES; - } - try { - long parsed = Long.parseLong(value.trim()); - if (parsed <= 0) { - throw new IllegalArgumentException("target-file-size-bytes must be > 0"); - } - return parsed; - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Invalid target-file-size-bytes: " + value, e); - } - } - static UpdateMode parseUpdateMode(String value) { if (value == null || value.trim().isEmpty()) { return UpdateMode.from(DEFAULT_UPDATE_MODE); @@ -582,8 +544,6 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { "{{table_identifier}}", "--update-mode", "{{update_mode}}", - "--target-file-size-bytes", - "{{target_file_size_bytes}}", "--updater-options", "{{updater_options}}", "--spark-conf", @@ -591,7 +551,7 @@ public class 
IcebergUpdateStatsAndMetricsJob implements BuiltInJob { } private static Map<String, String> buildSparkConfigs() { - return Collections.emptyMap(); + return IcebergSparkConfigUtils.buildTemplateSparkConfigs(); } private static void printUsage() { @@ -604,8 +564,7 @@ public class IcebergUpdateStatsAndMetricsJob implements BuiltInJob { + "\\n" + "Optional Options:\\n" + " --update-mode <stats|metrics|all> Update behavior mode, default: all\\n" - + " --target-file-size-bytes <bytes> MSE target file size in bytes\\n" - + " Default: 134217728 (128MB)\\n" + + " datafile_mse target file size is fixed at 134217728 (128MB)\\n" + " small_files threshold is fixed at 33554432 (32MB)\\n" + " --updater-options <json> JSON map for updater and repository settings\\n" + " Example: '{\"gravitino_uri\":\"http://localhost:8090\",\\n" diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java index cbbc27fce5..937f62cefc 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java @@ -48,28 +48,46 @@ public class TestIcebergUpdateStatsJob { SparkJobTemplate template = job.jobTemplate(); assertNotNull(template.arguments()); - assertEquals(12, template.arguments().size()); + assertEquals(10, template.arguments().size()); assertTrue(template.arguments().contains("--catalog")); assertTrue(template.arguments().contains("{{catalog_name}}")); assertTrue(template.arguments().contains("--table")); assertTrue(template.arguments().contains("{{table_identifier}}")); assertTrue(template.arguments().contains("--update-mode")); assertTrue(template.arguments().contains("{{update_mode}}")); - assertTrue(template.arguments().contains("--target-file-size-bytes")); - assertTrue(template.arguments().contains("{{target_file_size_bytes}}")); assertTrue(template.arguments().contains("--updater-options")); assertTrue(template.arguments().contains("{{updater_options}}")); assertTrue(template.arguments().contains("--spark-conf")); assertTrue(template.arguments().contains("{{spark_conf}}")); } + @Test + public void testJobTemplateHasSparkConfigs() { + IcebergUpdateStatsAndMetricsJob job = new IcebergUpdateStatsAndMetricsJob(); + SparkJobTemplate template = job.jobTemplate(); + + Map<String, String> configs = template.configs(); + assertNotNull(configs); + assertTrue(configs.containsKey("spark.master")); + assertTrue(configs.containsKey("spark.executor.instances")); + assertTrue(configs.containsKey("spark.executor.cores")); + assertTrue(configs.containsKey("spark.executor.memory")); + assertTrue(configs.containsKey("spark.driver.memory")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.type")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.uri")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.warehouse")); + assertEquals( + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", + configs.get("spark.sql.extensions")); + } + @Test public void testParseArguments() { String[] args = { "--catalog", "cat", "--table", "db.tbl", "--update-mode", "metrics", - "--target-file-size-bytes", "2048", "--updater-options", 
"{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}", "--spark-conf", "{\"spark.master\":\"local[2]\"}" }; @@ -78,7 +96,6 @@ public class TestIcebergUpdateStatsJob { assertEquals("cat", parsed.get("catalog")); assertEquals("db.tbl", parsed.get("table")); assertEquals("metrics", parsed.get("update-mode")); - assertEquals("2048", parsed.get("target-file-size-bytes")); assertEquals( "{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}", parsed.get("updater-options")); @@ -87,10 +104,8 @@ public class TestIcebergUpdateStatsJob { @Test public void testBuildStatsSql() { - String tableSql = - IcebergUpdateStatsAndMetricsJob.buildTableStatsSql("cat", "db.tbl", 134_217_728L); - String partitionSql = - IcebergUpdateStatsAndMetricsJob.buildPartitionStatsSql("cat", "db.tbl", 134_217_728L); + String tableSql = IcebergUpdateStatsAndMetricsJob.buildTableStatsSql("cat", "db.tbl"); + String partitionSql = IcebergUpdateStatsAndMetricsJob.buildPartitionStatsSql("cat", "db.tbl"); assertTrue(tableSql.contains("FROM cat.db.tbl.files")); assertTrue(tableSql.contains("AS datafile_mse")); @@ -103,19 +118,6 @@ public class TestIcebergUpdateStatsJob { assertTrue(partitionSql.contains("134217728 - LEAST(134217728, file_size_in_bytes)")); } - @Test - public void testParseTargetFileSize() { - assertEquals(134_217_728L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize(null)); - assertEquals(134_217_728L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("")); - assertEquals(2048L, IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("2048")); - assertThrows( - IllegalArgumentException.class, - () -> IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("-1")); - assertThrows( - IllegalArgumentException.class, - () -> IcebergUpdateStatsAndMetricsJob.parseTargetFileSize("abc")); - } - @Test public void testParseUpdateMode() { assertEquals( diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index 187c4c1b37..ca6f8171d3 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -270,8 +270,7 @@ public class TestIcebergUpdateStatsJobWithSpark { null, IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, catalogName, - "db.non_partitioned", - 100_000L); + "db.non_partitioned"); assertEquals(NameIdentifier.of(catalogName, "db", "non_partitioned"), updater.tableIdentifier); assertNotNull(updater.tableStatistics); @@ -296,8 +295,7 @@ public class TestIcebergUpdateStatsJobWithSpark { null, IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, catalogName, - "db.partitioned", - 100_000L); + "db.partitioned"); assertEquals(NameIdentifier.of(catalogName, "db", "partitioned"), updater.tableIdentifier); assertTrue(updater.tableStatistics.isEmpty()); @@ -327,8 +325,7 @@ public class TestIcebergUpdateStatsJobWithSpark { metricsUpdater, IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, catalogName, - "db.partitioned", - 100_000L); + "db.partitioned"); assertEquals( NameIdentifier.of(catalogName, "db", "partitioned"), statisticsUpdater.tableIdentifier); @@ -358,8 +355,7 @@ public class TestIcebergUpdateStatsJobWithSpark { metricsUpdater, IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, catalogName, - "db.partitioned", - 
100_000L); + "db.partitioned"); assertEquals(16, metricsUpdater.tableMetrics.size()); assertTrue( @@ -378,8 +374,7 @@ public class TestIcebergUpdateStatsJobWithSpark { metricsUpdater, IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, catalogName, - "db.non_partitioned", - 100_000L); + "db.non_partitioned"); assertEquals(8, metricsUpdater.tableMetrics.size()); assertTrue( @@ -401,8 +396,7 @@ public class TestIcebergUpdateStatsJobWithSpark { metricsUpdater, IcebergUpdateStatsAndMetricsJob.UpdateMode.ALL, catalogName, - "db.non_partitioned", - 100_000L); + "db.non_partitioned"); assertEquals( NameIdentifier.of(catalogName, "db", "non_partitioned"), statisticsUpdater.tableIdentifier); @@ -440,8 +434,7 @@ public class TestIcebergUpdateStatsJobWithSpark { metricsUpdater, IcebergUpdateStatsAndMetricsJob.UpdateMode.METRICS, catalogName, - "db.partitioned", - 100_000L); + "db.partitioned"); } finally { metricsUpdater.close(); } @@ -498,8 +491,7 @@ public class TestIcebergUpdateStatsJobWithSpark { null, IcebergUpdateStatsAndMetricsJob.UpdateMode.STATS, catalogName, - "db.multi_partitioned", - 100_000L); + "db.multi_partitioned"); assertEquals( NameIdentifier.of(catalogName, "db", "multi_partitioned"), updater.tableIdentifier); @@ -733,7 +725,6 @@ public class TestIcebergUpdateStatsJobWithSpark { Map<String, String> jobConf = new HashMap<>(); jobConf.put("table_identifier", "db." + tableName); jobConf.put("update_mode", updateMode); - jobConf.put("target_file_size_bytes", "100000"); jobConf.put("updater_options", updaterOptions); jobConf.put("catalog_name", SPARK_CATALOG_NAME); // Keep compatibility with old built-in template placeholders in deploy package. diff --git a/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IcebergSparkConfigUtils.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IcebergSparkConfigUtils.java new file mode 100644 index 0000000000..cf78fabcf5 --- /dev/null +++ b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IcebergSparkConfigUtils.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.maintenance.optimizer.common.util; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; + +/** Shared utilities for Iceberg Spark template configs and config validation. 
*/ +public final class IcebergSparkConfigUtils { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private static final String SPARK_SQL_EXTENSIONS_KEY = "spark.sql.extensions"; + private static final String SPARK_SQL_CATALOG_PREFIX = "spark.sql.catalog."; + private static final String CATALOG_TYPE_REST = "rest"; + private static final String CATALOG_TYPE_HIVE = "hive"; + private static final String CATALOG_TYPE_HADOOP = "hadoop"; + + public static final String ICEBERG_SPARK_EXTENSIONS = + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"; + + private IcebergSparkConfigUtils() {} + + /** Build default Spark template configs for Iceberg jobs. */ + public static Map<String, String> buildTemplateSparkConfigs() { + Map<String, String> configs = new HashMap<>(); + configs.put("spark.master", "{{spark_master}}"); + configs.put("spark.executor.instances", "{{spark_executor_instances}}"); + configs.put("spark.executor.cores", "{{spark_executor_cores}}"); + configs.put("spark.executor.memory", "{{spark_executor_memory}}"); + configs.put("spark.driver.memory", "{{spark_driver_memory}}"); + configs.put( + SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}", "org.apache.iceberg.spark.SparkCatalog"); + configs.put(SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}.type", "{{catalog_type}}"); + configs.put(SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}.uri", "{{catalog_uri}}"); + configs.put(SPARK_SQL_CATALOG_PREFIX + "{{catalog_name}}.warehouse", "{{warehouse_location}}"); + configs.put(SPARK_SQL_EXTENSIONS_KEY, ICEBERG_SPARK_EXTENSIONS); + return Collections.unmodifiableMap(configs); + } + + /** + * Parse a flat JSON map from CLI/config option. + * + * <p>Nested objects/arrays are rejected. + */ + public static Map<String, String> parseFlatJsonMap(String json, String optionName) { + if (StringUtils.isBlank(json)) { + return ImmutableMap.of(); + } + try { + Map<String, Object> parsedMap = + MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {}); + Map<String, String> result = new LinkedHashMap<>(); + for (Map.Entry<String, Object> entry : parsedMap.entrySet()) { + Object value = entry.getValue(); + Preconditions.checkArgument( + !(value instanceof Map || value instanceof List), + "Option --%s must be a flat key-value JSON map, but key '%s' has non-scalar value", + optionName, + entry.getKey()); + result.put(entry.getKey(), value == null ? "" : value.toString()); + } + return ImmutableMap.copyOf(result); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + Locale.ROOT, "Option --%s is not valid JSON: %s", optionName, e.getMessage()), + e); + } + } + + /** Validate required Spark/Iceberg catalog configs for one table identifier. */ + public static void validateSparkConfigsForCatalog( + Map<String, String> sparkConfigs, String catalogName, String tableIdentifier) { + Preconditions.checkArgument( + sparkConfigs != null && !sparkConfigs.isEmpty(), + "Missing spark config. 
Set --spark-conf or " + + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config file"); + Preconditions.checkArgument(StringUtils.isNotBlank(catalogName), "catalogName is blank"); + Preconditions.checkArgument( + StringUtils.isNotBlank(tableIdentifier), "tableIdentifier is blank"); + + String extensions = StringUtils.trimToNull(sparkConfigs.get(SPARK_SQL_EXTENSIONS_KEY)); + Preconditions.checkArgument( + StringUtils.isNotBlank(extensions), + "Spark config must contain key '%s' and include '%s'", + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSIONS); + Preconditions.checkArgument( + extensions.contains(ICEBERG_SPARK_EXTENSIONS), + "Spark config key '%s' must include '%s'", + SPARK_SQL_EXTENSIONS_KEY, + ICEBERG_SPARK_EXTENSIONS); + + String catalogPrefix = SPARK_SQL_CATALOG_PREFIX + catalogName; + Preconditions.checkArgument( + StringUtils.isNotBlank(sparkConfigs.get(catalogPrefix)), + "Spark config must contain key '%s' for identifier '%s'", + catalogPrefix, + tableIdentifier); + + String typeKey = catalogPrefix + ".type"; + String catalogType = StringUtils.trimToNull(sparkConfigs.get(typeKey)); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogType), + "Spark config must contain key '%s' for identifier '%s'", + typeKey, + tableIdentifier); + + String normalizedCatalogType = catalogType.toLowerCase(Locale.ROOT); + if (CATALOG_TYPE_REST.equals(normalizedCatalogType) + || CATALOG_TYPE_HIVE.equals(normalizedCatalogType)) { + String uriKey = catalogPrefix + ".uri"; + Preconditions.checkArgument( + StringUtils.isNotBlank(sparkConfigs.get(uriKey)), + "Spark config must contain key '%s' when catalog type is '%s' for identifier '%s'", + uriKey, + normalizedCatalogType, + tableIdentifier); + } else if (CATALOG_TYPE_HADOOP.equals(normalizedCatalogType)) { + String warehouseKey = catalogPrefix + ".warehouse"; + Preconditions.checkArgument( + StringUtils.isNotBlank(sparkConfigs.get(warehouseKey)), + "Spark config must contain key '%s' when catalog type is '%s' for identifier '%s'", + warehouseKey, + normalizedCatalogType, + tableIdentifier); + } + } +} diff --git a/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIcebergSparkConfigUtils.java b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIcebergSparkConfigUtils.java new file mode 100644 index 0000000000..7ac05be062 --- /dev/null +++ b/maintenance/optimizer-api/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIcebergSparkConfigUtils.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.gravitino.maintenance.optimizer.common.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class TestIcebergSparkConfigUtils { + + @Test + public void testBuildTemplateSparkConfigsContainsRequiredKeys() { + Map<String, String> configs = IcebergSparkConfigUtils.buildTemplateSparkConfigs(); + + assertTrue(configs.containsKey("spark.master")); + assertTrue(configs.containsKey("spark.executor.instances")); + assertTrue(configs.containsKey("spark.executor.cores")); + assertTrue(configs.containsKey("spark.executor.memory")); + assertTrue(configs.containsKey("spark.driver.memory")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.type")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.uri")); + assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.warehouse")); + assertEquals( + IcebergSparkConfigUtils.ICEBERG_SPARK_EXTENSIONS, configs.get("spark.sql.extensions")); + } + + @Test + public void testParseFlatJsonMap() { + Map<String, String> parsed = + IcebergSparkConfigUtils.parseFlatJsonMap( + "{\"k1\":\"v1\",\"k2\":1,\"k3\":null}", "spark-conf"); + + assertEquals("v1", parsed.get("k1")); + assertEquals("1", parsed.get("k2")); + assertEquals("", parsed.get("k3")); + } + + @Test + public void testParseFlatJsonMapRejectsNestedValue() { + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + IcebergSparkConfigUtils.parseFlatJsonMap( + "{\"spark\":{\"master\":\"local[2]\"}}", "spark-conf")); + assertTrue(e.getMessage().contains("flat key-value JSON map")); + } + + @Test + public void testValidateSparkConfigsForRestCatalog() { + Map<String, String> sparkConfigs = baseSparkConfigs("rest"); + sparkConfigs.put("spark.sql.catalog.rest.uri", "http://localhost:9001/iceberg"); + sparkConfigs.put("spark.sql.catalog.rest.warehouse", "/tmp/warehouse"); + + IcebergSparkConfigUtils.validateSparkConfigsForCatalog(sparkConfigs, "rest", "rest.ab.t1"); + } + + @Test + public void testValidateSparkConfigsForHadoopCatalog() { + Map<String, String> sparkConfigs = baseSparkConfigs("hadoop"); + sparkConfigs.put("spark.sql.catalog.rest.warehouse", "/tmp/warehouse"); + + IcebergSparkConfigUtils.validateSparkConfigsForCatalog(sparkConfigs, "rest", "rest.ab.t1"); + } + + @Test + public void testValidateSparkConfigsMissingExtensions() { + Map<String, String> sparkConfigs = baseSparkConfigs("rest"); + sparkConfigs.remove("spark.sql.extensions"); + sparkConfigs.put("spark.sql.catalog.rest.uri", "http://localhost:9001/iceberg"); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + IcebergSparkConfigUtils.validateSparkConfigsForCatalog( + sparkConfigs, "rest", "rest.ab.t1")); + assertTrue(e.getMessage().contains("spark.sql.extensions")); + } + + @Test + public void testValidateSparkConfigsMissingUriForRestCatalog() { + Map<String, String> sparkConfigs = baseSparkConfigs("rest"); + + IllegalArgumentException e = + assertThrows( + IllegalArgumentException.class, + () -> + IcebergSparkConfigUtils.validateSparkConfigsForCatalog( + sparkConfigs, "rest", "rest.ab.t1")); + assertTrue(e.getMessage().contains("spark.sql.catalog.rest.uri")); + } + + private Map<String, String> 
baseSparkConfigs(String catalogType) { + Map<String, String> configs = new HashMap<>(); + configs.put( + "spark.sql.extensions", + "com.example.Ext," + IcebergSparkConfigUtils.ICEBERG_SPARK_EXTENSIONS); + configs.put("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog"); + configs.put("spark.sql.catalog.rest.type", catalogType); + return configs; + } +} diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java index 6800e5203e..19bd5818fd 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java @@ -122,7 +122,6 @@ public class OptimizerCmd { EnumSet.of( CliOption.DRY_RUN, CliOption.UPDATE_MODE, - CliOption.TARGET_FILE_SIZE_BYTES, CliOption.UPDATER_OPTIONS, CliOption.SPARK_CONF), "Submit built-in Iceberg update stats jobs for given table identifiers.", @@ -180,12 +179,10 @@ public class OptimizerCmd { CliOption.RANGE_SECONDS.longOpt(), Long.toString(DEFAULT_RANGE_SECONDS)); String partitionPathRaw = cmd.getOptionValue(CliOption.PARTITION_PATH.longOpt()); String updateMode = cmd.getOptionValue(CliOption.UPDATE_MODE.longOpt()); - String targetFileSizeBytes = cmd.getOptionValue(CliOption.TARGET_FILE_SIZE_BYTES.longOpt()); String updaterOptions = cmd.getOptionValue(CliOption.UPDATER_OPTIONS.longOpt()); String sparkConf = cmd.getOptionValue(CliOption.SPARK_CONF.longOpt()); OptimizerCommandContext.UpdateStatsJobOptions updateStatsJobOptions = - new OptimizerCommandContext.UpdateStatsJobOptions( - updateMode, targetFileSizeBytes, updaterOptions, sparkConf); + new OptimizerCommandContext.UpdateStatsJobOptions(updateMode, updaterOptions, sparkConf); String statisticsPayload = cmd.getOptionValue(CliOption.STATISTICS_PAYLOAD.longOpt()); String filePath = cmd.getOptionValue(CliOption.FILE_PATH.longOpt()); Optional<StatisticsInputContent> statisticsInputContent = @@ -494,11 +491,6 @@ public class OptimizerCmd { CliOptionArgType.SINGLE, null, "Update mode for submit-update-stats-job: stats|metrics|all"), - TARGET_FILE_SIZE_BYTES( - "target-file-size-bytes", - CliOptionArgType.SINGLE, - null, - "Target file size bytes for submit-update-stats-job"), UPDATER_OPTIONS( "updater-options", CliOptionArgType.SINGLE, diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java index 88d11046fa..e1e9d62e03 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java @@ -122,10 +122,6 @@ public final class OptimizerCommandContext { return updateStatsJobOptions.updateMode(); } - public String targetFileSizeBytes() { - return updateStatsJobOptions.targetFileSizeBytes(); - } - public String updaterOptions() { return updateStatsJobOptions.updaterOptions(); } @@ -145,14 +141,11 @@ public final class OptimizerCommandContext { /** Submit-update-stats-job specific command options. 
*/ public static final class UpdateStatsJobOptions { private final String updateMode; - private final String targetFileSizeBytes; private final String updaterOptions; private final String sparkConf; - public UpdateStatsJobOptions( - String updateMode, String targetFileSizeBytes, String updaterOptions, String sparkConf) { + public UpdateStatsJobOptions(String updateMode, String updaterOptions, String sparkConf) { this.updateMode = updateMode; - this.targetFileSizeBytes = targetFileSizeBytes; this.updaterOptions = updaterOptions; this.sparkConf = sparkConf; } @@ -161,10 +154,6 @@ public final class OptimizerCommandContext { return updateMode; } - public String targetFileSizeBytes() { - return targetFileSizeBytes; - } - public String updaterOptions() { return updaterOptions; } diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java index dd0bc3cc8f..7d3f0df312 100644 --- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java +++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java @@ -19,7 +19,6 @@ package org.apache.gravitino.maintenance.optimizer.command; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import java.util.ArrayList; @@ -33,6 +32,7 @@ import org.apache.gravitino.client.GravitinoClient; import org.apache.gravitino.job.JobHandle; import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; import org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils; +import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils; /** * Handles CLI command {@code submit-update-stats-job} for submitting built-in Iceberg update stats @@ -42,7 +42,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats"; private static final String DEFAULT_UPDATE_MODE = "stats"; - private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 128L * 1024 * 1024; private static final String OPTION_UPDATER_OPTIONS = "updater-options"; private static final String OPTION_SPARK_CONF = "spark-conf"; @@ -60,10 +59,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { String updateMode = parseUpdateMode( resolveScalarOption(context.updateMode(), submitterConfigs.get("update_mode"))); - long targetFileSizeBytes = - parseTargetFileSize( - resolveScalarOption( - context.targetFileSizeBytes(), submitterConfigs.get("target_file_size_bytes"))); String updaterOptionsJson = resolveJsonOption(context.updaterOptions(), submitterConfigs.get("updater_options")); @@ -71,8 +66,9 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { resolveJsonOption(context.sparkConf(), submitterConfigs.get("spark_conf")); Map<String, String> updaterOptions = - parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS); - Map<String, String> sparkConfigs = parseFlatJsonMap(sparkConfJson, OPTION_SPARK_CONF); + IcebergSparkConfigUtils.parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS); + Map<String, String> sparkConfigs = + IcebergSparkConfigUtils.parseFlatJsonMap(sparkConfJson, 
OPTION_SPARK_CONF); validateUpdaterOptions(updateMode, updaterOptions); validateSparkConfigs(tableTargets, sparkConfigs); @@ -80,8 +76,7 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { if (context.dryRun()) { for (TableTarget tableTarget : tableTargets) { Map<String, String> jobConfig = - buildJobConfig( - tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs); + buildJobConfig(tableTarget, updateMode, updaterOptions, sparkConfigs); context .output() .printf( @@ -98,8 +93,7 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { int submitted = 0; for (TableTarget tableTarget : tableTargets) { Map<String, String> jobConfig = - buildJobConfig( - tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs); + buildJobConfig(tableTarget, updateMode, updaterOptions, sparkConfigs); JobHandle jobHandle = client.runJob(JOB_TEMPLATE_NAME, jobConfig); submitted++; context @@ -119,14 +113,12 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { private static Map<String, String> buildJobConfig( TableTarget tableTarget, String updateMode, - long targetFileSizeBytes, Map<String, String> updaterOptions, Map<String, String> sparkConfigs) { Map<String, String> jobConfig = new LinkedHashMap<>(); jobConfig.put("catalog_name", tableTarget.catalogName); jobConfig.put("table_identifier", tableTarget.schemaAndTable); jobConfig.put("update_mode", updateMode); - jobConfig.put("target_file_size_bytes", Long.toString(targetFileSizeBytes)); jobConfig.put("updater_options", toCanonicalJson(updaterOptions)); jobConfig.put("spark_conf", toCanonicalJson(sparkConfigs)); return jobConfig; @@ -156,13 +148,6 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { return normalized; } - private static long parseTargetFileSize(String value) { - if (StringUtils.isBlank(value)) { - return DEFAULT_TARGET_FILE_SIZE_BYTES; - } - return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", value.trim(), false); - } - private static List<TableTarget> parseTableTargets(String[] identifiers, String defaultCatalog) { Preconditions.checkArgument( identifiers != null && identifiers.length > 0, @@ -234,39 +219,10 @@ public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor { !sparkConfigs.isEmpty(), "Missing spark config. Set --spark-conf or " + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config file"); - for (TableTarget tableTarget : tableTargets) { - String requiredKey = "spark.sql.catalog." + tableTarget.catalogName; - Preconditions.checkArgument( - StringUtils.isNotBlank(sparkConfigs.get(requiredKey)), - "Spark config must contain key '%s' for identifier '%s'", - requiredKey, - tableTarget.fullIdentifier); - } - } - private static Map<String, String> parseFlatJsonMap(String json, String optionName) { - if (StringUtils.isBlank(json)) { - return Map.of(); - } - try { - Map<String, Object> parsedMap = - MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {}); - Map<String, String> result = new LinkedHashMap<>(); - for (Map.Entry<String, Object> entry : parsedMap.entrySet()) { - Object value = entry.getValue(); - Preconditions.checkArgument( - !(value instanceof Map || value instanceof List), - "Option --%s must be a flat key-value JSON map, but key '%s' has non-scalar value", - optionName, - entry.getKey()); - result.put(entry.getKey(), value == null ? 
"" : value.toString()); - } - return result; - } catch (Exception e) { - throw new IllegalArgumentException( - String.format( - Locale.ROOT, "Option --%s is not valid JSON: %s", optionName, e.getMessage()), - e); + for (TableTarget tableTarget : tableTargets) { + IcebergSparkConfigUtils.validateSparkConfigsForCatalog( + sparkConfigs, tableTarget.catalogName, tableTarget.fullIdentifier); } } diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java index b450ad9916..3c89722216 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java @@ -437,7 +437,9 @@ class TestOptimizerCmd { "--spark-conf", "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"," + "\"spark.sql.catalog.rest.type\":\"rest\"," - + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}", + + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"," + + "\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\"," + + "\"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}", "--dry-run", "--conf-path", confPath.toString()); @@ -446,6 +448,37 @@ class TestOptimizerCmd { Assertions.assertTrue(output[0].contains("spark.sql.catalog.rest")); } + @Test + void testSubmitUpdateStatsJobRequiresSparkSqlExtensions() throws Exception { + Path confPath = createOptimizerConfForSubmitUpdateStatsWithoutExtensions(); + String[] output = + runCommand( + "--type", + "submit-update-stats-job", + "--identifiers", + "rest.ab.t1", + "--dry-run", + "--conf-path", + confPath.toString()); + Assertions.assertTrue(output[1].contains("spark.sql.extensions")); + } + + @Test + void testSubmitUpdateStatsJobRequiresCatalogUriForRestCatalog() throws Exception { + Path confPath = createOptimizerConfForSubmitUpdateStatsWithoutCatalogUri(); + String[] output = + runCommand( + "--type", + "submit-update-stats-job", + "--identifiers", + "rest.ab.t1", + "--dry-run", + "--conf-path", + confPath.toString()); + Assertions.assertTrue(output[1].contains("spark.sql.catalog.rest.uri")); + Assertions.assertTrue(output[1].contains("catalog type is 'rest'")); + } + private Path createOptimizerConfForMetricsProvider() throws Exception { Path confPath = Files.createTempFile("optimizer-test-", ".conf"); // Route command reads to deterministic in-memory fixtures from MetricsProviderForTest. 
@@ -526,13 +559,15 @@ class TestOptimizerCmd { "gravitino.optimizer.gravitinoMetalake = test", "gravitino.optimizer.gravitinoDefaultCatalog = rest", "gravitino.optimizer.jobSubmitterConfig.update_mode = stats", - "gravitino.optimizer.jobSubmitterConfig.target_file_size_bytes = 134217728", "gravitino.optimizer.jobSubmitterConfig.updater_options = " + "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}", "gravitino.optimizer.jobSubmitterConfig.spark_conf = " + "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"," + "\"spark.sql.catalog.rest.type\":\"rest\"," - + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}") + + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"," + + "\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\"," + + "\"spark.sql.extensions\":" + + "\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}") + System.lineSeparator(); Files.writeString(confPath, content, StandardCharsets.UTF_8); confPath.toFile().deleteOnExit(); @@ -556,6 +591,52 @@ class TestOptimizerCmd { return confPath; } + private Path createOptimizerConfForSubmitUpdateStatsWithoutExtensions() throws Exception { + Path confPath = + Files.createTempFile("optimizer-submit-update-stats-without-extensions-", ".conf"); + String content = + String.join( + System.lineSeparator(), + "gravitino.optimizer.gravitinoUri = http://localhost:8090", + "gravitino.optimizer.gravitinoMetalake = test", + "gravitino.optimizer.gravitinoDefaultCatalog = rest", + "gravitino.optimizer.jobSubmitterConfig.update_mode = stats", + "gravitino.optimizer.jobSubmitterConfig.updater_options = " + + "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}", + "gravitino.optimizer.jobSubmitterConfig.spark_conf = " + + "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"," + + "\"spark.sql.catalog.rest.type\":\"rest\"," + + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"," + + "\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\"}") + + System.lineSeparator(); + Files.writeString(confPath, content, StandardCharsets.UTF_8); + confPath.toFile().deleteOnExit(); + return confPath; + } + + private Path createOptimizerConfForSubmitUpdateStatsWithoutCatalogUri() throws Exception { + Path confPath = Files.createTempFile("optimizer-submit-update-stats-without-uri-", ".conf"); + String content = + String.join( + System.lineSeparator(), + "gravitino.optimizer.gravitinoUri = http://localhost:8090", + "gravitino.optimizer.gravitinoMetalake = test", + "gravitino.optimizer.gravitinoDefaultCatalog = rest", + "gravitino.optimizer.jobSubmitterConfig.update_mode = stats", + "gravitino.optimizer.jobSubmitterConfig.updater_options = " + + "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}", + "gravitino.optimizer.jobSubmitterConfig.spark_conf = " + + "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"," + + "\"spark.sql.catalog.rest.type\":\"rest\"," + + "\"spark.sql.catalog.rest.warehouse\":\"/tmp/warehouse\"," + + "\"spark.sql.extensions\":" + + "\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}") + + System.lineSeparator(); + Files.writeString(confPath, content, StandardCharsets.UTF_8); + confPath.toFile().deleteOnExit(); + return confPath; + } + private String[] runCommand(String... 
args) { ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream errBuffer = new ByteArrayOutputStream(); diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java index b2b8fbb4e4..8f9a6b2f38 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java @@ -115,7 +115,6 @@ public class TestBuiltinIcebergUpdateStatsJob { Map<String, String> jobConf = new HashMap<>(); jobConf.put("table_identifier", "db." + tableName); jobConf.put("update_mode", "all"); - jobConf.put("target_file_size_bytes", "100000"); jobConf.put( "updater_options", "{\"gravitino_uri\":\""
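For reviewers who want to try the shared utility outside this patch, below is a
minimal, self-contained sketch of the intended call sequence, based only on the
signatures that IcebergSparkConfigUtils introduces in this commit. The class name
SparkConfigUtilsDemo, the catalog name "rest", and the URI are illustrative
assumptions, not shipped defaults.

    import java.util.Map;
    import org.apache.gravitino.maintenance.optimizer.common.util.IcebergSparkConfigUtils;

    // Sketch only (not part of the patch): exercises the three entry points
    // shared by the built-in jobs and the submit-update-stats-job CLI.
    public class SparkConfigUtilsDemo {
      public static void main(String[] args) {
        // Placeholder-based template configs now shared by rewrite-data-files
        // and update-stats jobs.
        Map<String, String> template = IcebergSparkConfigUtils.buildTemplateSparkConfigs();
        System.out.println(template.get("spark.master")); // prints {{spark_master}}

        // Flat JSON parsing used for --spark-conf and --updater-options;
        // nested objects/arrays are rejected with IllegalArgumentException.
        Map<String, String> sparkConfigs =
            IcebergSparkConfigUtils.parseFlatJsonMap(
                "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
                    + "\"spark.sql.catalog.rest.type\":\"rest\","
                    + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\","
                    + "\"spark.sql.extensions\":"
                    + "\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}",
                "spark-conf");

        // Per-table validation: requires the Iceberg extensions entry, the
        // catalog impl key, a catalog type, and uri (rest/hive) or
        // warehouse (hadoop) for that type.
        IcebergSparkConfigUtils.validateSparkConfigsForCatalog(
            sparkConfigs, "rest", "rest.db.t1");
      }
    }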
