This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch stats_job
in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit 5ac6881a8c56ab2b75f2ebfc2cc354f361d1cd02
Author: fanng <[email protected]>
AuthorDate: Wed Mar 4 16:35:36 2026 +0800

    [#12688] Add optimizer cmd for builtin update stats job
---
 .../optimizer/common/OptimizerContent.java         |   0
 .../maintenance/optimizer/OptimizerCmd.java        |  80 ++++-
 .../optimizer/command/OptimizerCommandContext.java |  42 +++
 .../command/SubmitUpdateStatsJobCommand.java       | 356 +++++++++++++++++++++
 .../maintenance/optimizer/TestOptimizerCmd.java    | 151 +++++++++
 5 files changed, 625 insertions(+), 4 deletions(-)

diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java b/maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java
similarity index 100%
rename from maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java
rename to maintenance/optimizer-api/src/main/java/org/apache/gravitino/maintenance/optimizer/common/OptimizerContent.java
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
index 14fc1e2011..6dc6fa01fb 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/OptimizerCmd.java
@@ -53,6 +53,7 @@ import org.apache.gravitino.maintenance.optimizer.command.MonitorMetricsCommand;
 import org.apache.gravitino.maintenance.optimizer.command.OptimizerCommandContext;
 import org.apache.gravitino.maintenance.optimizer.command.OptimizerCommandExecutor;
 import org.apache.gravitino.maintenance.optimizer.command.SubmitStrategyJobsCommand;
+import org.apache.gravitino.maintenance.optimizer.command.SubmitUpdateStatsJobCommand;
 import org.apache.gravitino.maintenance.optimizer.command.UpdateStatisticsCommand;
 import org.apache.gravitino.maintenance.optimizer.command.rule.CommandRules;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
@@ -114,7 +115,23 @@ public class OptimizerCmd {
           EnumSet.of(CliOption.IDENTIFIERS),
           EnumSet.noneOf(CliOption.class),
           "List stored job metrics.",
-          "./bin/gravitino-optimizer.sh --type list-job-metrics --identifiers c.db.job"));
+          "./bin/gravitino-optimizer.sh --type list-job-metrics --identifiers c.db.job"),
+      OptimizerCommandType.SUBMIT_UPDATE_STATS_JOB,
+      CommandOptionSpec.of(
+          EnumSet.of(CliOption.IDENTIFIERS),
+          EnumSet.of(
+              CliOption.DRY_RUN,
+              CliOption.LIMIT,
+              CliOption.UPDATE_MODE,
+              CliOption.TARGET_FILE_SIZE_BYTES,
+              CliOption.UPDATER_OPTIONS,
+              CliOption.UPDATER_OPTIONS_FILE,
+              CliOption.SPARK_CONF,
+              CliOption.SPARK_CONF_FILE),
+          "Submit built-in Iceberg update stats jobs for given table identifiers.",
+          "./bin/gravitino-optimizer.sh --type submit-update-stats-job --identifiers "
+              + "rest.ab.t1,rest.ab.t2 --update-mode stats --dry-run",
+          updateStatsJobInputRules()));
 
   private static final Map<OptimizerCommandType, OptimizerCommandExecutor> COMMAND_HANDLERS =
       Map.of(
           OptimizerCommandType.SUBMIT_STRATEGY_JOBS, new SubmitStrategyJobsCommand(),
@@ -122,7 +139,8 @@ public class OptimizerCmd {
           OptimizerCommandType.UPDATE_STATISTICS, new UpdateStatisticsCommand(),
           OptimizerCommandType.APPEND_METRICS, new AppendMetricsCommand(),
           OptimizerCommandType.MONITOR_METRICS, new MonitorMetricsCommand(),
           OptimizerCommandType.LIST_TABLE_METRICS, new ListTableMetricsCommand(),
-          OptimizerCommandType.LIST_JOB_METRICS, new ListJobMetricsCommand());
+          OptimizerCommandType.LIST_JOB_METRICS, new ListJobMetricsCommand(),
+          OptimizerCommandType.SUBMIT_UPDATE_STATS_JOB, new SubmitUpdateStatsJobCommand());
 
   private static final String LOCAL_STATS_CALCULATOR_NAME = "local-stats-calculator";
 
   static {
@@ -164,6 +182,12 @@ public class OptimizerCmd {
           cmd.getOptionValue(
               CliOption.RANGE_SECONDS.longOpt(), Long.toString(DEFAULT_RANGE_SECONDS));
       String partitionPathRaw = cmd.getOptionValue(CliOption.PARTITION_PATH.longOpt());
+      String updateMode = cmd.getOptionValue(CliOption.UPDATE_MODE.longOpt());
+      String targetFileSizeBytes = cmd.getOptionValue(CliOption.TARGET_FILE_SIZE_BYTES.longOpt());
+      String updaterOptions = cmd.getOptionValue(CliOption.UPDATER_OPTIONS.longOpt());
+      String updaterOptionsFile = cmd.getOptionValue(CliOption.UPDATER_OPTIONS_FILE.longOpt());
+      String sparkConf = cmd.getOptionValue(CliOption.SPARK_CONF.longOpt());
+      String sparkConfFile = cmd.getOptionValue(CliOption.SPARK_CONF_FILE.longOpt());
       String statisticsPayload = cmd.getOptionValue(CliOption.STATISTICS_PAYLOAD.longOpt());
       String filePath = cmd.getOptionValue(CliOption.FILE_PATH.longOpt());
       Optional<StatisticsInputContent> statisticsInputContent =
@@ -179,6 +203,12 @@ public class OptimizerCmd {
               actionTime,
               rangeSeconds,
               partitionPathRaw,
+              updateMode,
+              targetFileSizeBytes,
+              updaterOptions,
+              updaterOptionsFile,
+              sparkConf,
+              sparkConfFile,
               statisticsInputContent,
               out);
       executeCommand(optimizerType, context);
@@ -309,6 +339,17 @@ public class OptimizerCmd {
         .build();
   }
 
+  private static CommandRules.ValidationPlan updateStatsJobInputRules() {
+    return CommandRules.newBuilder()
+        .addMutuallyExclusive(
+            List.of(CliOption.UPDATER_OPTIONS.longOpt(), CliOption.UPDATER_OPTIONS_FILE.longOpt()),
+            "--updater-options and --updater-options-file cannot be used together")
+        .addMutuallyExclusive(
+            List.of(CliOption.SPARK_CONF.longOpt(), CliOption.SPARK_CONF_FILE.longOpt()),
+            "--spark-conf and --spark-conf-file cannot be used together")
+        .build();
+  }
+
   private static void printGlobalHelp(Options options, PrintStream out) {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp(
@@ -461,7 +502,37 @@ public class OptimizerCmd {
         CliOptionArgType.SINGLE,
         null,
         "Partition path for monitor-metrics "
-            + "(JSON array, for example: [{\"p1\":\"v1\"},{\"p2\":\"v2\"}])");
+            + "(JSON array, for example: [{\"p1\":\"v1\"},{\"p2\":\"v2\"}])"),
+    UPDATE_MODE(
+        "update-mode",
+        CliOptionArgType.SINGLE,
+        null,
+        "Update mode for submit-update-stats-job: stats|metrics|all"),
+    TARGET_FILE_SIZE_BYTES(
+        "target-file-size-bytes",
+        CliOptionArgType.SINGLE,
+        null,
+        "Target file size bytes for submit-update-stats-job"),
+    UPDATER_OPTIONS(
+        "updater-options",
+        CliOptionArgType.SINGLE,
+        null,
+        "JSON map for updater options in submit-update-stats-job"),
+    UPDATER_OPTIONS_FILE(
+        "updater-options-file",
+        CliOptionArgType.SINGLE,
+        null,
+        "Path to JSON file for updater options in submit-update-stats-job"),
+    SPARK_CONF(
+        "spark-conf",
+        CliOptionArgType.SINGLE,
+        null,
+        "JSON map for Spark configs in submit-update-stats-job"),
+    SPARK_CONF_FILE(
+        "spark-conf-file",
+        CliOptionArgType.SINGLE,
+        null,
+        "Path to JSON file for Spark configs in submit-update-stats-job");
 
     private final String longOpt;
     private final CliOptionArgType argType;
@@ -509,7 +580,8 @@ public class OptimizerCmd {
     APPEND_METRICS,
     MONITOR_METRICS,
     LIST_TABLE_METRICS,
-    LIST_JOB_METRICS;
+    LIST_JOB_METRICS,
+    SUBMIT_UPDATE_STATS_JOB;
 
     public static String allValues() {
       return Arrays.stream(values())
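The new command plugs into the same COMMAND_SPECS/COMMAND_HANDLERS registration path as the
existing commands, so the full option surface can be exercised in one invocation. A representative
dry run (the inline JSON values below are illustrative placeholders, not part of this commit)
might look like:

    ./bin/gravitino-optimizer.sh --type submit-update-stats-job \
        --identifiers rest.ab.t1,rest.ab.t2 \
        --update-mode all \
        --updater-options '{"gravitino_uri":"http://localhost:8090","metalake":"test"}' \
        --spark-conf '{"spark.sql.catalog.rest":"org.apache.iceberg.spark.SparkCatalog"}' \
        --dry-run

Per updateStatsJobInputRules(), combining --updater-options with --updater-options-file, or
--spark-conf with --spark-conf-file, is rejected during CLI validation, before the handler runs.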
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
index c3f7051ab9..b29ba0d09d 100644
--- a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/OptimizerCommandContext.java
@@ -37,6 +37,12 @@ public final class OptimizerCommandContext {
   private final String actionTime;
   private final String rangeSeconds;
   private final String partitionPathRaw;
+  private final String updateMode;
+  private final String targetFileSizeBytes;
+  private final String updaterOptions;
+  private final String updaterOptionsFile;
+  private final String sparkConf;
+  private final String sparkConfFile;
   private final Optional<StatisticsInputContent> statisticsInputContent;
   private final PrintStream output;
 
@@ -50,6 +56,12 @@ public final class OptimizerCommandContext {
       String actionTime,
       String rangeSeconds,
       String partitionPathRaw,
+      String updateMode,
+      String targetFileSizeBytes,
+      String updaterOptions,
+      String updaterOptionsFile,
+      String sparkConf,
+      String sparkConfFile,
       Optional<StatisticsInputContent> statisticsInputContent,
       PrintStream output) {
     this.optimizerEnv = optimizerEnv;
@@ -61,6 +73,12 @@ public final class OptimizerCommandContext {
     this.actionTime = actionTime;
     this.rangeSeconds = rangeSeconds;
     this.partitionPathRaw = partitionPathRaw;
+    this.updateMode = updateMode;
+    this.targetFileSizeBytes = targetFileSizeBytes;
+    this.updaterOptions = updaterOptions;
+    this.updaterOptionsFile = updaterOptionsFile;
+    this.sparkConf = sparkConf;
+    this.sparkConfFile = sparkConfFile;
     this.statisticsInputContent = statisticsInputContent;
     this.output = output;
   }
@@ -109,6 +127,30 @@ public final class OptimizerCommandContext {
     return partitionPathRaw;
   }
 
+  public String updateMode() {
+    return updateMode;
+  }
+
+  public String targetFileSizeBytes() {
+    return targetFileSizeBytes;
+  }
+
+  public String updaterOptions() {
+    return updaterOptions;
+  }
+
+  public String updaterOptionsFile() {
+    return updaterOptionsFile;
+  }
+
+  public String sparkConf() {
+    return sparkConf;
+  }
+
+  public String sparkConfFile() {
+    return sparkConfFile;
+  }
+
   public Optional<StatisticsInputContent> statisticsInputContent() {
     return statisticsInputContent;
   }
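The context intentionally carries only the raw option strings; defaulting and precedence are
applied later by the command. A minimal sketch of the precedence that resolveScalarOption
(shown in the next file) implements, with hypothetical values:

    import org.apache.commons.lang3.StringUtils;

    // Sketch of the CLI-over-config precedence; the values are hypothetical.
    public class ResolveOrderSketch {
      static String resolve(String cliValue, String confValue) {
        if (StringUtils.isNotBlank(cliValue)) {
          return cliValue.trim(); // an explicit CLI flag wins
        }
        return StringUtils.isBlank(confValue) ? null : confValue.trim(); // else submitter config
      }

      public static void main(String[] args) {
        System.out.println(resolve(null, "stats"));      // conf default -> stats
        System.out.println(resolve("metrics", "stats")); // CLI override -> metrics
      }
    }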
diff --git a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
new file mode 100644
index 0000000000..c06b024b47
--- /dev/null
+++ b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.command;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+
+/**
+ * Handles CLI command {@code submit-update-stats-job} for submitting built-in Iceberg update stats
+ * jobs directly from the optimizer CLI.
+ */
+public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
+
+  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats";
+  private static final String DEFAULT_UPDATE_MODE = "stats";
+  private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L;
+  private static final String OPTION_UPDATER_OPTIONS = "updater-options";
+  private static final String OPTION_UPDATER_OPTIONS_FILE = "updater-options-file";
+  private static final String OPTION_SPARK_CONF = "spark-conf";
+  private static final String OPTION_SPARK_CONF_FILE = "spark-conf-file";
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Override
+  public void execute(OptimizerCommandContext context) throws Exception {
+    Map<String, String> submitterConfigs = context.optimizerEnv().config().jobSubmitterConfigs();
+    int limit = parseLimit(context.limit());
+
+    List<TableTarget> tableTargets =
+        parseTableTargets(
+            context.identifiers(),
+            context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG),
+            limit);
+
+    String updateMode =
+        parseUpdateMode(
+            resolveScalarOption(context.updateMode(), submitterConfigs.get("update_mode")));
+    long targetFileSizeBytes =
+        parseTargetFileSize(
+            resolveScalarOption(
+                context.targetFileSizeBytes(), submitterConfigs.get("target_file_size_bytes")));
+
+    String updaterOptionsJson =
+        resolveJsonOption(
+            OPTION_UPDATER_OPTIONS,
+            context.updaterOptions(),
+            OPTION_UPDATER_OPTIONS_FILE,
+            context.updaterOptionsFile(),
+            submitterConfigs.get("updater_options"));
+    String sparkConfJson =
+        resolveJsonOption(
+            OPTION_SPARK_CONF,
+            context.sparkConf(),
+            OPTION_SPARK_CONF_FILE,
+            context.sparkConfFile(),
+            submitterConfigs.get("spark_conf"));
+
+    Map<String, String> updaterOptions =
+        parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS);
+    Map<String, String> sparkConfigs = parseFlatJsonMap(sparkConfJson, OPTION_SPARK_CONF);
+
+    validateUpdaterOptions(updateMode, updaterOptions);
+    validateSparkConfigs(tableTargets, sparkConfigs);
+
+    if (context.dryRun()) {
+      for (TableTarget tableTarget : tableTargets) {
+        Map<String, String> jobConfig =
+            buildJobConfig(
+                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+        context
+            .output()
+            .printf(
+                "DRY-RUN: identifier=%s jobTemplate=%s jobConfig=%s%n",
+                tableTarget.fullIdentifier, JOB_TEMPLATE_NAME, jobConfig);
+      }
+      context
+          .output()
+          .printf("SUMMARY: submit-update-stats-job total=%d dryRun=true%n", tableTargets.size());
+      return;
+    }
+
+    try (GravitinoClient client = GravitinoClientUtils.createClient(context.optimizerEnv())) {
+      int submitted = 0;
+      for (TableTarget tableTarget : tableTargets) {
+        Map<String, String> jobConfig =
+            buildJobConfig(
+                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+        JobHandle jobHandle = client.runJob(JOB_TEMPLATE_NAME, jobConfig);
+        submitted++;
+        context
+            .output()
+            .printf(
+                "SUBMIT: identifier=%s jobTemplate=%s jobId=%s jobConfig=%s%n",
+                tableTarget.fullIdentifier, JOB_TEMPLATE_NAME, jobHandle.jobId(), jobConfig);
+      }
+      context
+          .output()
+          .printf(
+              "SUMMARY: submit-update-stats-job total=%d submitted=%d dryRun=false%n",
+              tableTargets.size(), submitted);
+    }
+  }
+
+  private static Map<String, String> buildJobConfig(
+      TableTarget tableTarget,
+      String updateMode,
+      long targetFileSizeBytes,
+      Map<String, String> updaterOptions,
+      Map<String, String> sparkConfigs) {
+    Map<String, String> jobConfig = new LinkedHashMap<>();
+    jobConfig.put("catalog_name", tableTarget.catalogName);
+    jobConfig.put("table_identifier", tableTarget.schemaAndTable);
+    jobConfig.put("update_mode", updateMode);
+    jobConfig.put("target_file_size_bytes", Long.toString(targetFileSizeBytes));
+    jobConfig.put("updater_options", toCanonicalJson(updaterOptions));
+    jobConfig.put("spark_conf", toCanonicalJson(sparkConfigs));
+    return jobConfig;
+  }
+
+  private static String resolveScalarOption(String cliValue, String confValue) {
+    if (StringUtils.isNotBlank(cliValue)) {
+      return cliValue.trim();
+    }
+    return StringUtils.isBlank(confValue) ? null : confValue.trim();
+  }
+
+  private static String resolveJsonOption(
+      String cliOptionName,
+      String cliValue,
+      String fileOptionName,
+      String filePath,
+      String confValue) {
+    if (StringUtils.isNotBlank(cliValue)) {
+      return cliValue.trim();
+    }
+    if (StringUtils.isNotBlank(filePath)) {
+      try {
+        Path path = Path.of(filePath.trim());
+        Preconditions.checkArgument(
+            Files.exists(path), "Option %s file does not exist: %s", fileOptionName, filePath);
+        return Files.readString(path, StandardCharsets.UTF_8).trim();
+      } catch (Exception e) {
+        throw new IllegalArgumentException(
+            String.format(
+                Locale.ROOT,
+                "Failed to read option %s from %s: %s",
+                cliOptionName,
+                filePath,
+                e.getMessage()),
+            e);
+      }
+    }
+    return StringUtils.isBlank(confValue) ? null : confValue.trim();
+  }
+
+  private static String parseUpdateMode(String value) {
+    String normalized =
+        StringUtils.isBlank(value) ? DEFAULT_UPDATE_MODE : value.trim().toLowerCase(Locale.ROOT);
+    Preconditions.checkArgument(
+        "stats".equals(normalized) || "metrics".equals(normalized) || "all".equals(normalized),
+        "Invalid --update-mode: %s. Supported values are: stats, metrics, all",
+        value);
+    return normalized;
+  }
+
+  private static long parseTargetFileSize(String value) {
+    if (StringUtils.isBlank(value)) {
+      return DEFAULT_TARGET_FILE_SIZE_BYTES;
+    }
+    return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", value.trim(), false);
+  }
+
+  private static int parseLimit(String limit) {
+    if (StringUtils.isBlank(limit)) {
+      return Integer.MAX_VALUE;
+    }
+    long parsed = OptimizerCommandUtils.parseLongOption("limit", limit.trim(), false);
+    Preconditions.checkArgument(
+        parsed <= Integer.MAX_VALUE, "Option limit must be <= %s", Integer.MAX_VALUE);
+    return (int) parsed;
+  }
+
+  private static List<TableTarget> parseTableTargets(
+      String[] identifiers, String defaultCatalog, int limit) {
+    Preconditions.checkArgument(
+        identifiers != null && identifiers.length > 0,
+        "Missing required option --identifiers for command 'submit-update-stats-job'");
+
+    List<TableTarget> tableTargets = new ArrayList<>();
+    for (String rawIdentifier : identifiers) {
+      if (tableTargets.size() >= limit) {
+        break;
+      }
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rawIdentifier), "--identifiers contains blank identifier");
+      String[] levels = rawIdentifier.trim().split("\\.");
+      if (levels.length == 3) {
+        tableTargets.add(
+            new TableTarget(
+                rawIdentifier.trim(),
+                requireNonBlank(levels[0], "catalog"),
+                requireNonBlank(levels[1], "schema") + "." + requireNonBlank(levels[2], "table")));
+      } else if (levels.length == 2) {
+        Preconditions.checkArgument(
+            StringUtils.isNotBlank(defaultCatalog),
+            "Identifier '%s' uses schema.table format, but %s is not configured",
+            rawIdentifier,
+            OptimizerConfig.GRAVITINO_DEFAULT_CATALOG);
+        tableTargets.add(
+            new TableTarget(
+                defaultCatalog + "." + rawIdentifier.trim(),
+                defaultCatalog.trim(),
+                requireNonBlank(levels[0], "schema") + "." + requireNonBlank(levels[1], "table")));
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                Locale.ROOT,
+                "Identifier '%s' is invalid. Use catalog.schema.table or schema.table",
+                rawIdentifier));
+      }
+    }
+    return tableTargets;
+  }
+
+  private static String requireNonBlank(String value, String levelName) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(value), "%s in identifier cannot be blank", levelName);
+    return value.trim();
+  }
+
+  private static void validateUpdaterOptions(
+      String updateMode, Map<String, String> updaterOptions) {
+    if (!"stats".equals(updateMode) && !"all".equals(updateMode)) {
+      return;
+    }
+    String gravitinoUri =
+        firstNonBlank(
+            updaterOptions.get("gravitino_uri"),
+            updaterOptions.get("gravitino-uri"),
+            updaterOptions.get(OptimizerConfig.GRAVITINO_URI));
+    String metalake =
+        firstNonBlank(
+            updaterOptions.get("metalake"), updaterOptions.get(OptimizerConfig.GRAVITINO_METALAKE));
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(gravitinoUri),
+        "Option --updater-options (or config key jobSubmitterConfig.updater_options) "
+            + "must contain 'gravitino_uri' when update_mode is stats or all");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metalake),
+        "Option --updater-options (or config key jobSubmitterConfig.updater_options) "
+            + "must contain 'metalake' when update_mode is stats or all");
+  }
+
+  private static void validateSparkConfigs(
+      List<TableTarget> tableTargets, Map<String, String> sparkConfigs) {
+    Preconditions.checkArgument(
+        !sparkConfigs.isEmpty(),
+        "Missing spark config. Set --spark-conf/--spark-conf-file or "
+            + "gravitino.optimizer.jobSubmitterConfig.spark_conf in the config file");
+    for (TableTarget tableTarget : tableTargets) {
+      String requiredKey = "spark.sql.catalog." + tableTarget.catalogName;
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(sparkConfigs.get(requiredKey)),
+          "Spark config must contain key '%s' for identifier '%s'",
+          requiredKey,
+          tableTarget.fullIdentifier);
+    }
+  }
+
+  private static Map<String, String> parseFlatJsonMap(String json, String optionName) {
+    if (StringUtils.isBlank(json)) {
+      return Map.of();
+    }
+    try {
+      Map<String, Object> parsedMap =
+          MAPPER.readValue(json, new TypeReference<Map<String, Object>>() {});
+      Map<String, String> result = new LinkedHashMap<>();
+      for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
+        Object value = entry.getValue();
+        Preconditions.checkArgument(
+            !(value instanceof Map || value instanceof List),
+            "Option --%s must be a flat key-value JSON map, but key '%s' has non-scalar value",
+            optionName,
+            entry.getKey());
+        result.put(entry.getKey(), value == null ? "" : value.toString());
+      }
+      return result;
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          String.format(
+              Locale.ROOT, "Option --%s is not valid JSON: %s", optionName, e.getMessage()),
+          e);
+    }
+  }
+
+  private static String toCanonicalJson(Map<String, String> options) {
+    try {
+      return MAPPER.writeValueAsString(new TreeMap<>(options == null ? Map.of() : options));
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to serialize options as JSON", e);
+    }
+  }
+
+  private static String firstNonBlank(String... values) {
+    for (String value : values) {
+      if (StringUtils.isNotBlank(value)) {
+        return value.trim();
+      }
+    }
+    return null;
+  }
+
+  private static final class TableTarget {
+    private final String fullIdentifier;
+    private final String catalogName;
+    private final String schemaAndTable;
+
+    private TableTarget(String fullIdentifier, String catalogName, String schemaAndTable) {
+      this.fullIdentifier = fullIdentifier;
+      this.catalogName = catalogName;
+      this.schemaAndTable = schemaAndTable;
+    }
+  }
+}
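Given the buildJobConfig keys above, a dry run prints one line per resolved table followed by a
summary. With the defaults used in the tests below, the output for rest.ab.t1 would have roughly
this shape (Map#toString rendering; the spark_conf value is elided here):

    DRY-RUN: identifier=rest.ab.t1 jobTemplate=builtin-iceberg-update-stats jobConfig={catalog_name=rest, table_identifier=ab.t1, update_mode=stats, target_file_size_bytes=100000, updater_options={"gravitino_uri":"http://localhost:8090","metalake":"test"}, spark_conf={...}}
    SUMMARY: submit-update-stats-job total=1 dryRun=true

Because updater_options and spark_conf pass through toCanonicalJson, their keys are serialized
in sorted order regardless of how they were supplied.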
diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
index 133567f11c..2be5785a87 100644
--- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
+++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/TestOptimizerCmd.java
@@ -369,6 +369,118 @@ class TestOptimizerCmd {
         "Expected job metric updates from updateAll, but got " + totalJobUpdates);
   }
 
+  @Test
+  void testSubmitUpdateStatsJobDryRunUsesConfigDefaults() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "ab.t1",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].isEmpty(), "stderr=" + output[1] + ", stdout=" + output[0]);
+    Assertions.assertTrue(output[0].contains("DRY-RUN: identifier=rest.ab.t1"));
+    Assertions.assertTrue(output[0].contains("jobTemplate=builtin-iceberg-update-stats"));
+    Assertions.assertTrue(output[0].contains("update_mode=stats"));
+    Assertions.assertTrue(output[0].contains("table_identifier=ab.t1"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRejectsInvalidUpdateMode() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--update-mode",
+            "bad_mode",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].contains("Invalid --update-mode: bad_mode"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRequiresSparkConf() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsWithoutSparkConf();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].contains("Missing spark config."));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobSupportsJsonFileOverrides() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    Path updaterOptionsFile = Files.createTempFile("optimizer-updater-options-", ".json");
+    Path sparkConfFile = Files.createTempFile("optimizer-spark-conf-", ".json");
+    Files.writeString(
+        updaterOptionsFile,
+        "{\"metrics_updater\":\"custom-metrics-updater\"}",
+        StandardCharsets.UTF_8);
+    Files.writeString(
+        sparkConfFile,
+        "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+            + "\"spark.sql.catalog.rest.type\":\"rest\","
+            + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}",
+        StandardCharsets.UTF_8);
+    updaterOptionsFile.toFile().deleteOnExit();
+    sparkConfFile.toFile().deleteOnExit();
+
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--update-mode",
+            "metrics",
+            "--updater-options-file",
+            updaterOptionsFile.toString(),
+            "--spark-conf-file",
+            sparkConfFile.toString(),
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(output[1].isEmpty(), "stderr=" + output[1] + ", stdout=" + output[0]);
+    Assertions.assertTrue(output[0].contains("custom-metrics-updater"));
+    Assertions.assertTrue(output[0].contains("spark.sql.catalog.rest"));
+  }
+
+  @Test
+  void testSubmitUpdateStatsJobRejectsMutuallyExclusiveUpdaterInputs() throws Exception {
+    Path confPath = createOptimizerConfForSubmitUpdateStatsJob();
+    Path updaterOptionsFile = Files.createTempFile("optimizer-updater-options-", ".json");
+    Files.writeString(updaterOptionsFile, "{\"metrics_updater\":\"m\"}", StandardCharsets.UTF_8);
+    updaterOptionsFile.toFile().deleteOnExit();
+    String[] output =
+        runCommand(
+            "--type",
+            "submit-update-stats-job",
+            "--identifiers",
+            "rest.ab.t1",
+            "--updater-options",
+            "{\"metrics_updater\":\"m1\"}",
+            "--updater-options-file",
+            updaterOptionsFile.toString(),
+            "--dry-run",
+            "--conf-path",
+            confPath.toString());
+    Assertions.assertTrue(
+        output[1].contains("--updater-options and --updater-options-file cannot be used together"));
+  }
+
   private Path createOptimizerConfForMetricsProvider() throws Exception {
     Path confPath = Files.createTempFile("optimizer-test-", ".conf");
     // Route command reads to deterministic in-memory fixtures from MetricsProviderForTest.
@@ -440,6 +552,45 @@ class TestOptimizerCmd {
     return confPath;
   }
 
+  private Path createOptimizerConfForSubmitUpdateStatsJob() throws Exception {
+    Path confPath = Files.createTempFile("optimizer-submit-update-stats-", ".conf");
+    String content =
+        String.join(
+                System.lineSeparator(),
+                "gravitino.optimizer.gravitinoUri = http://localhost:8090",
+                "gravitino.optimizer.gravitinoMetalake = test",
+                "gravitino.optimizer.gravitinoDefaultCatalog = rest",
+                "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
+                "gravitino.optimizer.jobSubmitterConfig.target_file_size_bytes = 100000",
+                "gravitino.optimizer.jobSubmitterConfig.updater_options = "
+                    + "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}",
+                "gravitino.optimizer.jobSubmitterConfig.spark_conf = "
+                    + "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\","
+                    + "\"spark.sql.catalog.rest.type\":\"rest\","
+                    + "\"spark.sql.catalog.rest.uri\":\"http://localhost:9001/iceberg\"}")
+            + System.lineSeparator();
+    Files.writeString(confPath, content, StandardCharsets.UTF_8);
+    confPath.toFile().deleteOnExit();
+    return confPath;
+  }
+
+  private Path createOptimizerConfForSubmitUpdateStatsWithoutSparkConf() throws Exception {
+    Path confPath = Files.createTempFile("optimizer-submit-update-stats-without-spark-", ".conf");
+    String content =
+        String.join(
+                System.lineSeparator(),
+                "gravitino.optimizer.gravitinoUri = http://localhost:8090",
+                "gravitino.optimizer.gravitinoMetalake = test",
+                "gravitino.optimizer.gravitinoDefaultCatalog = rest",
+                "gravitino.optimizer.jobSubmitterConfig.update_mode = stats",
+                "gravitino.optimizer.jobSubmitterConfig.updater_options = "
+                    + "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}")
+            + System.lineSeparator();
+    Files.writeString(confPath, content, StandardCharsets.UTF_8);
+    confPath.toFile().deleteOnExit();
+    return confPath;
+  }
+
   private String[] runCommand(String... args) {
     ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
     ByteArrayOutputStream errBuffer = new ByteArrayOutputStream();
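The submission path bottoms out in GravitinoClientUtils.createClient plus GravitinoClient#runJob.
For reference, the same call can be sketched directly against the client; this is a sketch only:
the builder-style construction (builder/withMetalake/withSimpleAuth) is an assumption about the
client API, the endpoint and metalake values are placeholders, and only the template name and
config keys are taken from this commit.

    import java.util.Map;
    import org.apache.gravitino.client.GravitinoClient;
    import org.apache.gravitino.job.JobHandle;

    public class SubmitUpdateStatsJobSketch {
      public static void main(String[] args) throws Exception {
        // Assumed builder API; endpoint and metalake are placeholders.
        try (GravitinoClient client =
            GravitinoClient.builder("http://localhost:8090")
                .withMetalake("test")
                .withSimpleAuth()
                .build()) {
          // Keys mirror buildJobConfig in SubmitUpdateStatsJobCommand.
          Map<String, String> jobConfig =
              Map.of(
                  "catalog_name", "rest",
                  "table_identifier", "ab.t1",
                  "update_mode", "stats",
                  "target_file_size_bytes", "100000",
                  "updater_options",
                      "{\"gravitino_uri\":\"http://localhost:8090\",\"metalake\":\"test\"}",
                  "spark_conf",
                      "{\"spark.sql.catalog.rest\":\"org.apache.iceberg.spark.SparkCatalog\"}");
          JobHandle handle = client.runJob("builtin-iceberg-update-stats", jobConfig);
          System.out.println("submitted jobId=" + handle.jobId());
        }
      }
    }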
