Copilot commented on code in PR #10106:
URL: https://github.com/apache/gravitino/pull/10106#discussion_r2887138666


##########
maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java:
##########
@@ -0,0 +1,881 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.config.ConfigConstants;
+import org.apache.gravitino.exceptions.NoSuchMetalakeException;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.maintenance.optimizer.api.common.DataScope;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater;
+import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.stats.Statistic;
+import org.apache.gravitino.utils.jdbc.JdbcSqlScriptUtils;
+import org.apache.spark.sql.SparkSession;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
+import org.junit.jupiter.api.io.TempDir;
+import org.testcontainers.containers.MySQLContainer;
+
+/** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg runtime. */
+public class TestIcebergUpdateStatsJobWithSpark {
+
+  private static final String SERVER_URI = "http://localhost:8090";
+  private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg";
+  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats";
+  private static final String SPARK_CATALOG_NAME = "rest_catalog";
+  private static final String METALAKE_NAME = "test";
+  private static final int EXPECTED_METRIC_COUNT_PER_SCOPE = 8;
+  private static final Set<String> EXPECTED_METRIC_NAMES =
+      new HashSet<>(
+          Arrays.asList(
+              "custom-file_count",
+              "custom-data_files",
+              "custom-position_delete_files",
+              "custom-equality_delete_files",
+              "custom-small_files",
+              "custom-datafile_mse",
+              "custom-avg_size",
+              "custom-total_size"));
+
+  @TempDir static File tempDir;
+
+  private static SparkSession spark;
+  private static String catalogName;
+
+  @BeforeAll
+  public static void setUp() {
+    String warehousePath = new File(tempDir, "warehouse").getAbsolutePath();
+    catalogName = "test_catalog";
+
+    spark =
+        SparkSession.builder()
+            .appName("TestIcebergUpdateStatsJob")
+            .master("local[2]")
+            .config(
+                "spark.sql.extensions",
+                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+            .config("spark.sql.catalog." + catalogName, "org.apache.iceberg.spark.SparkCatalog")
+            .config("spark.sql.catalog." + catalogName + ".type", "hadoop")
+            .config("spark.sql.catalog." + catalogName + ".warehouse", warehousePath)
+            .getOrCreate();
+
+    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".db");
+    spark.sql(
+        "CREATE TABLE IF NOT EXISTS "
+            + catalogName
+            + ".db.non_partitioned (id INT, name STRING) USING iceberg");
+    spark.sql(
+        "INSERT INTO " + catalogName + ".db.non_partitioned VALUES (1, 'A'), 
(2, 'B'), (3, 'C')");
+
+    spark.sql(
+        "CREATE TABLE IF NOT EXISTS "
+            + catalogName
+            + ".db.partitioned (id INT, ds STRING) USING iceberg PARTITIONED 
BY (ds)");
+    spark.sql(
+        "INSERT INTO "
+            + catalogName
+            + ".db.partitioned VALUES "
+            + "(1, '2026-01-01'), (2, '2026-01-01'), (3, '2026-01-02')");
+
+    spark.sql(
+        "CREATE TABLE IF NOT EXISTS "
+            + catalogName
+            + ".db.multi_partitioned (id INT, event_ts TIMESTAMP, region 
STRING) "
+            + "USING iceberg PARTITIONED BY (days(event_ts), region)");
+    spark.sql(
+        "INSERT INTO "
+            + catalogName
+            + ".db.multi_partitioned VALUES "
+            + "(1, TIMESTAMP '2026-01-01 09:00:00', 'ap-south'), "
+            + "(2, TIMESTAMP '2026-01-01 11:00:00', 'us-east'), "
+            + "(3, TIMESTAMP '2026-01-02 08:30:00', 'ap-south'), "
+            + "(4, TIMESTAMP '2026-01-02 12:45:00', 'us-east')");
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    if (spark != null) {
+      spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.non_partitioned");
+      spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.partitioned");
+      spark.sql("DROP TABLE IF EXISTS " + catalogName + 
".db.multi_partitioned");
+      spark.sql("DROP NAMESPACE IF EXISTS " + catalogName + ".db");
+      spark.stop();
+    }
+  }
+
+  // Requires a running deploy-mode Gravitino server and Spark environment.
+  @Test
+  @Tag("gravitino-docker-test")
+  @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true")
+  public void testRunBuiltInUpdateStatsJobViaServerForAllModes() throws Exception {
+    String suffix = UUID.randomUUID().toString().replace("-", "");
+    String statsTableName = "jobs_it_update_stats_mode_stats_" + suffix;
+    String metricsTableName = "jobs_it_update_stats_mode_metrics_" + suffix;
+    String partitionMetricsTableName = "jobs_it_update_stats_mode_partition_metrics_" + suffix;
+    String allModeTableName = "jobs_it_update_stats_mode_all_" + suffix;
+    String statsFullTableName = SPARK_CATALOG_NAME + ".db." + statsTableName;
+    String metricsFullTableName = SPARK_CATALOG_NAME + ".db." + metricsTableName;
+    String partitionMetricsFullTableName = SPARK_CATALOG_NAME + ".db." + partitionMetricsTableName;
+    String allModeFullTableName = SPARK_CATALOG_NAME + ".db." + allModeTableName;
+
+    SparkSession restSpark = createRestSparkSession();
+    try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
+        GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) {
+      mysql.start();

Review Comment:
   `restSpark` created by `createRestSparkSession()` is never stopped/closed. 
This can leak a SparkContext across tests and make the suite flaky (or hang on 
JVM shutdown). Please wrap `restSpark` in a try/finally (or try-with-resources 
if available) and call `restSpark.stop()` in the cleanup path.
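
   A minimal sketch of that cleanup, assuming `createRestSparkSession()` returns a regular `SparkSession` (try/finally shown, since `stop()` is the one call known to exist on every Spark version):

```java
SparkSession restSpark = createRestSparkSession();
try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33");
    GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) {
  mysql.start();
  // ... existing test body ...
} finally {
  // Release the local SparkContext so later tests can create their own session.
  restSpark.stop();
}
```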



##########
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/command/SubmitUpdateStatsJobCommand.java:
##########
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.command;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.job.JobHandle;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.maintenance.optimizer.common.util.GravitinoClientUtils;
+
+/**
+ * Handles CLI command {@code submit-update-stats-job} for submitting built-in Iceberg update stats
+ * jobs directly from optimizer CLI.
+ */
+public class SubmitUpdateStatsJobCommand implements OptimizerCommandExecutor {
+
+  private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats";
+  private static final String DEFAULT_UPDATE_MODE = "stats";
+  private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L;
+  private static final String OPTION_UPDATER_OPTIONS = "updater-options";
+  private static final String OPTION_SPARK_CONF = "spark-conf";
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Override
+  public void execute(OptimizerCommandContext context) throws Exception {
+    Map<String, String> submitterConfigs = context.optimizerEnv().config().jobSubmitterConfigs();
+
+    List<TableTarget> tableTargets =
+        parseTableTargets(
+            context.identifiers(),
+            context.optimizerEnv().config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG));
+
+    String updateMode =
+        parseUpdateMode(
+            resolveScalarOption(context.updateMode(), submitterConfigs.get("update_mode")));
+    long targetFileSizeBytes =
+        parseTargetFileSize(
+            resolveScalarOption(
+                context.targetFileSizeBytes(), submitterConfigs.get("target_file_size_bytes")));
+
+    String updaterOptionsJson =
+        resolveJsonOption(context.updaterOptions(), submitterConfigs.get("updater_options"));
+    String sparkConfJson =
+        resolveJsonOption(context.sparkConf(), submitterConfigs.get("spark_conf"));
+
+    Map<String, String> updaterOptions =
+        parseFlatJsonMap(updaterOptionsJson, OPTION_UPDATER_OPTIONS);
+    Map<String, String> sparkConfigs = parseFlatJsonMap(sparkConfJson, OPTION_SPARK_CONF);
+
+    validateUpdaterOptions(updateMode, updaterOptions);
+    validateSparkConfigs(tableTargets, sparkConfigs);
+
+    if (context.dryRun()) {
+      for (TableTarget tableTarget : tableTargets) {
+        Map<String, String> jobConfig =
+            buildJobConfig(
+                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+        context
+            .output()
+            .printf(
+                "DRY-RUN: identifier=%s jobTemplate=%s jobConfig=%s%n",
+                tableTarget.fullIdentifier, JOB_TEMPLATE_NAME, jobConfig);
+      }
+      context
+          .output()
+          .printf("SUMMARY: submit-update-stats-job total=%d dryRun=true%n", 
tableTargets.size());
+      return;
+    }
+
+    try (GravitinoClient client = GravitinoClientUtils.createClient(context.optimizerEnv())) {
+      int submitted = 0;
+      for (TableTarget tableTarget : tableTargets) {
+        Map<String, String> jobConfig =
+            buildJobConfig(
+                tableTarget, updateMode, targetFileSizeBytes, updaterOptions, sparkConfigs);
+        JobHandle jobHandle = client.runJob(JOB_TEMPLATE_NAME, jobConfig);
+        submitted++;
+        context
+            .output()
+            .printf(
+                "SUBMIT: identifier=%s jobTemplate=%s jobId=%s jobConfig=%s%n",
+                tableTarget.fullIdentifier, JOB_TEMPLATE_NAME, jobHandle.jobId(), jobConfig);
+      }
+      context
+          .output()
+          .printf(
+              "SUMMARY: submit-update-stats-job total=%d submitted=%d 
dryRun=false%n",
+              tableTargets.size(), submitted);
+    }
+  }
+
+  private static Map<String, String> buildJobConfig(
+      TableTarget tableTarget,
+      String updateMode,
+      long targetFileSizeBytes,
+      Map<String, String> updaterOptions,
+      Map<String, String> sparkConfigs) {
+    Map<String, String> jobConfig = new LinkedHashMap<>();
+    jobConfig.put("catalog_name", tableTarget.catalogName);
+    jobConfig.put("table_identifier", tableTarget.schemaAndTable);
+    jobConfig.put("update_mode", updateMode);
+    jobConfig.put("target_file_size_bytes", 
Long.toString(targetFileSizeBytes));
+    jobConfig.put("updater_options", toCanonicalJson(updaterOptions));
+    jobConfig.put("spark_conf", toCanonicalJson(sparkConfigs));
+    return jobConfig;
+  }
+
+  private static String resolveScalarOption(String cliValue, String confValue) {
+    if (StringUtils.isNotBlank(cliValue)) {
+      return cliValue.trim();
+    }
+    return StringUtils.isBlank(confValue) ? null : confValue.trim();
+  }
+
+  private static String resolveJsonOption(String cliValue, String confValue) {
+    if (StringUtils.isNotBlank(cliValue)) {
+      return cliValue.trim();
+    }
+    return StringUtils.isBlank(confValue) ? null : confValue.trim();
+  }
+
+  private static String parseUpdateMode(String value) {
+    String normalized =
+        StringUtils.isBlank(value) ? DEFAULT_UPDATE_MODE : value.trim().toLowerCase(Locale.ROOT);
+    Preconditions.checkArgument(
+        "stats".equals(normalized) || "metrics".equals(normalized) || 
"all".equals(normalized),
+        "Invalid --update-mode: %s. Supported values are: stats, metrics, all",
+        value);
+    return normalized;
+  }
+
+  private static long parseTargetFileSize(String value) {
+    if (StringUtils.isBlank(value)) {
+      return DEFAULT_TARGET_FILE_SIZE_BYTES;
+    }
+    return OptimizerCommandUtils.parseLongOption("target-file-size-bytes", value.trim(), false);
+  }
+
+  private static List<TableTarget> parseTableTargets(String[] identifiers, String defaultCatalog) {
+    Preconditions.checkArgument(
+        identifiers != null && identifiers.length > 0,
+        "Missing required option --identifiers for command 
'submit-update-stats-job'");
+
+    List<TableTarget> tableTargets = new ArrayList<>();
+    for (String rawIdentifier : identifiers) {
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rawIdentifier), "--identifiers contains blank identifier");
+      String[] levels = rawIdentifier.trim().split("\\.");
+      if (levels.length == 3) {
+        tableTargets.add(
+            new TableTarget(
+                rawIdentifier.trim(),
+                requireNonBlank(levels[0], "catalog"),
+                requireNonBlank(levels[1], "schema") + "." + 
requireNonBlank(levels[2], "table")));
+      } else if (levels.length == 2) {
+        Preconditions.checkArgument(
+            StringUtils.isNotBlank(defaultCatalog),
+            "Identifier '%s' uses schema.table format, but %s is not 
configured",
+            rawIdentifier,
+            OptimizerConfig.GRAVITINO_DEFAULT_CATALOG);
+        tableTargets.add(
+            new TableTarget(
+                defaultCatalog + "." + rawIdentifier.trim(),
+                defaultCatalog.trim(),
+                requireNonBlank(levels[0], "schema") + "." + 
requireNonBlank(levels[1], "table")));
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                Locale.ROOT,
+                "Identifier '%s' is invalid. Use catalog.schema.table or 
schema.table",
+                rawIdentifier));
+      }
+    }
+    return tableTargets;
+  }
+
+  private static String requireNonBlank(String value, String levelName) {
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(value), "%s in identifier cannot be blank", 
levelName);
+    return value.trim();
+  }
+
+  private static void validateUpdaterOptions(
+      String updateMode, Map<String, String> updaterOptions) {
+    if (!"stats".equals(updateMode) && !"all".equals(updateMode)) {
+      return;
+    }
+    String gravitinoUri = StringUtils.trimToNull(updaterOptions.get("gravitino_uri"));
+    String metalake = StringUtils.trimToNull(updaterOptions.get("metalake"));
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(gravitinoUri),
+        "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "
+            + "must contain 'gravitino_uri' when update_mode is stats or all");
+    Preconditions.checkArgument(
+        StringUtils.isNotBlank(metalake),
+        "Option --updater-options (or config key 
jobSubmitterConfig.updater_options) "

Review Comment:
   The error message mentions config key `jobSubmitterConfig.updater_options`, 
but the actual key in the config file includes the full prefix 
(`gravitino.optimizer.jobSubmitterConfig.updater_options`). This is user-facing 
and will mislead CLI users when the validation fails; please update the message 
(and the similar one below for `metalake`) to reference the correct key, 
consistent with `validateSparkConfigs()`.
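
   For illustration, the `gravitino_uri` check could read as follows (the fully prefixed key below is assumed from this review's description of the config file):

```java
Preconditions.checkArgument(
    StringUtils.isNotBlank(gravitinoUri),
    "Option --updater-options (or config key "
        + "gravitino.optimizer.jobSubmitterConfig.updater_options) "
        + "must contain 'gravitino_uri' when update_mode is stats or all");
```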



##########
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java:
##########
@@ -235,7 +235,12 @@ private static void runWithSparkAndMetalake(SparkMetalakeConsumer consumer) thro
   }
 
   private static void submitCompactionJob(GravitinoMetalake metalake, 
Map<String, String> jobConf) {
-    JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf);
+    submitJob(metalake, JOB_TEMPLATE_NAME, jobConf);
+  }
+
+  private static void submitJob(
+      GravitinoMetalake metalake, String jobTemplateName, Map<String, String> jobConf) {
+    JobHandle jobHandle = metalake.runJob(jobTemplateName, jobConf);
     System.out.println("Submitted job id: " + jobHandle.jobId());
     Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank");

Review Comment:
   Avoid using `System.out.println` in tests; it makes CI logs noisy and is 
inconsistent with the rest of the codebase using structured logging. Please 
switch this to the existing logging approach used in this module (or remove the 
print if it's not needed).
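
   A sketch of the logging variant, assuming the module already uses SLF4J (`org.slf4j.Logger` / `org.slf4j.LoggerFactory`) like the rest of the codebase:

```java
private static final Logger LOG =
    LoggerFactory.getLogger(TestBuiltinIcebergRewriteDataFiles.class);

private static void submitJob(
    GravitinoMetalake metalake, String jobTemplateName, Map<String, String> jobConf) {
  JobHandle jobHandle = metalake.runJob(jobTemplateName, jobConf);
  // Structured logging instead of System.out keeps CI output consistent.
  LOG.info("Submitted job id: {}", jobHandle.jobId());
  Assertions.assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank");
}
```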



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
